Concurrent Patterns

Introduction

Concurrent programming patterns help structure parallel code for correctness, efficiency, and maintainability. This reading covers established patterns for common parallel computing problems.

Learning Objectives

By the end of this reading, you will be able to:

  • Implement producer-consumer patterns
  • Use thread pools and executors
  • Apply the actor model for concurrency
  • Implement futures and promises
  • Design work-stealing schedulers

1. Producer-Consumer Pattern

Basic Implementation

import threading
import queue
import time
import random

class ProducerConsumer:
    """Classic producer-consumer with bounded buffer"""

    def __init__(self, buffer_size=10):
        # queue.Queue is internally synchronized; maxsize bounds the buffer
        # so put() blocks producers once consumers fall buffer_size behind.
        self.buffer = queue.Queue(maxsize=buffer_size)
        # Cooperative stop signal polled by both producers and consumers.
        self.shutdown = threading.Event()

    def producer(self, producer_id, num_items):
        """Produce items and put in buffer"""
        for i in range(num_items):
            # Bail out early if run() has signalled shutdown.
            if self.shutdown.is_set():
                break

            item = f"item-{producer_id}-{i}"
            self.buffer.put(item)  # Blocks if full
            print(f"Producer {producer_id}: produced {item}")
            # Simulated variable production time.
            time.sleep(random.uniform(0.01, 0.1))

        print(f"Producer {producer_id}: done")

    def consumer(self, consumer_id):
        """Consume items from buffer"""
        # The 1-second get() timeout guarantees the shutdown flag is
        # re-checked at least once per second even when the buffer is empty.
        while not self.shutdown.is_set():
            try:
                item = self.buffer.get(timeout=1)
                print(f"Consumer {consumer_id}: consumed {item}")
                # task_done() pairs with buffer.join() in run().
                self.buffer.task_done()
                # Simulated processing time.
                time.sleep(random.uniform(0.05, 0.15))
            except queue.Empty:
                continue

        print(f"Consumer {consumer_id}: shutting down")

    def run(self, num_producers=2, num_consumers=3, items_per_producer=10):
        """Run the producer-consumer system"""
        threads = []

        # Start consumers first so the bounded buffer drains from the start.
        for i in range(num_consumers):
            t = threading.Thread(target=self.consumer, args=(i,))
            t.start()
            threads.append(t)

        # Start producers
        producer_threads = []
        for i in range(num_producers):
            t = threading.Thread(
                target=self.producer,
                args=(i, items_per_producer)
            )
            t.start()
            producer_threads.append(t)

        # Wait until every producer has emitted all of its items.
        for t in producer_threads:
            t.join()

        # Wait for the buffer to empty: join() returns once every put item
        # has had a matching task_done() from a consumer.
        self.buffer.join()

        # Signal shutdown; consumers notice within their 1-second timeout.
        self.shutdown.set()
        for t in threads:
            t.join()

Multiple Queues (Pipeline)

class Pipeline:
    """Multi-stage processing pipeline"""

    def __init__(self):
        # One feed queue per stage; items flow stage1 -> stage2 -> stage3.
        self.stage1_queue = queue.Queue()
        self.stage2_queue = queue.Queue()
        self.stage3_queue = queue.Queue()
        # Final outputs; appended only by the single stage-3 worker thread.
        self.results = []
        self.shutdown = threading.Event()

    def stage1_worker(self):
        """First stage: Parse input"""
        # Timed get() so the shutdown flag is re-checked once per second
        # even when no input is arriving.
        while not self.shutdown.is_set():
            try:
                raw_data = self.stage1_queue.get(timeout=1)
                parsed = f"parsed({raw_data})"
                # Forward downstream BEFORE task_done(); this ordering makes
                # the sequential queue join()s in process() race-free.
                self.stage2_queue.put(parsed)
                self.stage1_queue.task_done()
            except queue.Empty:
                continue

    def stage2_worker(self):
        """Second stage: Transform data"""
        while not self.shutdown.is_set():
            try:
                parsed = self.stage2_queue.get(timeout=1)
                transformed = f"transformed({parsed})"
                # Same put-before-task_done ordering as stage 1.
                self.stage3_queue.put(transformed)
                self.stage2_queue.task_done()
            except queue.Empty:
                continue

    def stage3_worker(self):
        """Third stage: Store results"""
        while not self.shutdown.is_set():
            try:
                transformed = self.stage3_queue.get(timeout=1)
                result = f"stored({transformed})"
                self.results.append(result)
                self.stage3_queue.task_done()
            except queue.Empty:
                continue

    def process(self, items):
        """Process items through pipeline.

        Feeds every item into stage 1, waits for all three stages to drain,
        then stops the workers and returns the collected results.
        """
        # Start workers
        workers = [
            threading.Thread(target=self.stage1_worker),
            threading.Thread(target=self.stage2_worker),
            threading.Thread(target=self.stage3_worker)
        ]
        for w in workers:
            w.start()

        # Feed input
        for item in items:
            self.stage1_queue.put(item)

        # Wait for completion.  Joining in stage order is safe because each
        # worker puts into the next queue before calling task_done(), so by
        # the time stage N's join() returns, stage N+1 already holds its items.
        self.stage1_queue.join()
        self.stage2_queue.join()
        self.stage3_queue.join()

        # Shutdown
        self.shutdown.set()
        for w in workers:
            w.join()

        return self.results

2. Thread Pool Pattern

Basic Thread Pool

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import Callable, List, Any

class SimpleThreadPool:
    """A fixed-size pool of daemon threads serving a shared task queue.

    submit() returns a minimal future: a dict carrying 'result', 'error',
    and a 'done' threading.Event that is set when the task finishes.
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.task_queue = queue.Queue()
        self.workers = []
        self.shutdown_flag = threading.Event()
        self._start_workers()

    def _start_workers(self):
        """Spawn the daemon worker threads."""
        for idx in range(self.num_workers):
            thread = threading.Thread(target=self._worker, name=f"Worker-{idx}")
            thread.daemon = True
            thread.start()
            self.workers.append(thread)

    def _worker(self):
        """Serve tasks until the shutdown flag is raised."""
        while not self.shutdown_flag.is_set():
            # Timed get so the shutdown flag is re-checked once per second.
            try:
                fn, fn_args, fn_kwargs, holder = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                holder['result'] = fn(*fn_args, **fn_kwargs)
            except Exception as exc:
                holder['error'] = exc
            finally:
                # Always wake any waiter, even when the task failed.
                holder['done'].set()
                self.task_queue.task_done()

    def submit(self, task: Callable, *args, **kwargs) -> dict:
        """Queue a task; returns the future-like dict described above."""
        holder = {
            'result': None,
            'error': None,
            'done': threading.Event()
        }
        self.task_queue.put((task, args, kwargs, holder))
        return holder

    def map(self, func: Callable, items: List) -> List:
        """Apply func to every item in parallel, preserving input order.

        Re-raises the first stored task exception, if any.
        """
        pending = [self.submit(func, item) for item in items]

        collected = []
        for holder in pending:
            holder['done'].wait()
            if holder['error']:
                raise holder['error']
            collected.append(holder['result'])
        return collected

    def shutdown(self, wait=True):
        """Raise the shutdown flag and optionally join the workers."""
        self.shutdown_flag.set()
        if wait:
            for thread in self.workers:
                thread.join()

# Using Python's built-in ThreadPoolExecutor
def executor_example():
    """Standard library thread pool"""

    def process_item(item):
        # Simulate a little work per item.
        time.sleep(0.1)
        return item * 2

    items = [*range(20)]

    # Method 1: map (preserves order)
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_item, items))
        print(f"Map results: {results}")

    # Method 2: submit (manual future handling)
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_item, item) for item in items]

        # as_completed - process results as they finish
        for future in as_completed(futures):
            result = future.result()
            print(f"Completed: {result}")

Work Stealing

import random
from collections import deque
from typing import Optional, Callable

class WorkStealingPool:
    """Thread pool with work stealing for load balancing.

    Each worker owns a deque and pops from its own back (LIFO, for cache
    locality); an idle worker steals the oldest task from the front of
    another worker's deque (FIFO, for fairness).  CPython's deque
    append/pop/popleft operations are atomic, so the queues themselves
    need no explicit locking.

    Fix over the previous version: `_worker` bound a `my_queue` local it
    never used (dead code, removed).
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        # Each worker has its own deque.
        self.queues = [deque() for _ in range(num_workers)]
        self.workers = []
        self.shutdown_flag = threading.Event()
        # Kept for backward compatibility; the atomic deque operations above
        # do not currently need it.
        self.lock = threading.Lock()
        self._start_workers()

    def _start_workers(self):
        """Spawn one daemon worker thread per queue."""
        for i in range(self.num_workers):
            t = threading.Thread(
                target=self._worker,
                args=(i,),
                name=f"Worker-{i}"
            )
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self, worker_id: int):
        """Run tasks from own queue, stealing from others when idle."""
        while not self.shutdown_flag.is_set():
            task = self._get_task(worker_id)
            if task:
                try:
                    task()
                except Exception as e:
                    # A failing task must not kill the worker thread.
                    print(f"Worker {worker_id} error: {e}")
            else:
                time.sleep(0.001)  # Brief sleep if no work

    def _get_task(self, worker_id: int) -> Optional[Callable]:
        """Get task from own queue or steal from others.

        Returns None when every queue is empty.
        """
        my_queue = self.queues[worker_id]

        # Try own queue first (LIFO for locality)
        try:
            return my_queue.pop()
        except IndexError:
            pass

        # Try to steal from another worker (FIFO for fairness)
        for i in range(self.num_workers):
            if i == worker_id:
                continue
            try:
                return self.queues[i].popleft()  # Steal from front
            except IndexError:
                continue

        return None

    def submit(self, task: Callable):
        """Submit task to a randomly chosen worker's queue."""
        worker_id = random.randint(0, self.num_workers - 1)
        self.queues[worker_id].append(task)

    def submit_to(self, worker_id: int, task: Callable):
        """Submit task to specific worker (for locality)"""
        self.queues[worker_id].append(task)

3. Futures and Promises

Future Implementation

from typing import TypeVar, Generic, Optional, Callable
import threading

T = TypeVar('T')

class Future(Generic[T]):
    """Represents a value (or error) that becomes available later.

    Thread-safe: outcome state is written under a lock, and registered
    callbacks are always invoked OUTSIDE the lock with their exceptions
    swallowed.  The previous version ran an already-done callback from
    add_done_callback while still holding the (non-reentrant) lock — a
    callback touching the future again would deadlock — and let that
    callback's exceptions escape, inconsistent with the
    set_result/set_exception path.
    """

    def __init__(self):
        self._result: Optional[T] = None
        self._exception: Optional[Exception] = None
        self._done = threading.Event()
        self._callbacks: List[Callable] = []
        self._lock = threading.Lock()

    def _invoke(self, callback: Callable):
        """Run one callback, swallowing its exceptions; never holds the lock."""
        try:
            callback(self)
        except Exception:
            pass

    def _complete(self, result: Optional[T], exc: Optional[Exception]):
        """Record the outcome under the lock, then fire callbacks outside it."""
        with self._lock:
            self._result = result
            self._exception = exc
            self._done.set()
            callbacks = self._callbacks.copy()

        for callback in callbacks:
            self._invoke(callback)

    def set_result(self, result: T):
        """Set the result (called by executor)"""
        self._complete(result, None)

    def set_exception(self, exc: Exception):
        """Set an exception (called by executor)"""
        self._complete(None, exc)

    def result(self, timeout: Optional[float] = None) -> T:
        """Get the result, blocking if necessary.

        Raises TimeoutError when not done within timeout, or re-raises
        the stored exception.
        """
        if not self._done.wait(timeout):
            raise TimeoutError("Future timed out")

        if self._exception:
            raise self._exception
        return self._result

    def done(self) -> bool:
        """Check if future is done"""
        return self._done.is_set()

    def add_done_callback(self, callback: Callable):
        """Add callback to run when done.

        If the future is already done the callback runs immediately — but
        outside the lock and with exceptions swallowed, matching the
        deferred path.
        """
        run_now = False
        with self._lock:
            if self._done.is_set():
                run_now = True
            else:
                self._callbacks.append(callback)
        if run_now:
            self._invoke(callback)

class Promise(Generic[T]):
    """Producer side of a Future: resolves or rejects the paired future."""

    def __init__(self):
        # The single Future handed to consumers via get_future().
        self.future = Future[T]()

    def set_result(self, result: T):
        """Resolve the paired future with *result*."""
        self.future.set_result(result)

    def set_exception(self, exc: Exception):
        """Reject the paired future with *exc*."""
        self.future.set_exception(exc)

    def get_future(self) -> Future[T]:
        """Return the consumer-side Future."""
        return self.future

# Usage example
def async_computation():
    """Demonstrate future/promise"""
    promise = Promise[int]()
    future = promise.get_future()

    def compute():
        # Pretend to do slow work, then resolve the promise.
        time.sleep(1)
        promise.set_result(42)

    worker = threading.Thread(target=compute)
    worker.start()

    # Add callback
    future.add_done_callback(lambda f: print(f"Callback: {f.result()}"))

    # Can do other work here...
    print("Waiting for result...")

    # Block for result
    result = future.result()
    print(f"Got result: {result}")

    worker.join()

Chaining Futures

class ChainableFuture(Future[T]):
    """Future with then/catch chaining"""

    def then(self, fn: Callable[[T], 'U']) -> 'ChainableFuture[U]':
        """Return a new future resolving to fn(result), or carrying the error."""
        chained = ChainableFuture()

        def forward(done_future):
            # Propagate either the transformed value or the failure.
            try:
                chained.set_result(fn(done_future.result()))
            except Exception as exc:
                chained.set_exception(exc)

        self.add_done_callback(forward)
        return chained

    def catch(self, fn: Callable[[Exception], T]) -> 'ChainableFuture[T]':
        """Return a new future that recovers from errors via fn."""
        chained = ChainableFuture()

        def forward(done_future):
            try:
                value = done_future.result()
            except Exception as exc:
                # Attempt recovery; a failing handler rejects the chain.
                try:
                    chained.set_result(fn(exc))
                except Exception as handler_exc:
                    chained.set_exception(handler_exc)
            else:
                chained.set_result(value)

        self.add_done_callback(forward)
        return chained

def future_chaining_example():
    """Chain operations on futures"""

    def fetch_user(user_id):
        promise = Promise()

        def work():
            time.sleep(0.5)
            promise.set_result({"id": user_id, "name": "Alice"})

        threading.Thread(target=work).start()

        # Bridge the plain Future onto a ChainableFuture.
        chainable = ChainableFuture()

        def bridge(f):
            if f._exception:
                chainable.set_exception(f._exception)
            else:
                chainable.set_result(f.result())

        promise.future.add_done_callback(bridge)
        return chainable

    # Chain transformations
    result = (
        fetch_user(123)
        .then(lambda user: user['name'])
        .then(lambda name: name.upper())
        .catch(lambda e: "UNKNOWN")
    )

    print(result.result())  # "ALICE"

4. Actor Model

Basic Actor Implementation

import queue
import threading
from typing import Any, Dict
from abc import ABC, abstractmethod

class Message:
    """Envelope passed between actors: a payload plus an optional sender."""

    def __init__(self, content: Any, sender: 'Actor' = None):
        # Arbitrary payload.
        self.content = content
        # Originating actor, if any; lets the receiver reply.
        self.sender = sender

class Actor(ABC):
    """Base actor: a thread draining a private mailbox, one message at a time."""

    def __init__(self, name: str):
        self.name = name
        self.mailbox = queue.Queue()
        self._running = True
        # Daemon thread so a forgotten actor cannot block interpreter exit.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        """Deliver mailbox messages to receive() until stopped."""
        # Timed get so the _running flag is re-checked once per second.
        while self._running:
            try:
                message = self.mailbox.get(timeout=1)
            except queue.Empty:
                continue
            self.receive(message)

    @abstractmethod
    def receive(self, message: Message):
        """Handle incoming message - implement in subclass"""
        pass

    def send(self, target: 'Actor', content: Any):
        """Wrap content in a Message (from self) and mail it to target."""
        target.mailbox.put(Message(content, sender=self))

    def stop(self):
        """Stop the actor"""
        self._running = False
        self._thread.join()

# Example actors
class PrinterActor(Actor):
    """Actor that prints every message and ACKs back to the sender."""

    def receive(self, message: Message):
        print(f"[{self.name}] Received: {message.content}")
        if message.sender:
            # Reply to sender
            self.send(message.sender, f"ACK: {message.content}")

class CounterActor(Actor):
    """Actor maintaining an integer count driven by command messages."""

    def __init__(self, name: str):
        super().__init__(name)
        self.count = 0

    def receive(self, message: Message):
        command = message.content
        if command == "increment":
            self.count += 1
        elif command == "decrement":
            self.count -= 1
        elif command == "get" and message.sender:
            # Reply with the current count.
            self.send(message.sender, self.count)

class CollectorActor(Actor):
    """Actor that accumulates messages and signals once enough arrived."""

    def __init__(self, name: str, expected: int):
        super().__init__(name)
        self.results = []
        self.expected = expected
        # Set once len(results) reaches expected; callers can block on it.
        self.done = threading.Event()

    def receive(self, message: Message):
        self.results.append(message.content)
        if len(self.results) >= self.expected:
            self.done.set()

def actor_example():
    """Demonstrate actor model"""
    printer = PrinterActor("Printer")
    counter = CounterActor("Counter")
    collector = CollectorActor("Collector", 1)

    # Send messages
    for _ in range(3):
        collector.send(counter, "increment")
    counter.send(printer, "Working...")

    # Get count
    collector.send(counter, "get")
    collector.done.wait(timeout=2)
    print(f"Final count: {collector.results}")

    # Cleanup
    for actor in (printer, counter, collector):
        actor.stop()

Actor System

class ActorSystem:
    """Registry that creates, looks up, routes to, and stops actors by name."""

    def __init__(self):
        self.actors: Dict[str, Actor] = {}
        # Guards the registry against concurrent create_actor calls.
        self.lock = threading.Lock()

    def create_actor(self, actor_class, name: str, *args, **kwargs) -> Actor:
        """Instantiate actor_class under a unique name and register it.

        Raises ValueError if the name is already taken.
        """
        with self.lock:
            if name in self.actors:
                raise ValueError(f"Actor {name} already exists")
            instance = actor_class(name, *args, **kwargs)
            self.actors[name] = instance
            return instance

    def get_actor(self, name: str) -> Actor:
        """Return the actor registered under name, or None."""
        return self.actors.get(name)

    def send(self, target_name: str, content: Any, sender: Actor = None):
        """Deliver content to the named actor; silently drop if unknown."""
        recipient = self.actors.get(target_name)
        if recipient is not None:
            recipient.mailbox.put(Message(content, sender))

    def shutdown(self):
        """Stop all actors"""
        for actor in self.actors.values():
            actor.stop()
        self.actors.clear()

5. Parallel Patterns

Map-Reduce

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import reduce
from typing import List, Callable, TypeVar, Iterable
import itertools

T = TypeVar('T')
U = TypeVar('U')

class MapReduce:
    """Parallel map-reduce implementation"""

    def __init__(self, num_workers: int = 4, use_processes: bool = True):
        self.num_workers = num_workers
        # Processes give true CPU parallelism; threads suffice for I/O-bound work.
        self.executor_class = ProcessPoolExecutor if use_processes else ThreadPoolExecutor

    def map_reduce(
        self,
        data: List[T],
        mapper: Callable[[T], List[tuple]],
        reducer: Callable[[str, List], U]
    ) -> Dict[str, U]:
        """
        Execute map-reduce.

        mapper: T -> [(key, value), ...]
        reducer: (key, [values]) -> result
        """
        # Map phase (parallel): one mapper call per input element.
        with self.executor_class(max_workers=self.num_workers) as executor:
            mapped = list(executor.map(mapper, data))

        # Shuffle: flatten the per-input pair lists and group values by key.
        grouped = {}
        for key, value in itertools.chain.from_iterable(mapped):
            grouped.setdefault(key, []).append(value)

        # Reduce phase (parallel): one reducer call per distinct key.
        def reduce_item(item):
            key, values = item
            return key, reducer(key, values)

        with self.executor_class(max_workers=self.num_workers) as executor:
            return dict(executor.map(reduce_item, grouped.items()))

# Example: Word count
def word_count_mapper(document: str) -> List[tuple]:
    """Emit a (word, 1) pair for every whitespace-separated word."""
    return [(token, 1) for token in document.lower().split()]

def word_count_reducer(word: str, counts: List[int]) -> int:
    """Total the per-occurrence counts collected for one word."""
    return sum(counts)

def word_count_example():
    """Count words across a few tiny documents with map-reduce."""
    documents = [
        "hello world hello",
        "world is beautiful",
        "hello beautiful world",
        "python is beautiful"
    ]

    # Threads are plenty here; the documents are tiny.
    mr = MapReduce(num_workers=2, use_processes=False)
    counts = mr.map_reduce(documents, word_count_mapper, word_count_reducer)
    print(counts)
    # {'hello': 3, 'world': 3, 'is': 2, 'beautiful': 3, 'python': 1}

Scatter-Gather

class ScatterGather:
    """Scatter work to multiple workers, gather results"""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers

    def execute(
        self,
        data: List[T],
        worker_fn: Callable[[List[T]], U],
        gather_fn: Callable[[List[U]], Any]
    ) -> Any:
        """
        Scatter data to workers, gather and combine results.

        Each worker receives one contiguous chunk of data; gather_fn
        combines the per-chunk results (it receives [] when data is empty).
        """
        # Guard: with no data, chunk_size would be 0 and range(0, 0, 0)
        # raises ValueError.  Let gather_fn decide what "empty" means.
        if not data:
            return gather_fn([])

        # Scatter: ceil-divide data into at most num_workers chunks.
        chunk_size = (len(data) + self.num_workers - 1) // self.num_workers
        chunks = [
            data[i:i + chunk_size]
            for i in range(0, len(data), chunk_size)
        ]

        # Process in parallel
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            results = list(executor.map(worker_fn, chunks))

        # Gather: Combine results
        return gather_fn(results)

# Example: Parallel sum
def parallel_sum(numbers: List[int]) -> int:
    """Sum numbers by scattering chunks across four workers."""
    sg = ScatterGather(num_workers=4)

    # Both the per-chunk work and the final combine are plain sums.
    def sum_chunk(chunk):
        return sum(chunk)

    def combine_sums(partial_sums):
        return sum(partial_sums)

    return sg.execute(numbers, sum_chunk, combine_sums)

Parallel Divide and Conquer

class ParallelDivideConquer:
    """Parallel divide and conquer framework.

    Fix over the previous version: recursive calls were submitted to the
    shared bounded pool at EVERY level while parents blocked in f.result().
    Once all workers held a blocked parent task, the queued child tasks
    could never be scheduled — a classic thread-pool self-deadlock (e.g.
    a binary split deeper than two levels with max_workers=4).

    Parallel submission is now rationed by a worker "budget" passed down
    the recursion: a node submits its subproblems only while the budget
    can grant each one a worker; otherwise its subtree recurses
    synchronously.  Blocked parents therefore always number fewer than
    max_workers, leaving at least one worker free to make progress.
    """

    def __init__(self, threshold: int = 1000, max_workers: int = 4):
        # threshold is kept for API compatibility; callers express the
        # base case through is_base_case.
        self.threshold = threshold
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self._max_workers = max_workers

    def solve(
        self,
        problem: T,
        is_base_case: Callable[[T], bool],
        solve_base: Callable[[T], U],
        divide: Callable[[T], List[T]],
        combine: Callable[[List[U]], U],
        _budget: Optional[int] = None
    ) -> U:
        """
        Solve problem using parallel divide and conquer.

        _budget is the internal worker quota for this subtree; callers
        should not pass it.
        """
        if _budget is None:
            _budget = self._max_workers

        if is_base_case(problem):
            return solve_base(problem)

        # Divide
        subproblems = divide(problem)

        # Conquer: go parallel only while every subproblem can be granted
        # a worker.  Single-child splits (len < 2) must stay synchronous,
        # or a chain of blocked parents could still exhaust the pool.
        if 2 <= len(subproblems) <= _budget:
            child_budget = _budget // len(subproblems)
            futures = [
                self.executor.submit(
                    self.solve, sub, is_base_case, solve_base,
                    divide, combine, child_budget
                )
                for sub in subproblems
            ]
            results = [f.result() for f in futures]
        else:
            # Out of budget: recurse in the current thread (budget 0 keeps
            # the whole subtree synchronous).
            results = [
                self.solve(sub, is_base_case, solve_base, divide, combine, 0)
                for sub in subproblems
            ]

        # Combine
        return combine(results)

# Example: Parallel merge sort
def parallel_merge_sort(arr: List[int]) -> List[int]:
    """Merge sort where the two recursive halves are solved in parallel."""
    pdc = ParallelDivideConquer(threshold=100, max_workers=4)

    def is_base(a):
        # Small slices are sorted directly.
        return len(a) <= 100

    def solve_base(a):
        return sorted(a)

    def divide(a):
        mid = len(a) // 2
        return [a[:mid], a[mid:]]

    def combine(sorted_halves):
        # Degenerate split (0 or 1 halves) needs no merging.
        if len(sorted_halves) != 2:
            return sorted_halves[0] if sorted_halves else []

        left, right = sorted_halves
        merged = []
        i = j = 0
        # Standard two-pointer merge of the sorted halves.
        while i < len(left) and j < len(right):
            if left[i] <= right[j]:
                merged.append(left[i])
                i += 1
            else:
                merged.append(right[j])
                j += 1
        merged.extend(left[i:])
        merged.extend(right[j:])
        return merged

    return pdc.solve(arr, is_base, solve_base, divide, combine)

6. Synchronization Patterns

Double-Checked Locking

class Singleton:
    """Thread-safe singleton using double-checked locking"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Fast path: already created, skip the lock entirely.
        if cls._instance is not None:
            return cls._instance
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between our unlocked check and acquiring the lock.
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

# Python-specific: Use __new__ or module-level
# Double-checked locking is tricky to get right!

Read-Copy-Update (RCU)

class RCUData:
    """Read-Copy-Update pattern for read-heavy workloads.

    Readers take the current dict reference without locking; writers build
    a modified copy under a lock and swap the reference in one assignment.
    """

    def __init__(self, initial_data: dict):
        self._data = initial_data.copy()
        self._lock = threading.Lock()

    def read(self) -> dict:
        """
        Return the current snapshot (lock-free reference read).

        The returned dict is never mutated in place by writers, so the
        reference stays internally consistent; treat it as read-only.
        """
        return self._data

    def update(self, key: str, value: Any):
        """Publish a new snapshot with key set to value."""
        with self._lock:
            replacement = dict(self._data)   # copy
            replacement[key] = value         # modify
            self._data = replacement         # atomic reference swap

    def bulk_update(self, updates: dict):
        """Publish a new snapshot with all of updates applied."""
        with self._lock:
            replacement = dict(self._data)
            replacement.update(updates)
            self._data = replacement

Compare-and-Swap Pattern

import threading

class CASValue:
    """Compare-and-swap cell (simulated with a lock)."""

    def __init__(self, initial: Any):
        self._value = initial
        self._lock = threading.Lock()

    def get(self) -> Any:
        """Return the current value."""
        return self._value

    def compare_and_swap(self, expected: Any, new_value: Any) -> bool:
        """
        Atomically: if value == expected, set to new_value.
        Returns whether the swap occurred.
        """
        with self._lock:
            if self._value != expected:
                return False
            self._value = new_value
            return True

class LockFreeCounter:
    """Counter built on optimistic CAS retry loops."""

    def __init__(self):
        self._value = CASValue(0)

    def increment(self) -> int:
        """Increment and return new value"""
        # Optimistic loop: re-read and retry until our CAS wins.
        while True:
            snapshot = self._value.get()
            if self._value.compare_and_swap(snapshot, snapshot + 1):
                return snapshot + 1

    def get(self) -> int:
        """Return the current count."""
        return self._value.get()

Exercises

Basic

  1. Implement a producer-consumer with multiple producers and consumers.

  2. Create a thread pool that can be resized at runtime.

  3. Implement a simple future with timeout support.

Intermediate

  1. Build an actor system with supervision (restart failed actors).

  2. Implement a parallel map-reduce for computing average.

  3. Create a work-stealing scheduler and compare to static partitioning.

Advanced

  1. Implement a concurrent cache with read-copy-update semantics.

  2. Build a reactive streams processor with backpressure.

  3. Design a distributed actor system across multiple processes.


Summary

  • Producer-consumer decouples data generation from processing
  • Thread pools amortize thread creation cost
  • Work stealing balances load dynamically
  • Futures represent pending computations
  • Actors encapsulate state and communicate via messages
  • Map-reduce and scatter-gather parallelize data processing

Next Reading

GPU and Parallel Hardware →