Concurrent Patterns
Introduction
Concurrent programming patterns help structure parallel code for correctness, efficiency, and maintainability. This reading covers established patterns for common parallel computing problems.
Learning Objectives
By the end of this reading, you will be able to:
- Implement producer-consumer patterns
- Use thread pools and executors
- Apply the actor model for concurrency
- Implement futures and promises
- Design work-stealing schedulers
1. Producer-Consumer Pattern
Basic Implementation
import threading
import queue
import time
import random
class ProducerConsumer:
    """Classic producer-consumer with bounded buffer.

    The bounded queue.Queue provides backpressure: producers block in
    put() when the buffer is full; consumers poll get() with a timeout
    so they can notice the shutdown flag.
    """

    def __init__(self, buffer_size=10):
        # maxsize bounds the buffer so put() blocks once it is full.
        self.buffer = queue.Queue(maxsize=buffer_size)
        # Signalled by run() only after all produced items are consumed.
        self.shutdown = threading.Event()

    def producer(self, producer_id, num_items):
        """Produce num_items labelled items, stopping early on shutdown."""
        for i in range(num_items):
            if self.shutdown.is_set():
                break
            item = f"item-{producer_id}-{i}"
            self.buffer.put(item)  # Blocks if full
            print(f"Producer {producer_id}: produced {item}")
            time.sleep(random.uniform(0.01, 0.1))
        print(f"Producer {producer_id}: done")

    def consumer(self, consumer_id):
        """Consume items from the buffer until shutdown is signalled.

        The 1s get() timeout lets the loop re-check the shutdown flag
        instead of blocking forever on an empty queue.
        """
        while not self.shutdown.is_set():
            try:
                item = self.buffer.get(timeout=1)
                print(f"Consumer {consumer_id}: consumed {item}")
                # task_done pairs with buffer.join() in run().
                self.buffer.task_done()
                time.sleep(random.uniform(0.05, 0.15))
            except queue.Empty:
                continue
        print(f"Consumer {consumer_id}: shutting down")

    def run(self, num_producers=2, num_consumers=3, items_per_producer=10):
        """Run the producer-consumer system end to end.

        Order matters: wait for producers, then for the buffer to drain
        (join pairs with task_done), and only then signal shutdown so
        consumers exit their polling loops having seen every item.
        """
        threads = []
        # Start consumers
        for i in range(num_consumers):
            t = threading.Thread(target=self.consumer, args=(i,))
            t.start()
            threads.append(t)
        # Start producers
        producer_threads = []
        for i in range(num_producers):
            t = threading.Thread(
                target=self.producer,
                args=(i, items_per_producer)
            )
            t.start()
            producer_threads.append(t)
        # Wait for producers
        for t in producer_threads:
            t.join()
        # Wait for buffer to empty
        self.buffer.join()
        # Signal shutdown
        self.shutdown.set()
        for t in threads:
            t.join()
Multiple Queues (Pipeline)
class Pipeline:
    """Multi-stage processing pipeline.

    Three worker threads are chained by queues:
    stage1 (parse) -> stage2 (transform) -> stage3 (store in results).
    """

    def __init__(self):
        self.stage1_queue = queue.Queue()
        self.stage2_queue = queue.Queue()
        self.stage3_queue = queue.Queue()
        # Final outputs; appended only by the single stage-3 worker.
        self.results = []
        self.shutdown = threading.Event()

    def stage1_worker(self):
        """First stage: parse raw input and forward it to stage 2.

        The downstream put() happens before task_done() so that by the
        time stage1_queue.join() returns, stage-2 work is already queued.
        """
        while not self.shutdown.is_set():
            try:
                raw_data = self.stage1_queue.get(timeout=1)
                parsed = f"parsed({raw_data})"
                self.stage2_queue.put(parsed)
                self.stage1_queue.task_done()
            except queue.Empty:
                continue

    def stage2_worker(self):
        """Second stage: transform parsed data and forward to stage 3."""
        while not self.shutdown.is_set():
            try:
                parsed = self.stage2_queue.get(timeout=1)
                transformed = f"transformed({parsed})"
                self.stage3_queue.put(transformed)
                self.stage2_queue.task_done()
            except queue.Empty:
                continue

    def stage3_worker(self):
        """Third stage: store results (single writer, so no lock needed)."""
        while not self.shutdown.is_set():
            try:
                transformed = self.stage3_queue.get(timeout=1)
                result = f"stored({transformed})"
                self.results.append(result)
                self.stage3_queue.task_done()
            except queue.Empty:
                continue

    def process(self, items):
        """Run items through the pipeline and return the stored results.

        Joining the queues in stage order guarantees every item has
        passed completely through before shutdown is signalled.
        """
        # Start workers
        workers = [
            threading.Thread(target=self.stage1_worker),
            threading.Thread(target=self.stage2_worker),
            threading.Thread(target=self.stage3_worker)
        ]
        for w in workers:
            w.start()
        # Feed input
        for item in items:
            self.stage1_queue.put(item)
        # Wait for completion
        self.stage1_queue.join()
        self.stage2_queue.join()
        self.stage3_queue.join()
        # Shutdown
        self.shutdown.set()
        for w in workers:
            w.join()
        return self.results
2. Thread Pool Pattern
Basic Thread Pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import Callable, List, Any
class SimpleThreadPool:
    """Simple thread pool implementation.

    Tasks are queued with a per-task "result holder" dict acting as a
    minimal future: {'result', 'error', 'done': threading.Event}.
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.task_queue = queue.Queue()
        self.workers = []
        self.shutdown_flag = threading.Event()
        self._start_workers()

    def _start_workers(self):
        """Start worker threads (daemons, so they never block interpreter exit)."""
        for i in range(self.num_workers):
            t = threading.Thread(target=self._worker, name=f"Worker-{i}")
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self):
        """Worker thread main loop.

        The 1s get() timeout lets the loop re-check shutdown_flag; task
        exceptions are captured into the holder, never propagated, so a
        failing task cannot kill its worker.
        """
        while not self.shutdown_flag.is_set():
            try:
                task, args, kwargs, result_holder = self.task_queue.get(timeout=1)
                try:
                    result = task(*args, **kwargs)
                    result_holder['result'] = result
                except Exception as e:
                    result_holder['error'] = e
                finally:
                    # Always wake waiters, even when the task raised.
                    result_holder['done'].set()
                    self.task_queue.task_done()
            except queue.Empty:
                continue

    def submit(self, task: Callable, *args, **kwargs) -> dict:
        """Submit a task and return its future-like holder dict.

        Wait on holder['done'], then read holder['result'] or holder['error'].
        """
        result_holder = {
            'result': None,
            'error': None,
            'done': threading.Event()
        }
        self.task_queue.put((task, args, kwargs, result_holder))
        return result_holder

    def map(self, func: Callable, items: List) -> List:
        """Apply func to each item in parallel, preserving input order.

        Re-raises the first stored exception encountered while collecting.
        """
        futures = [self.submit(func, item) for item in items]
        results = []
        for f in futures:
            f['done'].wait()
            if f['error']:
                raise f['error']
            results.append(f['result'])
        return results

    def shutdown(self, wait=True):
        """Shutdown the pool.

        NOTE(review): queued-but-unstarted tasks are abandoned once the
        flag is set; each worker exits after finishing at most one more task.
        """
        self.shutdown_flag.set()
        if wait:
            for w in self.workers:
                w.join()
# Using Python's built-in ThreadPoolExecutor
def executor_example():
    """Demonstrate the standard-library ThreadPoolExecutor two ways."""
    def process_item(item):
        time.sleep(0.1)
        return item * 2

    work = list(range(20))

    # Method 1: executor.map — results come back in input order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        doubled = list(pool.map(process_item, work))
    print(f"Map results: {doubled}")

    # Method 2: submit futures manually; as_completed yields each future
    # as soon as it finishes, regardless of submission order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = [pool.submit(process_item, x) for x in work]
        for fut in as_completed(pending):
            result = fut.result()
            print(f"Completed: {result}")
Work Stealing
import random
from collections import deque
from typing import Optional, Callable
class WorkStealingPool:
    """Thread pool with work stealing for load balancing.

    Each worker owns a deque: it pops tasks from the back of its own
    queue (LIFO, good locality) and, when empty, steals from the front
    of another worker's queue (FIFO, takes the oldest work first).
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        # Each worker has its own deque; CPython deque append/pop are
        # thread-safe for single-element operations.
        self.queues = [deque() for _ in range(num_workers)]
        self.workers = []
        self.shutdown_flag = threading.Event()
        self.lock = threading.Lock()  # available for callers needing coarse coordination
        self._start_workers()

    def _start_workers(self):
        """Spawn one daemon worker thread per queue."""
        for i in range(self.num_workers):
            t = threading.Thread(
                target=self._worker,
                args=(i,),
                name=f"Worker-{i}"
            )
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self, worker_id: int):
        """Worker loop: run tasks until shutdown, stealing when idle.

        (The previous version bound self.queues[worker_id] to an unused
        local here; queue access is delegated entirely to _get_task.)
        """
        while not self.shutdown_flag.is_set():
            task = self._get_task(worker_id)
            if task:
                try:
                    task()
                except Exception as e:
                    # A failing task must not kill the worker thread.
                    print(f"Worker {worker_id} error: {e}")
            else:
                time.sleep(0.001)  # Brief sleep if no work

    def _get_task(self, worker_id: int) -> Optional[Callable]:
        """Get a task from our own queue, or steal from another worker."""
        my_queue = self.queues[worker_id]
        # Try own queue first (LIFO for locality)
        try:
            return my_queue.pop()
        except IndexError:
            pass
        # Try to steal from another worker (FIFO for fairness)
        for i in range(self.num_workers):
            if i == worker_id:
                continue
            try:
                return self.queues[i].popleft()  # Steal from front
            except IndexError:
                continue
        return None

    def submit(self, task: Callable):
        """Submit task to a randomly chosen worker's queue."""
        worker_id = random.randint(0, self.num_workers - 1)
        self.queues[worker_id].append(task)

    def submit_to(self, worker_id: int, task: Callable):
        """Submit task to a specific worker (for locality)."""
        self.queues[worker_id].append(task)
3. Futures and Promises
Future Implementation
from typing import TypeVar, Generic, Optional, Callable
import threading
T = TypeVar('T')
class Future(Generic[T]):
    """Represents a value that will be available later.

    Thread-safe: completion may occur on any thread.  Callbacks are
    invoked OUTSIDE the internal lock, so a callback may safely re-enter
    the future (call result(), done(), or add another callback) without
    deadlocking on the non-reentrant Lock.
    """

    def __init__(self):
        self._result: Optional[T] = None
        self._exception: Optional[Exception] = None
        self._done = threading.Event()
        self._callbacks: List[Callable] = []
        self._lock = threading.Lock()

    def _invoke(self, callbacks):
        """Run completion callbacks; one failing callback never blocks the rest."""
        for callback in callbacks:
            try:
                callback(self)
            except Exception:
                pass

    def set_result(self, result: T):
        """Set the result (called by executor)."""
        with self._lock:
            self._result = result
            self._done.set()
            callbacks = self._callbacks.copy()
        # Invoke outside the lock: a callback that re-enters this future
        # (e.g. calls add_done_callback) would otherwise deadlock.
        self._invoke(callbacks)

    def set_exception(self, exc: Exception):
        """Set an exception (called by executor)."""
        with self._lock:
            self._exception = exc
            self._done.set()
            callbacks = self._callbacks.copy()
        self._invoke(callbacks)

    def result(self, timeout: Optional[float] = None) -> T:
        """Get the result, blocking if necessary.

        Raises TimeoutError if the wait expires, or re-raises the
        stored exception if the future failed.
        """
        if not self._done.wait(timeout):
            raise TimeoutError("Future timed out")
        if self._exception:
            raise self._exception
        return self._result

    def done(self) -> bool:
        """Check if future is done"""
        return self._done.is_set()

    def add_done_callback(self, callback: Callable):
        """Add a callback to run on completion; runs immediately if already done."""
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return
        # Already done: invoke outside the lock (see set_result).
        callback(self)
class Promise(Generic[T]):
    """Producer side of Future.

    The writer holds the Promise and completes it; readers hold the
    Future obtained from get_future().
    """

    def __init__(self):
        # The consumer-facing handle, handed out via get_future().
        self.future = Future[T]()

    def set_result(self, result: T):
        """Fulfil the paired future with a value."""
        self.future.set_result(result)

    def set_exception(self, exc: Exception):
        """Reject the paired future with an exception."""
        self.future.set_exception(exc)

    def get_future(self) -> Future[T]:
        """Return the consumer-side Future handle."""
        return self.future
# Usage example
def async_computation():
    """Demonstrate the future/promise pair with a background worker."""
    promise = Promise[int]()
    fut = promise.get_future()

    def background():
        time.sleep(1)
        promise.set_result(42)

    worker = threading.Thread(target=background)
    worker.start()

    # Callback fires on whichever thread completes the future.
    fut.add_done_callback(lambda f: print(f"Callback: {f.result()}"))

    # Can do other work here...
    print("Waiting for result...")

    # result() blocks until set_result runs.
    result = fut.result()
    print(f"Got result: {result}")
    worker.join()
Chaining Futures
class ChainableFuture(Future[T]):
    """Future extended with JavaScript-style then/catch chaining."""

    def then(self, fn: Callable[[T], 'U']) -> 'ChainableFuture[U]':
        """Return a new future that resolves to fn(result).

        Errors — from this future or from fn itself — propagate to the
        returned future as exceptions.
        """
        downstream = ChainableFuture()

        def forward(done_future):
            try:
                downstream.set_result(fn(done_future.result()))
            except Exception as exc:
                downstream.set_exception(exc)

        self.add_done_callback(forward)
        return downstream

    def catch(self, fn: Callable[[Exception], T]) -> 'ChainableFuture[T]':
        """Return a new future that recovers from an error via fn.

        On success the value passes through untouched; on failure fn(exc)
        supplies a replacement value (fn raising rejects the new future).
        """
        downstream = ChainableFuture()

        def forward(done_future):
            try:
                downstream.set_result(done_future.result())
            except Exception as exc:
                try:
                    downstream.set_result(fn(exc))
                except Exception as exc2:
                    downstream.set_exception(exc2)

        self.add_done_callback(forward)
        return downstream
def future_chaining_example():
    """Chain operations on futures"""
    def fetch_user(user_id):
        promise = Promise()

        def work():
            time.sleep(0.5)
            promise.set_result({"id": user_id, "name": "Alice"})

        threading.Thread(target=work).start()

        # Bridge the plain Future onto a ChainableFuture so .then() works.
        chained = ChainableFuture()

        def bridge(f):
            if f._exception:
                chained.set_exception(f._exception)
            else:
                chained.set_result(f.result())

        promise.future.add_done_callback(bridge)
        return chained

    # Chain transformations
    pipeline = (
        fetch_user(123)
        .then(lambda user: user['name'])
        .then(lambda name: name.upper())
        .catch(lambda e: "UNKNOWN")
    )
    print(pipeline.result())  # "ALICE"
4. Actor Model
Basic Actor Implementation
import queue
import threading
from typing import Any, Dict
from abc import ABC, abstractmethod
class Message:
    """Envelope passed between actors: arbitrary content plus an optional sender."""

    def __init__(self, content: Any, sender: 'Actor' = None):
        self.sender = sender
        self.content = content
class Actor(ABC):
    """Base actor class.

    Each actor owns a mailbox queue and a dedicated daemon thread that
    delivers one message at a time to receive(), so subclass state
    needs no locking.
    """

    def __init__(self, name: str):
        self.name = name
        self.mailbox = queue.Queue()
        self._running = True
        # Daemon thread: actors never block interpreter shutdown.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        """Actor main loop: deliver mailbox messages to receive().

        The 1s get() timeout lets the loop re-check _running so stop()
        takes effect within about a second.
        """
        while self._running:
            try:
                message = self.mailbox.get(timeout=1)
                self.receive(message)
            except queue.Empty:
                continue

    @abstractmethod
    def receive(self, message: Message):
        """Handle incoming message - implement in subclass"""
        pass

    def send(self, target: 'Actor', content: Any):
        """Send a message to another actor (non-blocking; mailboxes are unbounded)."""
        message = Message(content, sender=self)
        target.mailbox.put(message)

    def stop(self):
        """Stop the actor and join its thread (may take up to the 1s poll)."""
        self._running = False
        self._thread.join()
# Example actors
class PrinterActor(Actor):
    """Actor that prints every message and acknowledges its sender."""

    def receive(self, message: Message):
        print(f"[{self.name}] Received: {message.content}")
        if message.sender is not None:
            # Reply to sender
            self.send(message.sender, f"ACK: {message.content}")
class CounterActor(Actor):
    """Actor maintaining an integer count, driven by command messages."""

    def __init__(self, name: str):
        super().__init__(name)
        self.count = 0

    def receive(self, message: Message):
        command = message.content
        if command == "increment":
            self.count += 1
        elif command == "decrement":
            self.count -= 1
        elif command == "get" and message.sender is not None:
            # Reply with the current count to whoever asked.
            self.send(message.sender, self.count)
class CollectorActor(Actor):
    """Actor that accumulates message contents until `expected` have arrived."""

    def __init__(self, name: str, expected: int):
        super().__init__(name)
        self.results = []
        self.expected = expected
        # Signalled once enough results have been collected.
        self.done = threading.Event()

    def receive(self, message: Message):
        self.results.append(message.content)
        if len(self.results) >= self.expected:
            self.done.set()
def actor_example():
    """Demonstrate actor model"""
    printer = PrinterActor("Printer")
    counter = CounterActor("Counter")
    collector = CollectorActor("Collector", 1)

    # Drive the counter, then have it chat with the printer.
    for _ in range(3):
        collector.send(counter, "increment")
    counter.send(printer, "Working...")

    # Ask for the count; the reply lands in the collector's mailbox.
    collector.send(counter, "get")
    collector.done.wait(timeout=2)
    print(f"Final count: {collector.results}")

    # Cleanup
    for participant in (printer, counter, collector):
        participant.stop()
Actor System
class ActorSystem:
    """Registry that creates, looks up, routes to, and stops actors by name."""

    def __init__(self):
        self.actors: Dict[str, Actor] = {}
        self.lock = threading.Lock()

    def create_actor(self, actor_class, name: str, *args, **kwargs) -> Actor:
        """Create and register an actor; names must be unique."""
        with self.lock:
            if name in self.actors:
                raise ValueError(f"Actor {name} already exists")
            new_actor = actor_class(name, *args, **kwargs)
            self.actors[name] = new_actor
            return new_actor

    def get_actor(self, name: str) -> Actor:
        """Look up an actor by name (None if not registered)."""
        return self.actors.get(name)

    def send(self, target_name: str, content: Any, sender: Actor = None):
        """Deliver a message to the named actor; unknown names are dropped silently."""
        recipient = self.actors.get(target_name)
        if recipient is not None:
            recipient.mailbox.put(Message(content, sender))

    def shutdown(self):
        """Stop every registered actor and clear the registry."""
        for registered in self.actors.values():
            registered.stop()
        self.actors.clear()
5. Parallel Patterns
Map-Reduce
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import reduce
from typing import List, Callable, TypeVar, Iterable
import itertools
T = TypeVar('T')
U = TypeVar('U')
class MapReduce:
    """Parallel map-reduce implementation"""

    def __init__(self, num_workers: int = 4, use_processes: bool = True):
        self.num_workers = num_workers
        # Processes sidestep the GIL for CPU-bound mappers; threads suit I/O.
        self.executor_class = ProcessPoolExecutor if use_processes else ThreadPoolExecutor

    def map_reduce(
        self,
        data: List[T],
        mapper: Callable[[T], List[tuple]],
        reducer: Callable[[str, List], U]
    ) -> Dict[str, U]:
        """
        Execute map-reduce.
        mapper: T -> [(key, value), ...]
        reducer: (key, [values]) -> result
        """
        # Map phase (parallel)
        with self.executor_class(max_workers=self.num_workers) as executor:
            mapped = list(executor.map(mapper, data))

        # Shuffle: flatten the per-document pair lists and group values by key.
        grouped: Dict[str, list] = {}
        for key, value in itertools.chain.from_iterable(mapped):
            grouped.setdefault(key, []).append(value)

        # Reduce phase (parallel)
        def reduce_item(item):
            key, values = item
            return key, reducer(key, values)

        with self.executor_class(max_workers=self.num_workers) as executor:
            return dict(executor.map(reduce_item, grouped.items()))
# Example: Word count
def word_count_mapper(document: str) -> List[tuple]:
    """Emit a (word, 1) pair for each whitespace-separated word, lowercased."""
    return [(token, 1) for token in document.lower().split()]
def word_count_reducer(word: str, counts: List[int]) -> int:
    """Reduce step for word count: total the 1s emitted by the mapper."""
    return sum(counts)
def word_count_example():
    """Count words across a small corpus with the MapReduce helper."""
    documents = [
        "hello world hello",
        "world is beautiful",
        "hello beautiful world",
        "python is beautiful"
    ]
    # Threads suffice: the mapper/reducer are trivial, module-level functions.
    engine = MapReduce(num_workers=2, use_processes=False)
    counts = engine.map_reduce(documents, word_count_mapper, word_count_reducer)
    print(counts)
    # {'hello': 3, 'world': 3, 'is': 2, 'beautiful': 3, 'python': 1}
Scatter-Gather
class ScatterGather:
    """Scatter work to multiple workers, gather results.

    The input list is split into up to num_workers contiguous chunks,
    each chunk is processed by worker_fn on a thread pool, and the
    per-chunk results are combined by gather_fn.
    """

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers

    def execute(
        self,
        data: List[T],
        worker_fn: Callable[[List[T]], U],
        gather_fn: Callable[[List[U]], Any]
    ) -> Any:
        """
        Scatter data to workers, gather and combine results.

        Empty input yields gather_fn([]) — previously the ceil-division
        produced chunk_size == 0 and range(..., 0) raised ValueError.
        """
        # Guard: the ceil-division below gives chunk_size == 0 for empty data,
        # and range() rejects a zero step.
        if not data:
            return gather_fn([])
        # Scatter: partition into ceil(len/num_workers)-sized chunks.
        chunk_size = (len(data) + self.num_workers - 1) // self.num_workers
        chunks = [
            data[i:i + chunk_size]
            for i in range(0, len(data), chunk_size)
        ]
        # Process in parallel
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            results = list(executor.map(worker_fn, chunks))
        # Gather: Combine results
        return gather_fn(results)
# Example: Parallel sum
def parallel_sum(numbers: List[int]) -> int:
    """Sum `numbers` by scattering chunks across four worker threads.

    The built-in sum serves as both the per-chunk worker and the
    gather step that totals the partial sums.
    """
    engine = ScatterGather(num_workers=4)
    return engine.execute(numbers, sum, sum)
Parallel Divide and Conquer
class ParallelDivideConquer:
    """Parallel divide and conquer framework.

    NOTE(review): `threshold` is stored but never consulted — the base
    case is decided entirely by the caller-supplied is_base_case.
    NOTE(review): each recursion level submits blocking subtasks to the
    same bounded executor; with deep recursion all max_workers threads
    can end up blocked in f.result() waiting on queued subtasks
    (potential deadlock). The executor is also never shut down.
    Confirm intended usage before reusing this class.
    """

    def __init__(self, threshold: int = 1000, max_workers: int = 4):
        self.threshold = threshold  # currently unused (see class note)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def solve(
        self,
        problem: T,
        is_base_case: Callable[[T], bool],
        solve_base: Callable[[T], U],
        divide: Callable[[T], List[T]],
        combine: Callable[[List[U]], U]
    ) -> U:
        """
        Solve problem using parallel divide and conquer.

        Base problems are solved directly; otherwise the problem is
        divided, subproblems recurse on the shared pool, and their
        results are combined in subproblem order.
        """
        if is_base_case(problem):
            return solve_base(problem)
        # Divide
        subproblems = divide(problem)
        # Conquer (parallel): each subproblem recurses via the shared pool.
        futures = [
            self.executor.submit(
                self.solve, sub, is_base_case, solve_base, divide, combine
            )
            for sub in subproblems
        ]
        # Combine — this thread blocks until every subresult is ready.
        results = [f.result() for f in futures]
        return combine(results)
# Example: Parallel merge sort
def parallel_merge_sort(arr: List[int]) -> List[int]:
    """Sort `arr` via parallel divide-and-conquer merge sort.

    Slices of at most 100 elements are sorted directly; larger slices
    are split in half, sorted concurrently, and merged.
    """
    solver = ParallelDivideConquer(threshold=100, max_workers=4)

    def small_enough(a):
        return len(a) <= 100

    def sort_directly(a):
        return sorted(a)

    def split_in_half(a):
        mid = len(a) // 2
        return [a[:mid], a[mid:]]

    def merge(sorted_halves):
        # Degenerate cases: zero or one sub-result to combine.
        if len(sorted_halves) != 2:
            return sorted_halves[0] if sorted_halves else []
        left, right = sorted_halves
        merged = []
        i = j = 0
        # Standard two-way merge of two sorted lists.
        while i < len(left) and j < len(right):
            if left[i] <= right[j]:
                merged.append(left[i])
                i += 1
            else:
                merged.append(right[j])
                j += 1
        merged.extend(left[i:])
        merged.extend(right[j:])
        return merged

    return solver.solve(arr, small_enough, sort_directly, split_in_half, merge)
6. Synchronization Patterns
Double-Checked Locking
class Singleton:
    """Thread-safe singleton using double-checked locking.

    The first unlocked check is a fast-path optimization; the second
    check under the lock prevents two threads that both saw None from
    each creating an instance.
    """
    # Shared class-level state: the single instance and its guard lock.
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:  # First check (no lock)
            with cls._lock:  # Acquire lock
                if cls._instance is None:  # Second check (with lock)
                    cls._instance = super().__new__(cls)
        return cls._instance
# Python-specific: Use __new__ or module-level
# Double-checked locking is tricky to get right!
Read-Copy-Update (RCU)
class RCUData:
    """Read-Copy-Update pattern for read-heavy workloads.

    Readers take a plain reference with no lock; writers copy the whole
    dict, mutate the copy, and swap in the new reference. A reader
    therefore always sees one consistent snapshot.
    """

    def __init__(self, initial_data: dict):
        self._data = initial_data.copy()
        self._lock = threading.Lock()  # serializes writers only

    def read(self) -> dict:
        """Return the current snapshot (lock-free reference read)."""
        return self._data

    def update(self, key: str, value: Any):
        """Copy-modify-swap a single key."""
        with self._lock:
            replacement = dict(self._data)
            replacement[key] = value
            self._data = replacement  # swap in the new snapshot

    def bulk_update(self, updates: dict):
        """Copy-modify-swap several keys in one snapshot replacement."""
        with self._lock:
            replacement = dict(self._data)
            replacement.update(updates)
            self._data = replacement
Compare-and-Swap Pattern
import threading
class CASValue:
    """Compare-and-swap value (simulated with an internal lock)."""

    def __init__(self, initial: Any):
        self._value = initial
        self._lock = threading.Lock()

    def get(self) -> Any:
        """Return the current value (no lock needed for a single read)."""
        return self._value

    def compare_and_swap(self, expected: Any, new_value: Any) -> bool:
        """
        Atomically: if value == expected, set to new_value.
        Returns whether the swap occurred.
        """
        with self._lock:
            if self._value != expected:
                return False
            self._value = new_value
            return True
class LockFreeCounter:
    """Counter built on CAS: optimistic read-modify-write with retry."""

    def __init__(self):
        self._value = CASValue(0)

    def increment(self) -> int:
        """Increment and return the new value."""
        while True:
            snapshot = self._value.get()
            updated = snapshot + 1
            if self._value.compare_and_swap(snapshot, updated):
                return updated
            # CAS lost a race with another thread; re-read and retry.

    def get(self) -> int:
        """Return the current count."""
        return self._value.get()
Exercises
Basic
Implement a producer-consumer with multiple producers and consumers.
Create a thread pool that can be resized at runtime.
Implement a simple future with timeout support.
Intermediate
Build an actor system with supervision (restart failed actors).
Implement a parallel map-reduce for computing average.
Create a work-stealing scheduler and compare to static partitioning.
Advanced
Implement a concurrent cache with read-copy-update semantics.
Build a reactive streams processor with backpressure.
Design a distributed actor system across multiple processes.
Summary
- Producer-consumer decouples data generation from processing
- Thread pools amortize thread creation cost
- Work stealing balances load dynamically
- Futures represent pending computations
- Actors encapsulate state and communicate via messages
- Map-reduce and scatter-gather parallelize data processing