Concurrent Patterns

Concurrent programming patterns help structure parallel code for correctness, efficiency, and maintainability.

1. Producer-Consumer Pattern

Basic Implementation

import threading
import queue
import time
import random

class ProducerConsumer:
    """Classic producer-consumer with a bounded buffer.

    Producers push string items into a bounded ``queue.Queue`` and consumers
    pull them out. ``shutdown`` is an event that tells both sides to stop.

    Args:
        buffer_size: Maximum number of items the buffer holds.
        poll_timeout: Seconds a blocked ``get``/``put`` waits before
            re-checking ``shutdown``. Smaller values make shutdown more
            responsive at the cost of more wake-ups.
    """

    def __init__(self, buffer_size=10, poll_timeout=1.0):
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.shutdown = threading.Event()
        self.poll_timeout = poll_timeout

    def _put_unless_shutdown(self, item):
        """Put ``item`` in the buffer; return False if shutdown came first."""
        while not self.shutdown.is_set():
            try:
                # A timed put lets a producer that is stuck on a full buffer
                # notice shutdown instead of blocking forever.
                self.buffer.put(item, timeout=self.poll_timeout)
            except queue.Full:
                continue
            return True
        return False

    def producer(self, producer_id, num_items):
        """Produce up to ``num_items`` items, stopping early on shutdown."""
        for i in range(num_items):
            item = f"item-{producer_id}-{i}"
            if not self._put_unless_shutdown(item):
                break
            print(f"Producer {producer_id}: produced {item}")
            time.sleep(random.uniform(0.01, 0.1))

        print(f"Producer {producer_id}: done")

    def consumer(self, consumer_id):
        """Consume items from the buffer until shutdown is signalled."""
        while not self.shutdown.is_set():
            try:
                item = self.buffer.get(timeout=self.poll_timeout)
            except queue.Empty:
                continue
            print(f"Consumer {consumer_id}: consumed {item}")
            self.buffer.task_done()
            time.sleep(random.uniform(0.05, 0.15))

        print(f"Consumer {consumer_id}: shutting down")

    def run(self, num_producers=2, num_consumers=3, items_per_producer=10):
        """Run the system until every produced item has been consumed.

        Blocks until all producers finish, the buffer drains, and all
        consumers have exited. May be called more than once.
        """
        # A previous run leaves the event set; without clearing it the new
        # consumers would exit immediately and buffer.join() would hang.
        self.shutdown.clear()

        # Start consumers first so items are drained as soon as they appear.
        consumer_threads = [
            threading.Thread(target=self.consumer, args=(i,))
            for i in range(num_consumers)
        ]
        for t in consumer_threads:
            t.start()

        producer_threads = [
            threading.Thread(target=self.producer, args=(i, items_per_producer))
            for i in range(num_producers)
        ]
        for t in producer_threads:
            t.start()

        # Wait for producers
        for t in producer_threads:
            t.join()

        # Wait for buffer to empty (every item has had task_done() called)
        self.buffer.join()

        # Signal shutdown and wait for consumers to notice it
        self.shutdown.set()
        for t in consumer_threads:
            t.join()

Multiple Queues (Pipeline)

class Pipeline:
    """Three-stage processing pipeline connected by queues.

    Each stage runs in its own thread: stage 1 parses, stage 2 transforms,
    stage 3 stores the final string in ``results``.

    Args:
        poll_timeout: Seconds a worker waits on an empty queue before
            re-checking ``shutdown``.
    """

    def __init__(self, poll_timeout=1.0):
        self.stage1_queue = queue.Queue()
        self.stage2_queue = queue.Queue()
        self.stage3_queue = queue.Queue()
        self.results = []
        self.shutdown = threading.Event()
        self.poll_timeout = poll_timeout

    def _run_stage(self, inbox, handle):
        """Pull items from ``inbox`` and pass them to ``handle`` until shutdown."""
        while not self.shutdown.is_set():
            try:
                item = inbox.get(timeout=self.poll_timeout)
            except queue.Empty:
                continue
            handle(item)
            # task_done() comes after the hand-off so that joining the queues
            # in stage order guarantees downstream work is already enqueued.
            inbox.task_done()

    def stage1_worker(self):
        """First stage: Parse input"""
        self._run_stage(
            self.stage1_queue,
            lambda raw_data: self.stage2_queue.put(f"parsed({raw_data})"),
        )

    def stage2_worker(self):
        """Second stage: Transform data"""
        self._run_stage(
            self.stage2_queue,
            lambda parsed: self.stage3_queue.put(f"transformed({parsed})"),
        )

    def stage3_worker(self):
        """Third stage: Store results"""
        self._run_stage(
            self.stage3_queue,
            lambda transformed: self.results.append(f"stored({transformed})"),
        )

    def process(self, items):
        """Run ``items`` through all three stages and return the results.

        Each call starts fresh workers and a fresh ``results`` list, so the
        pipeline can be reused. Results are in input order because each
        stage has a single worker reading a FIFO queue.
        """
        # Reset state from any previous call; a stale shutdown flag would
        # make the new workers exit at once and the joins below hang.
        self.shutdown.clear()
        self.results = []

        # Start workers
        workers = [
            threading.Thread(target=self.stage1_worker),
            threading.Thread(target=self.stage2_worker),
            threading.Thread(target=self.stage3_worker),
        ]
        for w in workers:
            w.start()

        # Feed input
        for item in items:
            self.stage1_queue.put(item)

        # Wait for completion, stage by stage
        self.stage1_queue.join()
        self.stage2_queue.join()
        self.stage3_queue.join()

        # Shutdown
        self.shutdown.set()
        for w in workers:
            w.join()

        return self.results

2. Thread Pool Pattern

Basic Thread Pool

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import Callable, List, Any

class SimpleThreadPool:
    """Simple thread pool backed by one shared task queue.

    ``submit`` returns a dict acting as a minimal future with the keys
    ``'result'``, ``'error'`` and ``'done'`` (a ``threading.Event``).
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.task_queue = queue.Queue()
        self.workers = []
        self.shutdown_flag = threading.Event()
        # Serializes "check flag + enqueue" against "set flag" so no task can
        # slip into the queue after shutdown has drained it.
        self._submit_lock = threading.Lock()
        self._start_workers()

    def _start_workers(self):
        """Start worker threads"""
        for i in range(self.num_workers):
            t = threading.Thread(target=self._worker, name=f"Worker-{i}")
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self):
        """Worker thread main loop: run tasks until the pool is shut down."""
        while not self.shutdown_flag.is_set():
            try:
                task, args, kwargs, result_holder = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                result_holder['result'] = task(*args, **kwargs)
            except Exception as e:
                result_holder['error'] = e
            finally:
                # Always release waiters, whether the task succeeded or not.
                result_holder['done'].set()
                self.task_queue.task_done()

    def _cancel_pending(self):
        """Fail every task still queued so nobody waits on it forever."""
        while True:
            try:
                _task, _args, _kwargs, holder = self.task_queue.get_nowait()
            except queue.Empty:
                return
            holder['error'] = RuntimeError("thread pool shut down before task ran")
            holder['done'].set()
            self.task_queue.task_done()

    def submit(self, task: Callable, *args, **kwargs) -> dict:
        """Queue ``task(*args, **kwargs)`` and return its result holder.

        Raises:
            RuntimeError: If the pool has already been shut down.
        """
        result_holder = {
            'result': None,
            'error': None,
            'done': threading.Event()
        }
        with self._submit_lock:
            if self.shutdown_flag.is_set():
                raise RuntimeError("cannot submit to a pool that was shut down")
            self.task_queue.put((task, args, kwargs, result_holder))
        return result_holder

    def map(self, func: Callable, items: List) -> List:
        """Apply ``func`` to every item and return the results in input order.

        Raises:
            Exception: The first task error encountered, in input order.
        """
        futures = [self.submit(func, item) for item in items]

        results = []
        for f in futures:
            f['done'].wait()
            if f['error'] is not None:
                raise f['error']
            results.append(f['result'])
        return results

    def shutdown(self, wait=True):
        """Stop accepting tasks and stop the workers.

        Tasks that are still queued when the workers stop are completed with
        a ``RuntimeError`` in their ``'error'`` slot.

        Args:
            wait: If true, block until every worker thread has exited.
        """
        with self._submit_lock:
            self.shutdown_flag.set()
        if wait:
            for w in self.workers:
                w.join()
        self._cancel_pending()

# Using Python's built-in ThreadPoolExecutor
def executor_example():
    """Demonstrate ``ThreadPoolExecutor`` via ``map`` and via ``submit``.

    Prints the ordered ``map`` results first, then each ``submit`` result in
    the order the futures complete.
    """

    def process_item(item):
        # Simulate a slow operation, then double the input.
        time.sleep(0.1)
        return item * 2

    items = list(range(20))

    # Method 1: map yields results in the same order as the inputs.
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = [*pool.map(process_item, items)]
        print(f"Map results: {results}")

    # Method 2: submit returns one future per task for manual handling.
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = []
        for item in items:
            pending.append(pool.submit(process_item, item))

        # as_completed yields each future as soon as it has finished.
        for finished in as_completed(pending):
            print(f"Completed: {finished.result()}")

Work Stealing

import random
from collections import deque
from typing import Optional, Callable

class WorkStealingPool:
    """Thread pool with work stealing for load balancing.

    Every worker owns a deque. It pops its own newest task first and, when
    its deque is empty, takes the oldest task from another worker's deque.
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        # Each worker has its own deque
        self.queues = [deque() for _ in range(num_workers)]
        self.workers = []
        self.shutdown_flag = threading.Event()
        # Not used by the pool itself; kept because it is a public attribute.
        self.lock = threading.Lock()
        self._start_workers()

    def _start_workers(self):
        """Start one daemon thread per worker slot."""
        for i in range(self.num_workers):
            t = threading.Thread(
                target=self._worker,
                args=(i,),
                name=f"Worker-{i}"
            )
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self, worker_id: int):
        """Worker loop: run available tasks, idle briefly when there are none."""
        while not self.shutdown_flag.is_set():
            task = self._get_task(worker_id)
            if task is None:
                time.sleep(0.001)  # Brief sleep if no work
                continue
            try:
                task()
            except Exception as e:
                # A failing task must not take the worker down with it.
                print(f"Worker {worker_id} error: {e}")

    def _get_task(self, worker_id: int) -> Optional[Callable]:
        """Return a task from the worker's own deque, or steal one.

        Returns:
            A callable, or ``None`` when every deque is empty.
        """
        # Try own queue first (LIFO for locality)
        try:
            return self.queues[worker_id].pop()
        except IndexError:
            pass

        # Try to steal from another worker (FIFO for fairness)
        for i, victim in enumerate(self.queues):
            if i == worker_id:
                continue
            try:
                return victim.popleft()  # Steal from front
            except IndexError:
                continue

        return None

    def submit(self, task: Callable):
        """Submit task to a randomly chosen worker's deque."""
        self.queues[random.randrange(self.num_workers)].append(task)

    def submit_to(self, worker_id: int, task: Callable):
        """Submit task to specific worker (for locality)"""
        self.queues[worker_id].append(task)

    def shutdown(self, wait: bool = True):
        """Ask all workers to stop after their current task.

        Tasks still sitting in the deques are not executed.

        Args:
            wait: If true, block until every worker thread has exited.
        """
        self.shutdown_flag.set()
        if wait:
            for w in self.workers:
                w.join()

3. Futures and Promises

Future Implementation

from typing import TypeVar, Generic, Optional, Callable
import threading

T = TypeVar('T')

class Future(Generic[T]):
    """Represents a value that will be available later.

    A producer completes the future with ``set_result`` or
    ``set_exception``; consumers block in ``result`` or register callbacks.
    Callbacks always run without the internal lock held, so a callback may
    call back into the same future.
    """

    def __init__(self):
        self._result: Optional[T] = None
        self._exception: Optional[Exception] = None
        self._done = threading.Event()
        self._callbacks: List[Callable] = []
        self._lock = threading.Lock()

    def _mark_done_locked(self) -> list:
        """Flag completion and return the callbacks to fire.

        Must be called with ``self._lock`` held.
        """
        self._done.set()
        return self._callbacks.copy()

    def _invoke(self, callback: Callable):
        """Call ``callback(self)``, logging instead of propagating errors."""
        try:
            callback(self)
        except Exception:
            # One misbehaving callback must not prevent the others from
            # running or break the thread that completed the future.
            import logging
            logging.getLogger(__name__).exception(
                "exception calling callback for %r", self
            )

    def set_result(self, result: T):
        """Set the result (called by executor)"""
        with self._lock:
            self._result = result
            callbacks = self._mark_done_locked()

        for callback in callbacks:
            self._invoke(callback)

    def set_exception(self, exc: Exception):
        """Set an exception (called by executor)"""
        with self._lock:
            self._exception = exc
            callbacks = self._mark_done_locked()

        for callback in callbacks:
            self._invoke(callback)

    def result(self, timeout: Optional[float] = None) -> T:
        """Get the result, blocking if necessary.

        Args:
            timeout: Maximum seconds to wait; ``None`` waits indefinitely.

        Raises:
            TimeoutError: If the future is not done within ``timeout``.
            Exception: Whatever was passed to ``set_exception``.
        """
        if not self._done.wait(timeout):
            raise TimeoutError("Future timed out")

        if self._exception is not None:
            raise self._exception
        return self._result

    def done(self) -> bool:
        """Check if future is done"""
        return self._done.is_set()

    def add_done_callback(self, callback: Callable):
        """Register ``callback(future)`` to run once the future is done.

        If the future is already done, the callback runs immediately in the
        calling thread.
        """
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return
        # Already done: run outside the lock. The lock is not reentrant, so
        # calling back while holding it would deadlock any callback that
        # touches this future again.
        self._invoke(callback)

class Promise(Generic[T]):
    """Write side of a ``Future``.

    The promise owns one future and forwards completion calls to it;
    consumers obtain that future through ``get_future``.
    """

    def __init__(self):
        # The future handed out to consumers.
        self.future = Future[T]()

    def set_result(self, result: T):
        """Complete the underlying future with ``result``."""
        self.future.set_result(result)

    def set_exception(self, exc: Exception):
        """Complete the underlying future with the exception ``exc``."""
        self.future.set_exception(exc)

    def get_future(self) -> Future[T]:
        """Return the future this promise completes."""
        return self.future

# Usage example
def async_computation():
    """Demonstrate a promise completed on a background thread.

    A worker thread sets the result after one second while the calling
    thread registers a callback and then blocks on the future.
    """
    promise = Promise[int]()
    future = promise.get_future()

    def compute():
        # Simulated slow computation on the worker thread.
        time.sleep(1)
        promise.set_result(42)

    worker = threading.Thread(target=compute)
    worker.start()

    def report(completed):
        print(f"Callback: {completed.result()}")

    # Add callback
    future.add_done_callback(report)

    # Can do other work here...
    print("Waiting for result...")

    # Block for result
    answer = future.result()
    print(f"Got result: {answer}")

    worker.join()

Chaining Futures

class ChainableFuture(Future[T]):
    """Future that supports ``then``/``catch`` chaining.

    Both methods return a new ``ChainableFuture`` that is completed from a
    done-callback registered on this one.
    """

    def then(self, fn: Callable[[T], 'U']) -> 'ChainableFuture[U]':
        """Return a future holding ``fn(value)`` once this one succeeds.

        If this future fails, or ``fn`` raises, the returned future is
        completed with that exception.
        """
        downstream = ChainableFuture()

        def forward(source):
            try:
                downstream.set_result(fn(source.result()))
            except Exception as error:
                downstream.set_exception(error)

        self.add_done_callback(forward)
        return downstream

    def catch(self, fn: Callable[[Exception], T]) -> 'ChainableFuture[T]':
        """Return a future that recovers from failure via ``fn(exception)``.

        A successful value passes through unchanged. If ``fn`` itself
        raises, the returned future is completed with that new exception.
        """
        downstream = ChainableFuture()

        def forward(source):
            try:
                downstream.set_result(source.result())
            except Exception as error:
                try:
                    downstream.set_result(fn(error))
                except Exception as handler_error:
                    downstream.set_exception(handler_error)

        self.add_done_callback(forward)
        return downstream

def future_chaining_example():
    """Chain transformations on a future and print the final value."""

    def fetch_user(user_id):
        # Simulated asynchronous lookup completed on a background thread.
        promise = Promise()

        def work():
            time.sleep(0.5)
            promise.set_result({"id": user_id, "name": "Alice"})

        threading.Thread(target=work).start()

        # Bridge the plain future owned by the promise into a chainable one.
        future = ChainableFuture()

        def relay(f):
            if f._exception:
                future.set_exception(f._exception)
            else:
                future.set_result(f.result())

        promise.future.add_done_callback(relay)
        return future

    # Chain transformations
    user_future = fetch_user(123)
    name_future = user_future.then(lambda user: user['name'])
    upper_future = name_future.then(lambda name: name.upper())
    result = upper_future.catch(lambda e: "UNKNOWN")

    print(result.result())  # "ALICE"

4. Actor Model

Basic Actor Implementation

import queue
import threading
from typing import Any, Dict
from abc import ABC, abstractmethod

class Message:
    """Envelope passed between actors.

    Attributes are ``content`` (the payload) and ``sender`` (the originating
    actor, or ``None`` when no sender was given).
    """

    def __init__(self, content: Any, sender: 'Actor' = None):
        self.content, self.sender = content, sender

class Actor(ABC):
    """Base actor: a mailbox plus a daemon thread that drains it.

    Subclasses implement ``receive``. Messages are handled one at a time on
    the actor's own thread, which is started by ``__init__``.
    """

    def __init__(self, name: str):
        self.name = name
        self.mailbox = queue.Queue()
        self._running = True
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        """Actor main loop: deliver mailbox messages until stopped."""
        while self._running:
            try:
                message = self.mailbox.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self.receive(message)
            except Exception as exc:
                # A failing handler must not kill the actor; otherwise every
                # later message would sit in the mailbox unprocessed.
                print(f"[{self.name}] error handling message: {exc!r}")

    @abstractmethod
    def receive(self, message: "Message"):
        """Handle incoming message - implement in subclass"""
        pass

    def send(self, target: 'Actor', content: Any):
        """Send ``content`` to ``target``, recording this actor as sender."""
        message = Message(content, sender=self)
        target.mailbox.put(message)

    def stop(self):
        """Stop the actor and wait for its thread to finish.

        Safe to call from inside ``receive``: in that case the loop ends
        after the current message and no join is attempted.
        """
        self._running = False
        # A thread cannot join itself (threading raises RuntimeError).
        if threading.current_thread() is not self._thread:
            self._thread.join()

# Example actors
class PrinterActor(Actor):
    """Actor that prints each message and acknowledges it to the sender."""

    def receive(self, message: Message):
        print(f"[{self.name}] Received: {message.content}")
        sender = message.sender
        if not sender:
            return
        # Reply to sender
        self.send(sender, f"ACK: {message.content}")

class CounterActor(Actor):
    """Actor holding an integer counter driven by string commands.

    Understands ``"increment"``, ``"decrement"`` and ``"get"``; ``"get"``
    replies to the sender with the current count. Anything else is ignored.
    """

    def __init__(self, name: str):
        super().__init__(name)
        self.count = 0

    def receive(self, message: Message):
        command = message.content
        if command == "increment":
            self.count += 1
            return
        if command == "decrement":
            self.count -= 1
            return
        if command == "get" and message.sender:
            self.send(message.sender, self.count)

class CollectorActor(Actor):
    """Actor that stores every message payload it receives.

    The ``done`` event is set once at least ``expected`` payloads have been
    collected.
    """

    def __init__(self, name: str, expected: int):
        super().__init__(name)
        self.results = []
        self.expected = expected
        self.done = threading.Event()

    def receive(self, message: Message):
        collected = self.results
        collected.append(message.content)
        if len(collected) >= self.expected:
            # Wake anyone waiting for the expected number of results.
            self.done.set()

def actor_example():
    """Demonstrate the actor model with a printer, a counter and a collector."""
    printer = PrinterActor("Printer")
    counter = CounterActor("Counter")
    collector = CollectorActor("Collector", 1)

    # Send messages: three increments, sent on behalf of the collector
    for _ in range(3):
        collector.send(counter, "increment")
    counter.send(printer, "Working...")

    # Get count: the counter replies to the collector, which sets `done`
    collector.send(counter, "get")
    collector.done.wait(timeout=2)
    print(f"Final count: {collector.results}")

    # Cleanup
    for actor in (printer, counter, collector):
        actor.stop()

Actor System

class ActorSystem:
    """Registry that creates, looks up and stops actors by name."""

    def __init__(self):
        self.actors: Dict[str, Actor] = {}
        self.lock = threading.Lock()

    def create_actor(self, actor_class, name: str, *args, **kwargs) -> "Actor":
        """Create and register an actor.

        Args:
            actor_class: Callable invoked as ``actor_class(name, *args, **kwargs)``.
            name: Unique registry key for the new actor.

        Raises:
            ValueError: If an actor called ``name`` is already registered.
        """
        with self.lock:
            if name in self.actors:
                raise ValueError(f"Actor {name} already exists")
            actor = actor_class(name, *args, **kwargs)
            self.actors[name] = actor
            return actor

    def get_actor(self, name: str) -> "Optional[Actor]":
        """Return the actor registered as ``name``, or ``None``."""
        with self.lock:
            return self.actors.get(name)

    def send(self, target_name: str, content: Any, sender: "Optional[Actor]" = None) -> bool:
        """Send a message to the named actor.

        Returns:
            ``True`` if the message was put in the target's mailbox,
            ``False`` if no actor is registered under ``target_name``.
        """
        with self.lock:
            target = self.actors.get(target_name)
        if target is None:
            return False
        target.mailbox.put(Message(content, sender))
        return True

    def shutdown(self):
        """Stop all actors and empty the registry."""
        # Snapshot and clear under the lock so a concurrent create_actor
        # cannot change the dict while it is being iterated.
        with self.lock:
            actors = list(self.actors.values())
            self.actors.clear()
        # Stop outside the lock: stop() may block, and a handler that is
        # still running may itself need the registry lock.
        for actor in actors:
            actor.stop()

5. Parallel Patterns

Map-Reduce

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import reduce
from typing import List, Callable, TypeVar, Iterable
import itertools

T = TypeVar('T')
U = TypeVar('U')

class MapReduce:
    """Parallel map-reduce implementation.

    Args:
        num_workers: Worker count passed to the executor as ``max_workers``.
        use_processes: Use ``ProcessPoolExecutor`` when true, otherwise
            ``ThreadPoolExecutor``. With processes, ``mapper``, ``reducer``
            and the data must be picklable.
    """

    def __init__(self, num_workers: int = 4, use_processes: bool = True):
        self.num_workers = num_workers
        self.executor_class = ProcessPoolExecutor if use_processes else ThreadPoolExecutor

    def map_reduce(
        self,
        data: List[T],
        mapper: Callable[[T], List[tuple]],
        reducer: Callable[[str, List], U]
    ) -> Dict[str, U]:
        """
        Execute map-reduce.

        mapper: T -> [(key, value), ...]
        reducer: (key, [values]) -> result

        Returns a dict mapping each key emitted by the mapper to the
        reducer's result for that key.
        """
        # Map phase (parallel)
        with self.executor_class(max_workers=self.num_workers) as executor:
            mapped = list(executor.map(mapper, data))

        # Shuffle: flatten all emitted pairs and group the values by key
        grouped = {}
        for key, value in itertools.chain.from_iterable(mapped):
            grouped.setdefault(key, []).append(value)

        if not grouped:
            return {}

        # Reduce phase (parallel). The reducer is handed to the executor
        # directly, with keys and value lists as parallel iterables; wrapping
        # it in a local closure would make it unpicklable and break the
        # process-based executor.
        keys = list(grouped)
        with self.executor_class(max_workers=self.num_workers) as executor:
            reduced_values = list(executor.map(reducer, keys, grouped.values()))

        return dict(zip(keys, reduced_values))

# Example: Word count
def word_count_mapper(document: str) -> List[tuple]:
    """Emit a ``(word, 1)`` pair for every whitespace-separated word.

    Words are lower-cased first so counting is case-insensitive.
    """
    lowered = document.lower()
    return [(token, 1) for token in lowered.split()]

def word_count_reducer(word: str, counts: List[int]) -> int:
    """Return the total of ``counts``; ``word`` itself is not used."""
    total = 0
    for count in counts:
        total += count
    return total

def word_count_example():
    """Count words across four short documents and print the totals."""
    documents = [
        "hello world hello",
        "world is beautiful",
        "hello beautiful world",
        "python is beautiful",
    ]

    # Threads are used here, so nothing has to be pickled.
    engine = MapReduce(num_workers=2, use_processes=False)
    print(engine.map_reduce(documents, word_count_mapper, word_count_reducer))
    # {'hello': 3, 'world': 3, 'is': 2, 'beautiful': 3, 'python': 1}

Scatter-Gather

class ScatterGather:
    """Scatter work to multiple workers, gather results.

    Args:
        num_workers: Maximum number of chunks and of worker threads.
    """

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers

    def execute(
        self,
        data: List[T],
        worker_fn: Callable[[List[T]], U],
        gather_fn: Callable[[List[U]], Any]
    ) -> Any:
        """
        Scatter data to workers, gather and combine results.

        ``data`` is cut into at most ``num_workers`` contiguous chunks,
        ``worker_fn`` runs once per chunk, and ``gather_fn`` receives the
        per-chunk results in chunk order. For empty ``data`` no worker runs
        and ``gather_fn`` receives an empty list.
        """
        # Scatter: Partition data. Ceiling division spreads the items over
        # the workers; max(1, ...) keeps the range step valid when data is
        # empty (a step of 0 makes range() raise ValueError).
        chunk_size = max(1, (len(data) + self.num_workers - 1) // self.num_workers)
        chunks = [
            data[i:i + chunk_size]
            for i in range(0, len(data), chunk_size)
        ]

        # Process in parallel
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            results = list(executor.map(worker_fn, chunks))

        # Gather: Combine results
        return gather_fn(results)

# Example: Parallel sum
def parallel_sum(numbers: List[int]) -> int:
    """Sum ``numbers`` by adding chunks in parallel, then adding the partials."""

    def sum_chunk(chunk):
        return sum(chunk)

    def combine_sums(partial_sums):
        return sum(partial_sums)

    scatter_gather = ScatterGather(num_workers=4)
    return scatter_gather.execute(numbers, sum_chunk, combine_sums)

Parallel Divide and Conquer

class ParallelDivideConquer:
    """Parallel divide and conquer framework.

    Args:
        threshold: Stored on the instance for callers to use in their
            ``is_base_case``; ``solve`` itself does not consult it.
        max_workers: Size of the thread pool and the maximum number of
            subproblems running in the pool at any moment.
    """

    def __init__(self, threshold: int = 1000, max_workers: int = 4):
        self.threshold = threshold
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per pool thread. A subproblem goes to the pool only
        # while a permit is free, so a submitted task never queues behind
        # blocked parents.
        self._slots = threading.BoundedSemaphore(max_workers)

    def _solve_and_release(self, problem, is_base_case, solve_base, divide, combine):
        """Pool entry point: solve ``problem`` and then free the permit."""
        try:
            return self.solve(problem, is_base_case, solve_base, divide, combine)
        finally:
            self._slots.release()

    def solve(
        self,
        problem: T,
        is_base_case: Callable[[T], bool],
        solve_base: Callable[[T], U],
        divide: Callable[[T], List[T]],
        combine: Callable[[List[U]], U]
    ) -> U:
        """
        Solve problem using parallel divide and conquer.

        Subproblems go to the thread pool while it has spare capacity; the
        rest, and always the last one, are solved in the calling thread.
        Submitting every level and blocking on the futures would deadlock as
        soon as the recursion is deeper than the pool is wide.

        Returns ``combine`` applied to the sub-results in ``divide`` order.
        """
        if is_base_case(problem):
            return solve_base(problem)

        # Divide
        subproblems = list(divide(problem))
        strategy = (is_base_case, solve_base, divide, combine)

        # Conquer (parallel where a pool slot is free)
        futures = {}
        last = len(subproblems) - 1
        for index, sub in enumerate(subproblems):
            if index == last or not self._slots.acquire(blocking=False):
                continue
            try:
                futures[index] = self.executor.submit(
                    self._solve_and_release, sub, *strategy
                )
            except BaseException:
                # The task never started, so give the permit back.
                self._slots.release()
                raise

        # Solve the remaining subproblems here, before waiting on the pool.
        results = [None] * len(subproblems)
        for index, sub in enumerate(subproblems):
            if index not in futures:
                results[index] = self.solve(sub, *strategy)
        for index, future in futures.items():
            results[index] = future.result()

        # Combine
        return combine(results)

    def shutdown(self, wait: bool = True):
        """Shut down the underlying thread pool."""
        self.executor.shutdown(wait=wait)

# Example: Parallel merge sort
def parallel_merge_sort(arr: List[int]) -> List[int]:
    """Return a sorted copy of ``arr`` using parallel divide and conquer.

    Lists no longer than the framework's threshold are sorted directly;
    longer lists are halved, sorted recursively and merged.
    """
    import heapq

    pdc = ParallelDivideConquer(threshold=100, max_workers=4)

    def is_base(a):
        # Use the configured threshold rather than repeating the constant.
        return len(a) <= pdc.threshold

    def solve_base(a):
        return sorted(a)

    def divide(a):
        mid = len(a) // 2
        return [a[:mid], a[mid:]]

    def combine(sorted_halves):
        # heapq.merge merges any number of sorted inputs and, on ties, takes
        # from the earlier input first.
        return list(heapq.merge(*sorted_halves))

    try:
        return pdc.solve(arr, is_base, solve_base, divide, combine)
    finally:
        # Each call creates its own thread pool; release it when done.
        pdc.executor.shutdown()

6. Synchronization Patterns

Double-Checked Locking

class Singleton:
    """Singleton whose single instance is created lazily in ``__new__``.

    ``_instance`` is checked once without the lock and once more with the
    lock held before the instance is created.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        existing = cls._instance
        if existing is not None:  # Fast path: already created, no lock
            return existing
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the first check and acquiring the lock.
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

# Python-specific: implement a singleton either via __new__ (as above) or
# with a module-level instance.
# Double-checked locking is tricky to get right!

Read-Copy-Update (RCU)

class RCUData:
    """Read-Copy-Update pattern for read-heavy workloads.

    Readers take the current dict reference without locking. Writers build
    a modified copy under a lock and then rebind the reference.
    """

    def __init__(self, initial_data: dict):
        self._lock = threading.Lock()
        self._data = initial_data.copy()

    def read(self) -> dict:
        """
        Return the current data dict without taking the lock.

        Later updates rebind the internal reference to a new dict, so a
        dict obtained here is not changed by them.
        """
        snapshot = self._data  # Atomic read of reference
        return snapshot

    def update(self, key: str, value: Any):
        """
        Set a single key by copying, modifying, and replacing.
        """
        self.bulk_update({key: value})

    def bulk_update(self, updates: dict):
        """Apply all of ``updates`` in one copy-modify-replace step."""
        with self._lock:
            replacement = self._data.copy()
            replacement.update(updates)
            # Replace (atomic): readers see either the old or the new dict.
            self._data = replacement

Compare-and-Swap Pattern

import threading

class CASValue:
    """Compare-and-swap value (simulated with a lock)."""

    def __init__(self, initial: Any):
        self._lock = threading.Lock()
        self._value = initial

    def get(self) -> Any:
        """Return the current value without taking the lock."""
        return self._value

    def compare_and_swap(self, expected: Any, new_value: Any) -> bool:
        """
        Atomically: if value == expected, set to new_value.
        Returns whether the swap occurred.
        """
        with self._lock:
            if not (self._value == expected):
                return False
            self._value = new_value
            return True

class LockFreeCounter:
    """Counter built on a compare-and-swap retry loop."""

    def __init__(self):
        self._value = CASValue(0)

    def increment(self) -> int:
        """Increment and return new value"""
        while True:
            observed = self._value.get()
            updated = observed + 1
            # The swap succeeds only if the value is still the one just read;
            # otherwise loop and retry with a fresh read.
            if self._value.compare_and_swap(observed, updated):
                return updated

    def get(self) -> int:
        """Return the current count."""
        return self._value.get()

Exercises

Basic

  1. Implement a producer-consumer with multiple producers and consumers.

  2. Create a thread pool that can be resized at runtime.

  3. Implement a simple future with timeout support.

Intermediate

  1. Build an actor system with supervision (restart failed actors).

  2. Implement a parallel map-reduce for computing average.

  3. Create a work-stealing scheduler and compare to static partitioning.

Advanced

  1. Implement a concurrent cache with read-copy-update semantics.

  2. Build a reactive streams processor with backpressure.

  3. Design a distributed actor system across multiple processes.


Summary

  • Producer-consumer decouples data generation from processing
  • Thread pools amortize thread creation cost
  • Work stealing balances load dynamically
  • Futures represent pending computations
  • Actors encapsulate state and communicate via messages
  • Map-reduce and scatter-gather parallelize data processing

Next Reading

GPU and Parallel Hardware →