Advanced Functional Concepts
Introduction
Advanced functional programming introduces powerful abstractions for handling effects, errors, and composition. This reading covers functors, applicatives, monads, and algebraic data types - concepts that enable writing reliable, composable code.
Learning Objectives
By the end of this reading, you will be able to:
- Understand and implement functors
- Apply the monad pattern for sequencing computations
- Use Option/Maybe for null safety
- Handle errors functionally with Either/Result
- Design algebraic data types
1. Functors
Concept
A functor is a container that can be mapped over. It implements map (or fmap) to apply a function to its contents while preserving the container structure.
from typing import TypeVar, Generic, Callable, List
from abc import ABC, abstractmethod
A = TypeVar('A')
B = TypeVar('B')
class Functor(Generic[A], ABC):
"""Base functor interface"""
@abstractmethod
def map(self, func: Callable[[A], B]) -> 'Functor[B]':
"""Apply function to contents, return new functor"""
pass
# List is a functor
class FunctorList(Functor[A]):
def __init__(self, items: List[A]):
self._items = items
def map(self, func: Callable[[A], B]) -> 'FunctorList[B]':
return FunctorList([func(x) for x in self._items])
def __repr__(self):
return f"FunctorList({self._items})"
# Usage
fl = FunctorList([1, 2, 3])
fl2 = fl.map(lambda x: x * 2) # FunctorList([2, 4, 6])
fl3 = fl2.map(str) # FunctorList(['2', '4', '6'])
Functor Laws
A proper functor must satisfy two laws:
# Law 1: Identity
# map(id) == id
# Mapping identity function changes nothing
def identity(x):
return x
fl = FunctorList([1, 2, 3])
assert fl.map(identity)._items == fl._items # Law 1
# Law 2: Composition
# map(f . g) == map(f) . map(g)
# Mapping composed functions equals composing mapped functions
def double(x): return x * 2
def add_one(x): return x + 1
fl = FunctorList([1, 2, 3])
# Composed then mapped
composed = fl.map(lambda x: double(add_one(x)))
# Mapped then mapped
chained = fl.map(add_one).map(double)
assert composed._items == chained._items # Law 2
Optional as Functor
from typing import Union, Optional as Opt
class Maybe(Generic[A]):
"""Maybe/Optional functor - represents possible absence of value"""
def __init__(self, value: Opt[A], is_some: bool):
self._value = value
self._is_some = is_some
@staticmethod
def some(value: A) -> 'Maybe[A]':
"""Create a Maybe with a value"""
return Maybe(value, True)
@staticmethod
def none() -> 'Maybe[A]':
"""Create an empty Maybe"""
return Maybe(None, False)
def map(self, func: Callable[[A], B]) -> 'Maybe[B]':
"""Apply function if value exists"""
if self._is_some:
return Maybe.some(func(self._value))
return Maybe.none()
def get_or_else(self, default: A) -> A:
"""Get value or return default"""
return self._value if self._is_some else default
def is_some(self) -> bool:
return self._is_some
def __repr__(self):
if self._is_some:
return f"Some({self._value})"
return "None"
# Usage - safe chaining
def safe_divide(a: float, b: float) -> Maybe[float]:
if b == 0:
return Maybe.none()
return Maybe.some(a / b)
result = (
safe_divide(10, 2)
.map(lambda x: x + 1)
.map(lambda x: x * 2)
)
print(result) # Some(12.0)
result2 = (
safe_divide(10, 0)
.map(lambda x: x + 1)
.map(lambda x: x * 2)
)
print(result2) # None
2. Monads
Concept
A monad is a functor that also implements flat_map (or bind/>>=). This allows chaining operations that themselves return containers.
from typing import TypeVar, Generic, Callable
A = TypeVar('A')
B = TypeVar('B')
class Monad(Generic[A]):
"""Base monad interface"""
@staticmethod
def pure(value: A) -> 'Monad[A]':
"""Wrap value in monad (also called 'return' or 'unit')"""
raise NotImplementedError
def flat_map(self, func: Callable[[A], 'Monad[B]']) -> 'Monad[B]':
"""Chain operations that return monads (also called 'bind' or '>>=')"""
raise NotImplementedError
def map(self, func: Callable[[A], B]) -> 'Monad[B]':
"""Derived from flat_map and pure"""
return self.flat_map(lambda x: self.__class__.pure(func(x)))
Maybe Monad
class Maybe(Generic[A]):
"""Maybe monad for optional values"""
def __init__(self, value: Opt[A], is_some: bool):
self._value = value
self._is_some = is_some
@staticmethod
def pure(value: A) -> 'Maybe[A]':
return Maybe.some(value)
@staticmethod
def some(value: A) -> 'Maybe[A]':
return Maybe(value, True)
@staticmethod
def none() -> 'Maybe[A]':
return Maybe(None, False)
def flat_map(self, func: Callable[[A], 'Maybe[B]']) -> 'Maybe[B]':
"""Chain operations that might fail"""
if self._is_some:
return func(self._value)
return Maybe.none()
def map(self, func: Callable[[A], B]) -> 'Maybe[B]':
if self._is_some:
return Maybe.some(func(self._value))
return Maybe.none()
def get_or_else(self, default: A) -> A:
return self._value if self._is_some else default
def __repr__(self):
return f"Some({self._value})" if self._is_some else "None"
# Example: Safe dictionary lookup
def lookup(d: dict, key: str) -> Maybe[any]:
if key in d:
return Maybe.some(d[key])
return Maybe.none()
data = {
'user': {
'address': {
'city': 'New York'
}
}
}
# Safe nested access with flat_map
result = (
lookup(data, 'user')
.flat_map(lambda user: lookup(user, 'address'))
.flat_map(lambda addr: lookup(addr, 'city'))
)
print(result) # Some(New York)
# Missing key
result2 = (
lookup(data, 'user')
.flat_map(lambda user: lookup(user, 'phone')) # Missing!
.flat_map(lambda phone: lookup(phone, 'number'))
)
print(result2) # None
Either/Result Monad
from typing import Union
class Either(Generic[A, B]):
"""Either monad - represents success (Right) or failure (Left)"""
def __init__(self, value: Union[A, B], is_right: bool):
self._value = value
self._is_right = is_right
@staticmethod
def right(value: B) -> 'Either[A, B]':
"""Create success value"""
return Either(value, True)
@staticmethod
def left(value: A) -> 'Either[A, B]':
"""Create failure value"""
return Either(value, False)
def map(self, func: Callable[[B], 'C']) -> 'Either[A, C]':
"""Apply function to right value"""
if self._is_right:
return Either.right(func(self._value))
return Either.left(self._value)
def flat_map(self, func: Callable[[B], 'Either[A, C]']) -> 'Either[A, C]':
"""Chain operations that might fail"""
if self._is_right:
return func(self._value)
return Either.left(self._value)
def map_left(self, func: Callable[[A], 'C']) -> 'Either[C, B]':
"""Transform the error"""
if not self._is_right:
return Either.left(func(self._value))
return Either.right(self._value)
def get_or_else(self, default: B) -> B:
return self._value if self._is_right else default
def fold(self, on_left: Callable[[A], 'C'], on_right: Callable[[B], 'C']) -> 'C':
"""Handle both cases"""
if self._is_right:
return on_right(self._value)
return on_left(self._value)
def __repr__(self):
if self._is_right:
return f"Right({self._value})"
return f"Left({self._value})"
# Practical example: Validation
class ValidationError:
def __init__(self, message: str):
self.message = message
def __repr__(self):
return f"ValidationError({self.message})"
def parse_int(s: str) -> Either[ValidationError, int]:
try:
return Either.right(int(s))
except ValueError:
return Either.left(ValidationError(f"Cannot parse '{s}' as int"))
def validate_positive(n: int) -> Either[ValidationError, int]:
if n > 0:
return Either.right(n)
return Either.left(ValidationError(f"{n} is not positive"))
def validate_less_than_100(n: int) -> Either[ValidationError, int]:
if n < 100:
return Either.right(n)
return Either.left(ValidationError(f"{n} is not less than 100"))
# Chain validations
def validate_number(s: str) -> Either[ValidationError, int]:
return (
parse_int(s)
.flat_map(validate_positive)
.flat_map(validate_less_than_100)
)
print(validate_number("42")) # Right(42)
print(validate_number("-5")) # Left(ValidationError(-5 is not positive))
print(validate_number("abc")) # Left(ValidationError(Cannot parse 'abc' as int))
print(validate_number("150")) # Left(ValidationError(150 is not less than 100))
Monad Laws
# A proper monad satisfies three laws:
# 1. Left Identity: pure(a).flat_map(f) == f(a)
a = 5
f = lambda x: Maybe.some(x * 2)
assert Maybe.pure(a).flat_map(f)._value == f(a)._value
# 2. Right Identity: m.flat_map(pure) == m
m = Maybe.some(5)
assert m.flat_map(Maybe.pure)._value == m._value
# 3. Associativity: m.flat_map(f).flat_map(g) == m.flat_map(lambda x: f(x).flat_map(g))
m = Maybe.some(5)
f = lambda x: Maybe.some(x * 2)
g = lambda x: Maybe.some(x + 1)
left_side = m.flat_map(f).flat_map(g)
right_side = m.flat_map(lambda x: f(x).flat_map(g))
assert left_side._value == right_side._value
3. List Monad
class ListM(Generic[A]):
"""List monad - represents multiple possible values"""
def __init__(self, items: List[A]):
self._items = items
@staticmethod
def pure(value: A) -> 'ListM[A]':
return ListM([value])
def flat_map(self, func: Callable[[A], 'ListM[B]']) -> 'ListM[B]':
"""Apply function to each element and flatten results"""
results = []
for item in self._items:
results.extend(func(item)._items)
return ListM(results)
def map(self, func: Callable[[A], B]) -> 'ListM[B]':
return ListM([func(x) for x in self._items])
def filter(self, pred: Callable[[A], bool]) -> 'ListM[A]':
return ListM([x for x in self._items if pred(x)])
def __repr__(self):
return f"ListM({self._items})"
# Example: Cartesian product with flat_map
colors = ListM(['red', 'green', 'blue'])
sizes = ListM(['S', 'M', 'L'])
combinations = colors.flat_map(
lambda color: sizes.map(
lambda size: f"{color}-{size}"
)
)
print(combinations)
# ListM(['red-S', 'red-M', 'red-L', 'green-S', 'green-M', 'green-L', 'blue-S', 'blue-M', 'blue-L'])
# List comprehension for non-deterministic computation
def pythagorean_triples(limit: int) -> ListM[tuple]:
"""Find all Pythagorean triples up to limit"""
return (
ListM(range(1, limit + 1)).flat_map(lambda a:
ListM(range(a, limit + 1)).flat_map(lambda b:
ListM(range(b, limit + 1)).flat_map(lambda c:
ListM([(a, b, c)]) if a*a + b*b == c*c else ListM([])
)
)
)
)
print(pythagorean_triples(15))
# ListM([(3, 4, 5), (5, 12, 13), (6, 8, 10), (9, 12, 15)])
4. IO Monad (Conceptual)
from typing import Callable, TypeVar
A = TypeVar('A')
B = TypeVar('B')
class IO(Generic[A]):
"""
IO monad - represents a computation with side effects.
The computation is not executed until explicitly run.
This separates description from execution.
"""
def __init__(self, effect: Callable[[], A]):
self._effect = effect
@staticmethod
def pure(value: A) -> 'IO[A]':
"""Wrap pure value in IO"""
return IO(lambda: value)
def map(self, func: Callable[[A], B]) -> 'IO[B]':
"""Transform the result"""
return IO(lambda: func(self._effect()))
def flat_map(self, func: Callable[[A], 'IO[B]']) -> 'IO[B]':
"""Chain IO operations"""
return IO(lambda: func(self._effect())._effect())
def run(self) -> A:
"""Execute the IO operation"""
return self._effect()
# IO operations
def read_line() -> IO[str]:
"""Read a line from stdin"""
return IO(lambda: input())
def print_line(message: str) -> IO[None]:
"""Print a line to stdout"""
return IO(lambda: print(message))
def get_time() -> IO[float]:
"""Get current time"""
import time
return IO(lambda: time.time())
# Compose IO operations
def greet_user() -> IO[None]:
"""Program that greets user - pure description of side effects"""
return (
print_line("What is your name?")
.flat_map(lambda _: read_line())
.flat_map(lambda name: print_line(f"Hello, {name}!"))
)
# Nothing happens until we run it
# greet_user().run()
# Another example: logging with timestamps
def log_with_time(message: str) -> IO[None]:
return (
get_time()
.flat_map(lambda t: print_line(f"[{t}] {message}"))
)
5. Algebraic Data Types
Sum Types (Tagged Unions)
from dataclasses import dataclass
from typing import Union, Generic, TypeVar, Callable
from enum import Enum, auto
# Simple enum-based sum type
class Color(Enum):
RED = auto()
GREEN = auto()
BLUE = auto()
# More complex sum type using classes
@dataclass
class Circle:
radius: float
@dataclass
class Rectangle:
width: float
height: float
@dataclass
class Triangle:
base: float
height: float
# Union type
Shape = Union[Circle, Rectangle, Triangle]
def area(shape: Shape) -> float:
"""Pattern match on shape type"""
if isinstance(shape, Circle):
return 3.14159 * shape.radius ** 2
elif isinstance(shape, Rectangle):
return shape.width * shape.height
elif isinstance(shape, Triangle):
return 0.5 * shape.base * shape.height
raise TypeError(f"Unknown shape: {shape}")
# Using match (Python 3.10+)
def area_match(shape: Shape) -> float:
match shape:
case Circle(radius=r):
return 3.14159 * r ** 2
case Rectangle(width=w, height=h):
return w * h
case Triangle(base=b, height=h):
return 0.5 * b * h
Product Types
from dataclasses import dataclass
from typing import NamedTuple
# Product types combine multiple values
# Using dataclass
@dataclass(frozen=True)
class Person:
name: str
age: int
email: str
# Using NamedTuple
class Point(NamedTuple):
x: float
y: float
# Both are product types: Person = String × Int × String
# They represent the Cartesian product of their component types
person = Person("Alice", 30, "alice@example.com")
point = Point(3.0, 4.0)
Recursive Data Types
from typing import Union, Optional
from dataclasses import dataclass
# Linked list as algebraic data type
@dataclass
class Nil:
"""Empty list"""
pass
@dataclass
class Cons(Generic[A]):
"""Non-empty list: head and tail"""
head: A
tail: 'LinkedList[A]'
LinkedList = Union[Nil, Cons[A]]
def list_sum(lst: LinkedList[int]) -> int:
"""Sum elements of linked list"""
if isinstance(lst, Nil):
return 0
return lst.head + list_sum(lst.tail)
def list_map(func: Callable[[A], B], lst: LinkedList[A]) -> LinkedList[B]:
"""Map over linked list"""
if isinstance(lst, Nil):
return Nil()
return Cons(func(lst.head), list_map(func, lst.tail))
# Example
my_list = Cons(1, Cons(2, Cons(3, Nil())))
print(list_sum(my_list)) # 6
doubled = list_map(lambda x: x * 2, my_list)
print(list_sum(doubled)) # 12
# Binary tree as algebraic data type
@dataclass
class Empty:
pass
@dataclass
class Node(Generic[A]):
value: A
left: 'Tree[A]'
right: 'Tree[A]'
Tree = Union[Empty, Node[A]]
def tree_sum(tree: Tree[int]) -> int:
if isinstance(tree, Empty):
return 0
return tree.value + tree_sum(tree.left) + tree_sum(tree.right)
def tree_map(func: Callable[[A], B], tree: Tree[A]) -> Tree[B]:
if isinstance(tree, Empty):
return Empty()
return Node(
func(tree.value),
tree_map(func, tree.left),
tree_map(func, tree.right)
)
6. Applicative Functors
from typing import TypeVar, Generic, Callable, List
A = TypeVar('A')
B = TypeVar('B')
class Applicative(Generic[A]):
"""
Applicative functor - functor with ability to apply wrapped functions.
Key operation: apply (or <*> in Haskell)
Apply a wrapped function to a wrapped value.
"""
@staticmethod
def pure(value: A) -> 'Applicative[A]':
"""Wrap a value"""
raise NotImplementedError
def apply(self, func_wrapped: 'Applicative[Callable[[A], B]]') -> 'Applicative[B]':
"""Apply wrapped function to self"""
raise NotImplementedError
class MaybeAp(Generic[A]):
"""Maybe as an applicative functor"""
def __init__(self, value: Optional[A], is_some: bool):
self._value = value
self._is_some = is_some
@staticmethod
def pure(value: A) -> 'MaybeAp[A]':
return MaybeAp(value, True)
@staticmethod
def some(value: A) -> 'MaybeAp[A]':
return MaybeAp(value, True)
@staticmethod
def none() -> 'MaybeAp[A]':
return MaybeAp(None, False)
def map(self, func: Callable[[A], B]) -> 'MaybeAp[B]':
if self._is_some:
return MaybeAp.some(func(self._value))
return MaybeAp.none()
def apply(self, func_maybe: 'MaybeAp[Callable[[A], B]]') -> 'MaybeAp[B]':
"""Apply function in Maybe to value in Maybe"""
if func_maybe._is_some and self._is_some:
return MaybeAp.some(func_maybe._value(self._value))
return MaybeAp.none()
def __repr__(self):
return f"Some({self._value})" if self._is_some else "None"
# Usage: Apply function to multiple Maybe values
def add(a: int) -> Callable[[int], int]:
"""Curried add"""
return lambda b: a + b
# Lift add into Maybe context
maybe_a = MaybeAp.some(5)
maybe_b = MaybeAp.some(3)
# Apply curried function
result = maybe_b.apply(maybe_a.map(add))
print(result) # Some(8)
# If either is None, result is None
maybe_c = MaybeAp.none()
result2 = maybe_c.apply(maybe_a.map(add))
print(result2) # None
# Validation example: combine multiple validations
def lift_a2(
func: Callable[[A, B], 'C'],
fa: MaybeAp[A],
fb: MaybeAp[B]
) -> MaybeAp['C']:
"""Lift a binary function into applicative context"""
curried = lambda a: lambda b: func(a, b)
return fb.apply(fa.map(curried))
def create_person(name: str, age: int) -> dict:
return {'name': name, 'age': age}
valid_name = MaybeAp.some("Alice")
valid_age = MaybeAp.some(30)
invalid_age = MaybeAp.none()
person = lift_a2(create_person, valid_name, valid_age)
print(person) # Some({'name': 'Alice', 'age': 30})
person2 = lift_a2(create_person, valid_name, invalid_age)
print(person2) # None
7. Practical Patterns
Railway-Oriented Programming
from typing import Callable, TypeVar, List
from dataclasses import dataclass
A = TypeVar('A')
B = TypeVar('B')
@dataclass
class Success(Generic[A]):
value: A
@dataclass
class Failure:
errors: List[str]
Result = Union[Success[A], Failure]
def bind(result: Result[A], func: Callable[[A], Result[B]]) -> Result[B]:
"""Continue on success, short-circuit on failure"""
if isinstance(result, Success):
return func(result.value)
return result
def map_result(result: Result[A], func: Callable[[A], B]) -> Result[B]:
"""Transform success value"""
if isinstance(result, Success):
return Success(func(result.value))
return result
# Validation functions that return Result
def validate_email(email: str) -> Result[str]:
if '@' in email:
return Success(email)
return Failure(["Invalid email format"])
def validate_age(age: int) -> Result[int]:
if 0 <= age <= 150:
return Success(age)
return Failure(["Age must be between 0 and 150"])
def validate_name(name: str) -> Result[str]:
if len(name) >= 2:
return Success(name)
return Failure(["Name must be at least 2 characters"])
# Railway: chain validations
def validate_user(name: str, age: int, email: str) -> Result[dict]:
return bind(
validate_name(name),
lambda valid_name: bind(
validate_age(age),
lambda valid_age: bind(
validate_email(email),
lambda valid_email: Success({
'name': valid_name,
'age': valid_age,
'email': valid_email
})
)
)
)
print(validate_user("Alice", 30, "alice@example.com"))
# Success(value={'name': 'Alice', 'age': 30, 'email': 'alice@example.com'})
print(validate_user("A", 30, "invalid"))
# Failure(errors=['Name must be at least 2 characters'])
Free Monad Pattern (Simplified)
from typing import Generic, TypeVar, Callable, Union
from dataclasses import dataclass
from abc import ABC, abstractmethod
A = TypeVar('A')
# Define operations as data
@dataclass
class ReadFile:
path: str
next: Callable[[str], 'Free']
@dataclass
class WriteFile:
path: str
content: str
next: Callable[[], 'Free']
@dataclass
class Pure(Generic[A]):
value: A
Free = Union[ReadFile, WriteFile, Pure[A]]
# Build program as data structure
def read_file(path: str) -> Free:
return ReadFile(path, lambda content: Pure(content))
def write_file(path: str, content: str) -> Free:
return WriteFile(path, content, lambda: Pure(None))
# Interpreter that actually executes
def run_io(program: Free):
"""Execute the program"""
if isinstance(program, Pure):
return program.value
elif isinstance(program, ReadFile):
content = open(program.path).read() # Actual IO
return run_io(program.next(content))
elif isinstance(program, WriteFile):
open(program.path, 'w').write(program.content) # Actual IO
return run_io(program.next())
# Test interpreter (no actual IO)
def run_test(program: Free, mock_files: dict):
"""Execute program with mock files"""
if isinstance(program, Pure):
return program.value
elif isinstance(program, ReadFile):
content = mock_files.get(program.path, "")
return run_test(program.next(content), mock_files)
elif isinstance(program, WriteFile):
mock_files[program.path] = program.content
return run_test(program.next(), mock_files)
Exercises
Basic
Implement a
mapfunction for the Either monad that transforms the Right value.Create a
safe_headfunction that returnsMaybe[A]instead of raising an exception.Define a binary tree ADT and implement
tree_fold(catamorphism).
Intermediate
Implement
traversefor lists: turnList[Maybe[A]]intoMaybe[List[A]].Create a
Validatedtype that accumulates all errors instead of short-circuiting.Implement a state monad for threading state through computations.
Advanced
Implement a parser combinator library using monads.
Design a free monad for database operations with multiple interpreters.
Create a monad transformer stack combining Reader, Either, and IO effects.
Summary
- Functors allow mapping over containers (
map) - Monads enable sequencing effectful computations (
flat_map) - Maybe/Option handles absence, Either/Result handles errors
- Algebraic data types model data with sum and product types
- Applicatives allow applying wrapped functions to wrapped values
- These patterns lead to composable, testable, type-safe code
Module Complete
This completes the Functional Programming module! You've learned core concepts for writing declarative, composable programs.