Advanced Functional Concepts

Introduction

Advanced functional programming introduces powerful abstractions for handling effects, errors, and composition. This reading covers functors, applicatives, monads, and algebraic data types - concepts that enable writing reliable, composable code.

Learning Objectives

By the end of this reading, you will be able to:

  • Understand and implement functors
  • Apply the monad pattern for sequencing computations
  • Use Option/Maybe for null safety
  • Handle errors functionally with Either/Result
  • Design algebraic data types

1. Functors

Concept

A functor is a container that can be mapped over. It implements map (or fmap) to apply a function to its contents while preserving the container structure.

from typing import TypeVar, Generic, Callable, List
from abc import ABC, abstractmethod

A = TypeVar('A')
B = TypeVar('B')

class Functor(Generic[A], ABC):
    """Base functor interface"""

    @abstractmethod
    def map(self, func: Callable[[A], B]) -> 'Functor[B]':
        """Apply function to contents, return new functor"""
        pass

# List is a functor
class FunctorList(Functor[A]):
    def __init__(self, items: List[A]):
        self._items = items

    def map(self, func: Callable[[A], B]) -> 'FunctorList[B]':
        return FunctorList([func(x) for x in self._items])

    def __repr__(self):
        return f"FunctorList({self._items})"

# Usage
fl = FunctorList([1, 2, 3])
fl2 = fl.map(lambda x: x * 2)  # FunctorList([2, 4, 6])
fl3 = fl2.map(str)  # FunctorList(['2', '4', '6'])

Functor Laws

A proper functor must satisfy two laws:

# Law 1: Identity
# map(id) == id
# Mapping identity function changes nothing

def identity(x):
    return x

fl = FunctorList([1, 2, 3])
assert fl.map(identity)._items == fl._items  # Law 1

# Law 2: Composition
# map(f . g) == map(f) . map(g)
# Mapping composed functions equals composing mapped functions

def double(x): return x * 2
def add_one(x): return x + 1

fl = FunctorList([1, 2, 3])

# Composed then mapped
composed = fl.map(lambda x: double(add_one(x)))

# Mapped then mapped
chained = fl.map(add_one).map(double)

assert composed._items == chained._items  # Law 2

Optional as Functor

from typing import Union, Optional as Opt

class Maybe(Generic[A]):
    """Maybe/Optional functor - represents possible absence of value"""

    def __init__(self, value: Opt[A], is_some: bool):
        self._value = value
        self._is_some = is_some

    @staticmethod
    def some(value: A) -> 'Maybe[A]':
        """Create a Maybe with a value"""
        return Maybe(value, True)

    @staticmethod
    def none() -> 'Maybe[A]':
        """Create an empty Maybe"""
        return Maybe(None, False)

    def map(self, func: Callable[[A], B]) -> 'Maybe[B]':
        """Apply function if value exists"""
        if self._is_some:
            return Maybe.some(func(self._value))
        return Maybe.none()

    def get_or_else(self, default: A) -> A:
        """Get value or return default"""
        return self._value if self._is_some else default

    def is_some(self) -> bool:
        return self._is_some

    def __repr__(self):
        if self._is_some:
            return f"Some({self._value})"
        return "None"

# Usage - safe chaining
def safe_divide(a: float, b: float) -> Maybe[float]:
    if b == 0:
        return Maybe.none()
    return Maybe.some(a / b)

result = (
    safe_divide(10, 2)
    .map(lambda x: x + 1)
    .map(lambda x: x * 2)
)
print(result)  # Some(12.0)

result2 = (
    safe_divide(10, 0)
    .map(lambda x: x + 1)
    .map(lambda x: x * 2)
)
print(result2)  # None

2. Monads

Concept

A monad is a functor that also implements flat_map (or bind/>>=). This allows chaining operations that themselves return containers.

from typing import TypeVar, Generic, Callable

A = TypeVar('A')
B = TypeVar('B')

class Monad(Generic[A]):
    """Base monad interface"""

    @staticmethod
    def pure(value: A) -> 'Monad[A]':
        """Wrap value in monad (also called 'return' or 'unit')"""
        raise NotImplementedError

    def flat_map(self, func: Callable[[A], 'Monad[B]']) -> 'Monad[B]':
        """Chain operations that return monads (also called 'bind' or '>>=')"""
        raise NotImplementedError

    def map(self, func: Callable[[A], B]) -> 'Monad[B]':
        """Derived from flat_map and pure"""
        return self.flat_map(lambda x: self.__class__.pure(func(x)))

Maybe Monad

class Maybe(Generic[A]):
    """Maybe monad for optional values"""

    def __init__(self, value: Opt[A], is_some: bool):
        self._value = value
        self._is_some = is_some

    @staticmethod
    def pure(value: A) -> 'Maybe[A]':
        return Maybe.some(value)

    @staticmethod
    def some(value: A) -> 'Maybe[A]':
        return Maybe(value, True)

    @staticmethod
    def none() -> 'Maybe[A]':
        return Maybe(None, False)

    def flat_map(self, func: Callable[[A], 'Maybe[B]']) -> 'Maybe[B]':
        """Chain operations that might fail"""
        if self._is_some:
            return func(self._value)
        return Maybe.none()

    def map(self, func: Callable[[A], B]) -> 'Maybe[B]':
        if self._is_some:
            return Maybe.some(func(self._value))
        return Maybe.none()

    def get_or_else(self, default: A) -> A:
        return self._value if self._is_some else default

    def __repr__(self):
        return f"Some({self._value})" if self._is_some else "None"

# Example: Safe dictionary lookup
def lookup(d: dict, key: str) -> Maybe[any]:
    if key in d:
        return Maybe.some(d[key])
    return Maybe.none()

data = {
    'user': {
        'address': {
            'city': 'New York'
        }
    }
}

# Safe nested access with flat_map
result = (
    lookup(data, 'user')
    .flat_map(lambda user: lookup(user, 'address'))
    .flat_map(lambda addr: lookup(addr, 'city'))
)
print(result)  # Some(New York)

# Missing key
result2 = (
    lookup(data, 'user')
    .flat_map(lambda user: lookup(user, 'phone'))  # Missing!
    .flat_map(lambda phone: lookup(phone, 'number'))
)
print(result2)  # None

Either/Result Monad

from typing import Union

class Either(Generic[A, B]):
    """Either monad - represents success (Right) or failure (Left)"""

    def __init__(self, value: Union[A, B], is_right: bool):
        self._value = value
        self._is_right = is_right

    @staticmethod
    def right(value: B) -> 'Either[A, B]':
        """Create success value"""
        return Either(value, True)

    @staticmethod
    def left(value: A) -> 'Either[A, B]':
        """Create failure value"""
        return Either(value, False)

    def map(self, func: Callable[[B], 'C']) -> 'Either[A, C]':
        """Apply function to right value"""
        if self._is_right:
            return Either.right(func(self._value))
        return Either.left(self._value)

    def flat_map(self, func: Callable[[B], 'Either[A, C]']) -> 'Either[A, C]':
        """Chain operations that might fail"""
        if self._is_right:
            return func(self._value)
        return Either.left(self._value)

    def map_left(self, func: Callable[[A], 'C']) -> 'Either[C, B]':
        """Transform the error"""
        if not self._is_right:
            return Either.left(func(self._value))
        return Either.right(self._value)

    def get_or_else(self, default: B) -> B:
        return self._value if self._is_right else default

    def fold(self, on_left: Callable[[A], 'C'], on_right: Callable[[B], 'C']) -> 'C':
        """Handle both cases"""
        if self._is_right:
            return on_right(self._value)
        return on_left(self._value)

    def __repr__(self):
        if self._is_right:
            return f"Right({self._value})"
        return f"Left({self._value})"

# Practical example: Validation
class ValidationError:
    def __init__(self, message: str):
        self.message = message

    def __repr__(self):
        return f"ValidationError({self.message})"

def parse_int(s: str) -> Either[ValidationError, int]:
    try:
        return Either.right(int(s))
    except ValueError:
        return Either.left(ValidationError(f"Cannot parse '{s}' as int"))

def validate_positive(n: int) -> Either[ValidationError, int]:
    if n > 0:
        return Either.right(n)
    return Either.left(ValidationError(f"{n} is not positive"))

def validate_less_than_100(n: int) -> Either[ValidationError, int]:
    if n < 100:
        return Either.right(n)
    return Either.left(ValidationError(f"{n} is not less than 100"))

# Chain validations
def validate_number(s: str) -> Either[ValidationError, int]:
    return (
        parse_int(s)
        .flat_map(validate_positive)
        .flat_map(validate_less_than_100)
    )

print(validate_number("42"))    # Right(42)
print(validate_number("-5"))    # Left(ValidationError(-5 is not positive))
print(validate_number("abc"))   # Left(ValidationError(Cannot parse 'abc' as int))
print(validate_number("150"))   # Left(ValidationError(150 is not less than 100))

Monad Laws

# A proper monad satisfies three laws:

# 1. Left Identity: pure(a).flat_map(f) == f(a)
a = 5
f = lambda x: Maybe.some(x * 2)
assert Maybe.pure(a).flat_map(f)._value == f(a)._value

# 2. Right Identity: m.flat_map(pure) == m
m = Maybe.some(5)
assert m.flat_map(Maybe.pure)._value == m._value

# 3. Associativity: m.flat_map(f).flat_map(g) == m.flat_map(lambda x: f(x).flat_map(g))
m = Maybe.some(5)
f = lambda x: Maybe.some(x * 2)
g = lambda x: Maybe.some(x + 1)

left_side = m.flat_map(f).flat_map(g)
right_side = m.flat_map(lambda x: f(x).flat_map(g))
assert left_side._value == right_side._value

3. List Monad

class ListM(Generic[A]):
    """List monad - represents multiple possible values"""

    def __init__(self, items: List[A]):
        self._items = items

    @staticmethod
    def pure(value: A) -> 'ListM[A]':
        return ListM([value])

    def flat_map(self, func: Callable[[A], 'ListM[B]']) -> 'ListM[B]':
        """Apply function to each element and flatten results"""
        results = []
        for item in self._items:
            results.extend(func(item)._items)
        return ListM(results)

    def map(self, func: Callable[[A], B]) -> 'ListM[B]':
        return ListM([func(x) for x in self._items])

    def filter(self, pred: Callable[[A], bool]) -> 'ListM[A]':
        return ListM([x for x in self._items if pred(x)])

    def __repr__(self):
        return f"ListM({self._items})"

# Example: Cartesian product with flat_map
colors = ListM(['red', 'green', 'blue'])
sizes = ListM(['S', 'M', 'L'])

combinations = colors.flat_map(
    lambda color: sizes.map(
        lambda size: f"{color}-{size}"
    )
)
print(combinations)
# ListM(['red-S', 'red-M', 'red-L', 'green-S', 'green-M', 'green-L', 'blue-S', 'blue-M', 'blue-L'])

# List comprehension for non-deterministic computation
def pythagorean_triples(limit: int) -> ListM[tuple]:
    """Find all Pythagorean triples up to limit"""
    return (
        ListM(range(1, limit + 1)).flat_map(lambda a:
            ListM(range(a, limit + 1)).flat_map(lambda b:
                ListM(range(b, limit + 1)).flat_map(lambda c:
                    ListM([(a, b, c)]) if a*a + b*b == c*c else ListM([])
                )
            )
        )
    )

print(pythagorean_triples(15))
# ListM([(3, 4, 5), (5, 12, 13), (6, 8, 10), (9, 12, 15)])

4. IO Monad (Conceptual)

from typing import Callable, TypeVar

A = TypeVar('A')
B = TypeVar('B')

class IO(Generic[A]):
    """
    IO monad - represents a computation with side effects.

    The computation is not executed until explicitly run.
    This separates description from execution.
    """

    def __init__(self, effect: Callable[[], A]):
        self._effect = effect

    @staticmethod
    def pure(value: A) -> 'IO[A]':
        """Wrap pure value in IO"""
        return IO(lambda: value)

    def map(self, func: Callable[[A], B]) -> 'IO[B]':
        """Transform the result"""
        return IO(lambda: func(self._effect()))

    def flat_map(self, func: Callable[[A], 'IO[B]']) -> 'IO[B]':
        """Chain IO operations"""
        return IO(lambda: func(self._effect())._effect())

    def run(self) -> A:
        """Execute the IO operation"""
        return self._effect()

# IO operations
def read_line() -> IO[str]:
    """Read a line from stdin"""
    return IO(lambda: input())

def print_line(message: str) -> IO[None]:
    """Print a line to stdout"""
    return IO(lambda: print(message))

def get_time() -> IO[float]:
    """Get current time"""
    import time
    return IO(lambda: time.time())

# Compose IO operations
def greet_user() -> IO[None]:
    """Program that greets user - pure description of side effects"""
    return (
        print_line("What is your name?")
        .flat_map(lambda _: read_line())
        .flat_map(lambda name: print_line(f"Hello, {name}!"))
    )

# Nothing happens until we run it
# greet_user().run()

# Another example: logging with timestamps
def log_with_time(message: str) -> IO[None]:
    return (
        get_time()
        .flat_map(lambda t: print_line(f"[{t}] {message}"))
    )

5. Algebraic Data Types

Sum Types (Tagged Unions)

from dataclasses import dataclass
from typing import Union, Generic, TypeVar, Callable
from enum import Enum, auto

# Simple enum-based sum type
class Color(Enum):
    RED = auto()
    GREEN = auto()
    BLUE = auto()

# More complex sum type using classes
@dataclass
class Circle:
    radius: float

@dataclass
class Rectangle:
    width: float
    height: float

@dataclass
class Triangle:
    base: float
    height: float

# Union type
Shape = Union[Circle, Rectangle, Triangle]

def area(shape: Shape) -> float:
    """Pattern match on shape type"""
    if isinstance(shape, Circle):
        return 3.14159 * shape.radius ** 2
    elif isinstance(shape, Rectangle):
        return shape.width * shape.height
    elif isinstance(shape, Triangle):
        return 0.5 * shape.base * shape.height
    raise TypeError(f"Unknown shape: {shape}")

# Using match (Python 3.10+)
def area_match(shape: Shape) -> float:
    match shape:
        case Circle(radius=r):
            return 3.14159 * r ** 2
        case Rectangle(width=w, height=h):
            return w * h
        case Triangle(base=b, height=h):
            return 0.5 * b * h

Product Types

from dataclasses import dataclass
from typing import NamedTuple

# Product types combine multiple values

# Using dataclass
@dataclass(frozen=True)
class Person:
    name: str
    age: int
    email: str

# Using NamedTuple
class Point(NamedTuple):
    x: float
    y: float

# Both are product types: Person = String × Int × String
# They represent the Cartesian product of their component types

person = Person("Alice", 30, "alice@example.com")
point = Point(3.0, 4.0)

Recursive Data Types

from typing import Union, Optional
from dataclasses import dataclass

# Linked list as algebraic data type
@dataclass
class Nil:
    """Empty list"""
    pass

@dataclass
class Cons(Generic[A]):
    """Non-empty list: head and tail"""
    head: A
    tail: 'LinkedList[A]'

LinkedList = Union[Nil, Cons[A]]

def list_sum(lst: LinkedList[int]) -> int:
    """Sum elements of linked list"""
    if isinstance(lst, Nil):
        return 0
    return lst.head + list_sum(lst.tail)

def list_map(func: Callable[[A], B], lst: LinkedList[A]) -> LinkedList[B]:
    """Map over linked list"""
    if isinstance(lst, Nil):
        return Nil()
    return Cons(func(lst.head), list_map(func, lst.tail))

# Example
my_list = Cons(1, Cons(2, Cons(3, Nil())))
print(list_sum(my_list))  # 6
doubled = list_map(lambda x: x * 2, my_list)
print(list_sum(doubled))  # 12

# Binary tree as algebraic data type
@dataclass
class Empty:
    pass

@dataclass
class Node(Generic[A]):
    value: A
    left: 'Tree[A]'
    right: 'Tree[A]'

Tree = Union[Empty, Node[A]]

def tree_sum(tree: Tree[int]) -> int:
    if isinstance(tree, Empty):
        return 0
    return tree.value + tree_sum(tree.left) + tree_sum(tree.right)

def tree_map(func: Callable[[A], B], tree: Tree[A]) -> Tree[B]:
    if isinstance(tree, Empty):
        return Empty()
    return Node(
        func(tree.value),
        tree_map(func, tree.left),
        tree_map(func, tree.right)
    )

6. Applicative Functors

from typing import TypeVar, Generic, Callable, List

A = TypeVar('A')
B = TypeVar('B')

class Applicative(Generic[A]):
    """
    Applicative functor - functor with ability to apply wrapped functions.

    Key operation: apply (or <*> in Haskell)
    Apply a wrapped function to a wrapped value.
    """

    @staticmethod
    def pure(value: A) -> 'Applicative[A]':
        """Wrap a value"""
        raise NotImplementedError

    def apply(self, func_wrapped: 'Applicative[Callable[[A], B]]') -> 'Applicative[B]':
        """Apply wrapped function to self"""
        raise NotImplementedError

class MaybeAp(Generic[A]):
    """Maybe as an applicative functor"""

    def __init__(self, value: Optional[A], is_some: bool):
        self._value = value
        self._is_some = is_some

    @staticmethod
    def pure(value: A) -> 'MaybeAp[A]':
        return MaybeAp(value, True)

    @staticmethod
    def some(value: A) -> 'MaybeAp[A]':
        return MaybeAp(value, True)

    @staticmethod
    def none() -> 'MaybeAp[A]':
        return MaybeAp(None, False)

    def map(self, func: Callable[[A], B]) -> 'MaybeAp[B]':
        if self._is_some:
            return MaybeAp.some(func(self._value))
        return MaybeAp.none()

    def apply(self, func_maybe: 'MaybeAp[Callable[[A], B]]') -> 'MaybeAp[B]':
        """Apply function in Maybe to value in Maybe"""
        if func_maybe._is_some and self._is_some:
            return MaybeAp.some(func_maybe._value(self._value))
        return MaybeAp.none()

    def __repr__(self):
        return f"Some({self._value})" if self._is_some else "None"

# Usage: Apply function to multiple Maybe values
def add(a: int) -> Callable[[int], int]:
    """Curried add"""
    return lambda b: a + b

# Lift add into Maybe context
maybe_a = MaybeAp.some(5)
maybe_b = MaybeAp.some(3)

# Apply curried function
result = maybe_b.apply(maybe_a.map(add))
print(result)  # Some(8)

# If either is None, result is None
maybe_c = MaybeAp.none()
result2 = maybe_c.apply(maybe_a.map(add))
print(result2)  # None

# Validation example: combine multiple validations
def lift_a2(
    func: Callable[[A, B], 'C'],
    fa: MaybeAp[A],
    fb: MaybeAp[B]
) -> MaybeAp['C']:
    """Lift a binary function into applicative context"""
    curried = lambda a: lambda b: func(a, b)
    return fb.apply(fa.map(curried))

def create_person(name: str, age: int) -> dict:
    return {'name': name, 'age': age}

valid_name = MaybeAp.some("Alice")
valid_age = MaybeAp.some(30)
invalid_age = MaybeAp.none()

person = lift_a2(create_person, valid_name, valid_age)
print(person)  # Some({'name': 'Alice', 'age': 30})

person2 = lift_a2(create_person, valid_name, invalid_age)
print(person2)  # None

7. Practical Patterns

Railway-Oriented Programming

from typing import Callable, TypeVar, List
from dataclasses import dataclass

A = TypeVar('A')
B = TypeVar('B')

@dataclass
class Success(Generic[A]):
    value: A

@dataclass
class Failure:
    errors: List[str]

Result = Union[Success[A], Failure]

def bind(result: Result[A], func: Callable[[A], Result[B]]) -> Result[B]:
    """Continue on success, short-circuit on failure"""
    if isinstance(result, Success):
        return func(result.value)
    return result

def map_result(result: Result[A], func: Callable[[A], B]) -> Result[B]:
    """Transform success value"""
    if isinstance(result, Success):
        return Success(func(result.value))
    return result

# Validation functions that return Result
def validate_email(email: str) -> Result[str]:
    if '@' in email:
        return Success(email)
    return Failure(["Invalid email format"])

def validate_age(age: int) -> Result[int]:
    if 0 <= age <= 150:
        return Success(age)
    return Failure(["Age must be between 0 and 150"])

def validate_name(name: str) -> Result[str]:
    if len(name) >= 2:
        return Success(name)
    return Failure(["Name must be at least 2 characters"])

# Railway: chain validations
def validate_user(name: str, age: int, email: str) -> Result[dict]:
    return bind(
        validate_name(name),
        lambda valid_name: bind(
            validate_age(age),
            lambda valid_age: bind(
                validate_email(email),
                lambda valid_email: Success({
                    'name': valid_name,
                    'age': valid_age,
                    'email': valid_email
                })
            )
        )
    )

print(validate_user("Alice", 30, "alice@example.com"))
# Success(value={'name': 'Alice', 'age': 30, 'email': 'alice@example.com'})

print(validate_user("A", 30, "invalid"))
# Failure(errors=['Name must be at least 2 characters'])

Free Monad Pattern (Simplified)

from typing import Generic, TypeVar, Callable, Union
from dataclasses import dataclass
from abc import ABC, abstractmethod

A = TypeVar('A')

# Define operations as data
@dataclass
class ReadFile:
    path: str
    next: Callable[[str], 'Free']

@dataclass
class WriteFile:
    path: str
    content: str
    next: Callable[[], 'Free']

@dataclass
class Pure(Generic[A]):
    value: A

Free = Union[ReadFile, WriteFile, Pure[A]]

# Build program as data structure
def read_file(path: str) -> Free:
    return ReadFile(path, lambda content: Pure(content))

def write_file(path: str, content: str) -> Free:
    return WriteFile(path, content, lambda: Pure(None))

# Interpreter that actually executes
def run_io(program: Free):
    """Execute the program"""
    if isinstance(program, Pure):
        return program.value
    elif isinstance(program, ReadFile):
        content = open(program.path).read()  # Actual IO
        return run_io(program.next(content))
    elif isinstance(program, WriteFile):
        open(program.path, 'w').write(program.content)  # Actual IO
        return run_io(program.next())

# Test interpreter (no actual IO)
def run_test(program: Free, mock_files: dict):
    """Execute program with mock files"""
    if isinstance(program, Pure):
        return program.value
    elif isinstance(program, ReadFile):
        content = mock_files.get(program.path, "")
        return run_test(program.next(content), mock_files)
    elif isinstance(program, WriteFile):
        mock_files[program.path] = program.content
        return run_test(program.next(), mock_files)

Exercises

Basic

  1. Implement a map function for the Either monad that transforms the Right value.

  2. Create a safe_head function that returns Maybe[A] instead of raising an exception.

  3. Define a binary tree ADT and implement tree_fold (catamorphism).

Intermediate

  1. Implement traverse for lists: turn List[Maybe[A]] into Maybe[List[A]].

  2. Create a Validated type that accumulates all errors instead of short-circuiting.

  3. Implement a state monad for threading state through computations.

Advanced

  1. Implement a parser combinator library using monads.

  2. Design a free monad for database operations with multiple interpreters.

  3. Create a monad transformer stack combining Reader, Either, and IO effects.


Summary

  • Functors allow mapping over containers (map)
  • Monads enable sequencing effectful computations (flat_map)
  • Maybe/Option handles absence, Either/Result handles errors
  • Algebraic data types model data with sum and product types
  • Applicatives allow applying wrapped functions to wrapped values
  • These patterns lead to composable, testable, type-safe code

Module Complete

This completes the Functional Programming module! You've learned core concepts for writing declarative, composable programs.

← Previous: Higher-Order Functions | Back to Course Index