Concurrent Patterns
Concurrent programming patterns help structure parallel code for correctness, efficiency, and maintainability.

## 1. Producer-Consumer Pattern
Basic Implementation
import threading
import queue
import time
import random
class ProducerConsumer:
    """Producer/consumer demo built on a bounded ``queue.Queue``.

    Producers block in ``put`` while the buffer is full; consumers poll the
    buffer with a one-second timeout until the ``shutdown`` event is set.
    """

    def __init__(self, buffer_size=10):
        self.shutdown = threading.Event()
        self.buffer = queue.Queue(maxsize=buffer_size)

    def producer(self, producer_id, num_items):
        """Put up to ``num_items`` labelled strings into the buffer.

        Stops early when the shutdown event is observed between items.
        """
        for sequence in range(num_items):
            if self.shutdown.is_set():
                break
            payload = f"item-{producer_id}-{sequence}"
            # put() waits for a free slot when the buffer is full.
            self.buffer.put(payload)
            print(f"Producer {producer_id}: produced {payload}")
            time.sleep(random.uniform(0.01, 0.1))
        print(f"Producer {producer_id}: done")

    def consumer(self, consumer_id):
        """Take items from the buffer until the shutdown event is set."""
        while True:
            if self.shutdown.is_set():
                break
            try:
                payload = self.buffer.get(timeout=1)
            except queue.Empty:
                # Nothing arrived within the timeout; re-check shutdown.
                continue
            print(f"Consumer {consumer_id}: consumed {payload}")
            self.buffer.task_done()
            time.sleep(random.uniform(0.05, 0.15))
        print(f"Consumer {consumer_id}: shutting down")

    def run(self, num_producers=2, num_consumers=3, items_per_producer=10):
        """Run producers and consumers to completion.

        Consumers are started first, then producers. The method waits for
        the producers to finish and for every queued item to be marked done
        before it sets the shutdown event and joins the consumers.
        """
        consumer_threads = [
            threading.Thread(target=self.consumer, args=(index,))
            for index in range(num_consumers)
        ]
        for worker in consumer_threads:
            worker.start()

        producer_threads = [
            threading.Thread(target=self.producer, args=(index, items_per_producer))
            for index in range(num_producers)
        ]
        for worker in producer_threads:
            worker.start()

        for worker in producer_threads:
            worker.join()

        # Blocks until task_done() has been called for every item put().
        self.buffer.join()

        self.shutdown.set()
        for worker in consumer_threads:
            worker.join()
Multiple Queues (Pipeline)
class Pipeline:
    """Three-stage processing pipeline (parse -> transform -> store).

    Each stage runs in its own thread and hands its output to the next
    stage through a queue. Results accumulate in ``self.results`` across
    calls to :meth:`process`.
    """

    # Marker enqueued at shutdown so workers blocked in get() wake up at
    # once instead of waiting for their poll timeout to expire.
    _WAKE = object()

    def __init__(self):
        self.stage1_queue = queue.Queue()
        self.stage2_queue = queue.Queue()
        self.stage3_queue = queue.Queue()
        self.results = []
        self.shutdown = threading.Event()

    def _run_stage(self, inbox, handle):
        """Pass items from ``inbox`` to ``handle`` until shutdown is set."""
        while not self.shutdown.is_set():
            try:
                item = inbox.get(timeout=1)
            except queue.Empty:
                continue
            try:
                # Wake markers carry no data; they only unblock get().
                if item is not self._WAKE:
                    handle(item)
            finally:
                # Balance every get() so inbox.join() can return.
                inbox.task_done()

    def stage1_worker(self):
        """First stage: parse raw input and forward it to stage 2."""
        self._run_stage(
            self.stage1_queue,
            lambda raw_data: self.stage2_queue.put(f"parsed({raw_data})"),
        )

    def stage2_worker(self):
        """Second stage: transform parsed data and forward it to stage 3."""
        self._run_stage(
            self.stage2_queue,
            lambda parsed: self.stage3_queue.put(f"transformed({parsed})"),
        )

    def stage3_worker(self):
        """Third stage: store the transformed data in ``self.results``."""
        self._run_stage(
            self.stage3_queue,
            lambda transformed: self.results.append(f"stored({transformed})"),
        )

    def process(self, items):
        """Run ``items`` through all three stages and return the results.

        Returns ``self.results``, i.e. the same list object on every call,
        including results stored by earlier calls.
        """
        # A previous run leaves the event set; clear it so the new workers
        # do not exit immediately and leave the joins below hanging.
        self.shutdown.clear()

        workers = [
            threading.Thread(target=self.stage1_worker),
            threading.Thread(target=self.stage2_worker),
            threading.Thread(target=self.stage3_worker),
        ]
        for w in workers:
            w.start()

        for item in items:
            self.stage1_queue.put(item)

        # Each stage enqueues downstream before calling task_done(), so
        # joining the queues in order waits for full completion.
        self.stage1_queue.join()
        self.stage2_queue.join()
        self.stage3_queue.join()

        # Set the flag first, then wake the workers: a worker that receives
        # the marker is guaranteed to observe the flag and exit.
        self.shutdown.set()
        for stage_queue in (self.stage1_queue, self.stage2_queue, self.stage3_queue):
            stage_queue.put(self._WAKE)
        for w in workers:
            w.join()
        return self.results
2. Thread Pool Pattern
Basic Thread Pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import Callable, List, Any
class SimpleThreadPool:
    """Fixed-size pool of daemon worker threads fed from one task queue.

    ``submit`` returns a dict acting as a minimal future: ``'done'`` is a
    ``threading.Event`` set once the task has run, ``'result'`` holds the
    return value and ``'error'`` the exception raised by the task, if any.
    """

    # Marker enqueued by shutdown() so idle workers wake up immediately
    # instead of waiting for their poll timeout.
    _STOP = object()

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.task_queue = queue.Queue()
        self.workers = []
        self.shutdown_flag = threading.Event()
        self._start_workers()

    def _start_workers(self):
        """Create and start ``num_workers`` daemon threads."""
        for i in range(self.num_workers):
            t = threading.Thread(target=self._worker, name=f"Worker-{i}")
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self):
        """Worker main loop: run queued tasks until shutdown is flagged."""
        while not self.shutdown_flag.is_set():
            try:
                job = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            if job is self._STOP:
                # Nothing to run; loop back so the flag check ends the thread.
                self.task_queue.task_done()
                continue
            task, args, kwargs, result_holder = job
            try:
                result_holder['result'] = task(*args, **kwargs)
            except Exception as e:
                result_holder['error'] = e
            finally:
                # Always signal completion so waiters never block forever.
                result_holder['done'].set()
                self.task_queue.task_done()

    def submit(self, task: Callable, *args, **kwargs) -> dict:
        """Queue ``task(*args, **kwargs)`` and return its result holder."""
        result_holder = {
            'result': None,
            'error': None,
            'done': threading.Event(),
        }
        self.task_queue.put((task, args, kwargs, result_holder))
        return result_holder

    def map(self, func: Callable, items: List) -> List:
        """Apply ``func`` to every item and return the results in order.

        Raises:
            Exception: the first (in input order) exception raised by
                ``func``.
        """
        futures = [self.submit(func, item) for item in items]
        results = []
        for f in futures:
            f['done'].wait()
            # Compare with None: an exception object may be falsy.
            if f['error'] is not None:
                raise f['error']
            results.append(f['result'])
        return results

    def shutdown(self, wait=True):
        """Ask the workers to stop; with ``wait`` also join them.

        Workers finish the task they are running. Tasks still queued when
        the flag is set are not executed.
        """
        self.shutdown_flag.set()
        # One marker per worker so each blocked get() returns right away.
        for _ in self.workers:
            self.task_queue.put(self._STOP)
        if wait:
            for w in self.workers:
                w.join()
# Using Python's built-in ThreadPoolExecutor
def executor_example():
    """Show two ways of using the standard-library ThreadPoolExecutor."""

    def process_item(item):
        # Simulated work: sleep briefly, then double the input.
        time.sleep(0.1)
        return item * 2

    items = list(range(20))

    # Method 1: map() yields results in the order of the inputs.
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = [doubled for doubled in executor.map(process_item, items)]
    print(f"Map results: {results}")

    # Method 2: submit() returns one future per call.
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for item in items:
            futures.append(executor.submit(process_item, item))
        # as_completed() yields each future as soon as it finishes.
        for finished in as_completed(futures):
            print(f"Completed: {finished.result()}")
Work Stealing
import random
from collections import deque
from typing import Optional, Callable
class WorkStealingPool:
    """Thread pool in which idle workers steal tasks from other workers.

    Every worker owns a deque. A worker pops from the right end of its own
    deque and, when that is empty, takes from the left end of another
    worker's deque.
    """

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        # Each worker has its own deque
        self.queues = [deque() for _ in range(num_workers)]
        self.workers = []
        self.shutdown_flag = threading.Event()
        self.lock = threading.Lock()
        self._start_workers()

    def _start_workers(self):
        """Create and start one daemon thread per worker slot."""
        for i in range(self.num_workers):
            t = threading.Thread(
                target=self._worker,
                args=(i,),
                name=f"Worker-{i}",
            )
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self, worker_id: int):
        """Worker main loop: run own or stolen tasks until shutdown."""
        while not self.shutdown_flag.is_set():
            task = self._get_task(worker_id)
            # Compare with None so a callable that happens to be falsy
            # is still executed rather than silently dropped.
            if task is None:
                time.sleep(0.001)  # Brief sleep if no work
                continue
            try:
                task()
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")

    def _get_task(self, worker_id: int) -> Optional[Callable]:
        """Return the next task for ``worker_id``, or None if all are empty."""
        my_queue = self.queues[worker_id]
        # Try own queue first (LIFO for locality)
        try:
            return my_queue.pop()
        except IndexError:
            pass
        # Try to steal from another worker (FIFO for fairness)
        for i in range(self.num_workers):
            if i == worker_id:
                continue
            try:
                return self.queues[i].popleft()  # Steal from front
            except IndexError:
                continue
        return None

    def submit(self, task: Callable):
        """Append ``task`` to the deque of a randomly chosen worker."""
        worker_id = random.randint(0, self.num_workers - 1)
        self.queues[worker_id].append(task)

    def submit_to(self, worker_id: int, task: Callable):
        """Append ``task`` to a specific worker's deque (for locality)."""
        self.queues[worker_id].append(task)

    def shutdown(self, wait=True):
        """Signal the workers to stop; with ``wait`` also join them.

        Tasks still sitting in the deques when the flag is observed are
        not executed.
        """
        self.shutdown_flag.set()
        if wait:
            for w in self.workers:
                w.join()
3. Futures and Promises
Future Implementation
from typing import TypeVar, Generic, Optional, Callable
import threading
T = TypeVar('T')


class Future(Generic[T]):
    """Holder for a value (or exception) that becomes available later."""

    def __init__(self):
        self._result: Optional[T] = None
        self._exception: Optional[Exception] = None
        self._done = threading.Event()
        self._callbacks: List[Callable] = []
        self._lock = threading.Lock()

    def _run_callbacks(self, callbacks):
        """Invoke completion callbacks, ignoring exceptions they raise."""
        for callback in callbacks:
            try:
                callback(self)
            except Exception:
                # Deliberate best effort: one failing callback must not
                # prevent the remaining callbacks from running.
                pass

    def set_result(self, result: T):
        """Store the result, mark the future done and run the callbacks."""
        with self._lock:
            self._result = result
            self._done.set()
            callbacks = self._callbacks.copy()
        # Callbacks run after the lock is released so they may use the future.
        self._run_callbacks(callbacks)

    def set_exception(self, exc: Exception):
        """Store an exception, mark the future done and run the callbacks."""
        with self._lock:
            self._exception = exc
            self._done.set()
            callbacks = self._callbacks.copy()
        self._run_callbacks(callbacks)

    def result(self, timeout: Optional[float] = None) -> T:
        """Return the result, blocking until it is available.

        Raises:
            TimeoutError: the future did not complete within ``timeout``
                seconds.
            Exception: the exception stored with ``set_exception``.
        """
        if not self._done.wait(timeout):
            raise TimeoutError("Future timed out")
        # Compare with None: an exception object may be falsy.
        if self._exception is not None:
            raise self._exception
        return self._result

    def done(self) -> bool:
        """Return True once a result or exception has been set."""
        return self._done.is_set()

    def add_done_callback(self, callback: Callable):
        """Register ``callback(future)`` to run when the future completes.

        If the future is already done the callback runs immediately in the
        calling thread and any exception it raises propagates to the caller.
        Otherwise it is stored and run by the thread that completes the
        future, where exceptions raised by callbacks are ignored.
        """
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return
        # Run outside the lock: the lock is not reentrant, so a callback
        # that registers another callback would otherwise deadlock.
        callback(self)
class Promise(Generic[T]):
    """Write-side handle that completes the Future it owns."""

    def __init__(self):
        self.future = Future[T]()

    def get_future(self) -> Future[T]:
        """Return the Future completed by this promise."""
        return self.future

    def set_result(self, result: T):
        """Complete the owned future with ``result``."""
        owned = self.future
        owned.set_result(result)

    def set_exception(self, exc: Exception):
        """Complete the owned future with the exception ``exc``."""
        owned = self.future
        owned.set_exception(exc)
# Usage example
def async_computation():
    """Complete a promise from a background thread and consume its future."""
    promise = Promise[int]()
    future = promise.get_future()

    def compute():
        # Simulated slow computation that fulfils the promise.
        time.sleep(1)
        promise.set_result(42)

    def report(finished):
        print(f"Callback: {finished.result()}")

    worker = threading.Thread(target=compute)
    worker.start()

    # Runs once the future completes.
    future.add_done_callback(report)

    # Other work could happen here while the computation runs.
    print("Waiting for result...")

    # Blocks until the background thread has set the result.
    print(f"Got result: {future.result()}")
    worker.join()
Chaining Futures
class ChainableFuture(Future[T]):
    """Future whose completion can be chained with ``then`` and ``catch``."""

    def then(self, fn: Callable[[T], 'U']) -> 'ChainableFuture[U]':
        """Return a new future holding ``fn`` applied to this future's result.

        If this future failed, or ``fn`` raises, the new future is completed
        with that exception instead.
        """
        chained = ChainableFuture()

        def forward(source):
            try:
                chained.set_result(fn(source.result()))
            except Exception as failure:
                chained.set_exception(failure)

        self.add_done_callback(forward)
        return chained

    def catch(self, fn: Callable[[Exception], T]) -> 'ChainableFuture[T]':
        """Return a new future that recovers from a failure using ``fn``.

        A successful result is passed through unchanged. On failure the new
        future holds ``fn(exception)``, or the exception raised by ``fn``.
        """
        chained = ChainableFuture()

        def forward(source):
            try:
                chained.set_result(source.result())
            except Exception as failure:
                try:
                    chained.set_result(fn(failure))
                except Exception as handler_failure:
                    chained.set_exception(handler_failure)

        self.add_done_callback(forward)
        return chained
def future_chaining_example():
    """Chain transformations on a future completed by a background thread."""

    def fetch_user(user_id):
        promise = Promise()

        def work():
            time.sleep(0.5)
            promise.set_result({"id": user_id, "name": "Alice"})

        threading.Thread(target=work).start()
        future = ChainableFuture()

        def relay(finished):
            # Copy the promise's outcome onto the chainable future.
            if finished._exception:
                return future.set_exception(finished._exception)
            return future.set_result(finished.result())

        promise.future.add_done_callback(relay)
        return future

    # Each step receives the result of the previous one.
    user_future = fetch_user(123)
    name_future = user_future.then(lambda user: user['name'])
    upper_future = name_future.then(lambda name: name.upper())
    result = upper_future.catch(lambda e: "UNKNOWN")
    print(result.result())  # "ALICE"
4. Actor Model
Basic Actor Implementation
import queue
import threading
from typing import Any, Dict
from abc import ABC, abstractmethod
class Message:
    """Envelope holding a payload and, optionally, the actor that sent it."""

    def __init__(self, content: Any, sender: 'Actor' = None):
        # sender stays None when no sender is supplied.
        self.sender = sender
        self.content = content
class Actor(ABC):
    """Base class for actors: a mailbox drained by a dedicated daemon thread.

    The thread is started by ``__init__``. Subclasses implement
    :meth:`receive`, which is called on the actor's own thread for each
    message. An exception raised by ``receive`` is not handled here and
    ends the actor's thread.
    """

    # Marker put into the mailbox by stop() so a blocked get() returns at
    # once instead of waiting for the poll timeout.
    _STOP = object()

    def __init__(self, name: str):
        self.name = name
        self.mailbox = queue.Queue()
        self._running = True
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        """Actor main loop: deliver mailbox messages until stopped."""
        while self._running:
            try:
                message = self.mailbox.get(timeout=1)
            except queue.Empty:
                continue
            if message is self._STOP:
                # Wake-up only; the loop condition ends the thread.
                continue
            self.receive(message)

    @abstractmethod
    def receive(self, message: 'Message'):
        """Handle incoming message - implement in subclass"""
        pass

    def send(self, target: 'Actor', content: Any):
        """Put ``content`` into ``target``'s mailbox with this actor as sender."""
        message = Message(content, sender=self)
        target.mailbox.put(message)

    def stop(self):
        """Stop the actor and wait for its thread to finish.

        Messages still in the mailbox when the flag is observed are not
        delivered. When called from the actor's own thread (for example
        inside ``receive``) the join is skipped, because a thread cannot
        join itself.
        """
        self._running = False
        self.mailbox.put(self._STOP)
        if threading.current_thread() is not self._thread:
            self._thread.join()
# Example actors
class PrinterActor(Actor):
    """Actor that prints each message and acknowledges it to the sender."""

    def receive(self, message: Message):
        print(f"[{self.name}] Received: {message.content}")
        origin = message.sender
        if not origin:
            # No sender given, so there is nobody to acknowledge to.
            return
        self.send(origin, f"ACK: {message.content}")
class CounterActor(Actor):
    """Actor holding an integer counter driven by string commands.

    ``"increment"`` and ``"decrement"`` adjust the counter; ``"get"`` sends
    the current value back to the sender, if there is one. Any other
    content is ignored.
    """

    def __init__(self, name: str):
        super().__init__(name)
        self.count = 0

    def receive(self, message: Message):
        command = message.content
        if command == "increment":
            self.count += 1
            return
        if command == "decrement":
            self.count -= 1
            return
        if command == "get" and message.sender:
            self.send(message.sender, self.count)
class CollectorActor(Actor):
    """Actor that stores message contents and signals when enough arrived.

    The ``done`` event is set once at least ``expected`` contents have been
    collected in ``results``.
    """

    def __init__(self, name: str, expected: int):
        super().__init__(name)
        self.results = []
        self.expected = expected
        self.done = threading.Event()

    def receive(self, message: Message):
        self.results.append(message.content)
        if len(self.results) < self.expected:
            return
        self.done.set()
def actor_example():
    """Wire a printer, a counter and a collector together and run them."""
    printer = PrinterActor("Printer")
    counter = CounterActor("Counter")
    collector = CollectorActor("Collector", 1)

    # Three increments, sent on behalf of the collector.
    for _ in range(3):
        collector.send(counter, "increment")
    counter.send(printer, "Working...")

    # Ask for the count; the counter replies to the collector.
    collector.send(counter, "get")
    collector.done.wait(timeout=2)
    print(f"Final count: {collector.results}")

    # Stop the actors in the order printer, counter, collector.
    for actor in (printer, counter, collector):
        actor.stop()
Actor System
class ActorSystem:
    """Registry that creates, looks up, messages and stops named actors.

    All access to the ``actors`` dict goes through ``self.lock``.
    """

    def __init__(self):
        self.actors: Dict[str, Actor] = {}
        self.lock = threading.Lock()

    def create_actor(self, actor_class, name: str, *args, **kwargs) -> 'Actor':
        """Instantiate ``actor_class(name, *args, **kwargs)`` and register it.

        Raises:
            ValueError: an actor called ``name`` is already registered.
        """
        with self.lock:
            if name in self.actors:
                raise ValueError(f"Actor {name} already exists")
            actor = actor_class(name, *args, **kwargs)
            self.actors[name] = actor
            return actor

    def get_actor(self, name: str) -> 'Optional[Actor]':
        """Return the actor registered as ``name``, or None if there is none."""
        with self.lock:
            return self.actors.get(name)

    def send(self, target_name: str, content: Any, sender: 'Optional[Actor]' = None):
        """Put a message into the mailbox of the actor named ``target_name``.

        Returns:
            True if the target exists and the message was enqueued, False
            if no actor with that name is registered.
        """
        target = self.get_actor(target_name)
        if target is None:
            return False
        target.mailbox.put(Message(content, sender))
        return True

    def shutdown(self):
        """Unregister every actor and stop each of them."""
        # Snapshot and clear under the lock, then stop outside it: the dict
        # is never iterated while it can change, and the lock is not held
        # while the actors are being stopped.
        with self.lock:
            stopping = list(self.actors.values())
            self.actors.clear()
        for actor in stopping:
            actor.stop()
5. Parallel Patterns
Map-Reduce
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import reduce
from typing import List, Callable, TypeVar, Iterable
import itertools
T = TypeVar('T')
U = TypeVar('U')


class MapReduce:
    """Parallel map-reduce on a thread or process pool."""

    def __init__(self, num_workers: int = 4, use_processes: bool = True):
        self.num_workers = num_workers
        self.executor_class = ProcessPoolExecutor if use_processes else ThreadPoolExecutor

    def map_reduce(
        self,
        data: List[T],
        mapper: Callable[[T], List[tuple]],
        reducer: Callable[[str, List], U]
    ) -> Dict[str, U]:
        """Run ``mapper`` over ``data``, group by key and reduce each group.

        Args:
            data: input items, each passed to ``mapper``.
            mapper: item -> [(key, value), ...]
            reducer: (key, [values]) -> result

        Returns:
            A dict mapping each key to its reduced result, in the order in
            which keys were first emitted by the map phase.

        With a process pool, ``mapper``, ``reducer`` and the data must be
        picklable, as required by ``ProcessPoolExecutor``.
        """
        # One pool serves both phases, so it is started only once.
        with self.executor_class(max_workers=self.num_workers) as executor:
            # Map phase (parallel)
            mapped = list(executor.map(mapper, data))

            # Flatten and group by key
            grouped = {}
            for key, value in itertools.chain.from_iterable(mapped):
                grouped.setdefault(key, []).append(value)

            # Reduce phase (parallel). ``reducer`` is handed to the pool
            # directly with two argument streams; wrapping it in a nested
            # function would make it unpicklable for process pools.
            reduced_values = list(
                executor.map(reducer, grouped.keys(), grouped.values())
            )
        return dict(zip(grouped.keys(), reduced_values))
# Example: Word count
def word_count_mapper(document: str) -> List[tuple]:
    """Emit a ``(word, 1)`` pair for each whitespace-separated word.

    The document is lower-cased before it is split.
    """
    pairs = []
    for token in document.lower().split():
        pairs.append((token, 1))
    return pairs
def word_count_reducer(word: str, counts: List[int]) -> int:
    """Return the total of ``counts``; ``word`` itself is not used."""
    total = 0
    for count in counts:
        total += count
    return total
def word_count_example():
    """Count words across four sample documents using thread workers."""
    documents = [
        "hello world hello",
        "world is beautiful",
        "hello beautiful world",
        "python is beautiful"
    ]
    engine = MapReduce(num_workers=2, use_processes=False)
    print(engine.map_reduce(documents, word_count_mapper, word_count_reducer))
    # {'hello': 3, 'world': 3, 'is': 2, 'beautiful': 3, 'python': 1}
Scatter-Gather
class ScatterGather:
    """Scatter work to multiple workers, gather results"""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers

    def execute(
        self,
        data: List[T],
        worker_fn: Callable[[List[T]], U],
        gather_fn: Callable[[List[U]], Any]
    ) -> Any:
        """Split ``data`` into chunks, process them in parallel and combine.

        Args:
            data: items to partition into at most ``num_workers`` contiguous
                chunks.
            worker_fn: called once per chunk on a pool thread.
            gather_fn: called with the list of per-chunk results, in chunk
                order.

        Returns:
            Whatever ``gather_fn`` returns. For empty ``data`` no chunks are
            produced and ``gather_fn`` receives an empty list.
        """
        # Scatter: ceiling division, clamped to 1 so that empty input
        # produces zero chunks instead of an invalid range() step of 0.
        chunk_size = max(
            1, (len(data) + self.num_workers - 1) // self.num_workers
        )
        chunks = [
            data[i:i + chunk_size]
            for i in range(0, len(data), chunk_size)
        ]
        # Process in parallel
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            results = list(executor.map(worker_fn, chunks))
        # Gather: Combine results
        return gather_fn(results)
# Example: Parallel sum
def parallel_sum(numbers: List[int]) -> int:
    """Sum ``numbers`` by adding up per-chunk partial sums."""
    # The built-in sum serves both as the per-chunk worker and as the
    # combiner of the partial sums.
    return ScatterGather(num_workers=4).execute(numbers, sum, sum)
Parallel Divide and Conquer
class ParallelDivideConquer:
    """Parallel divide and conquer framework"""

    def __init__(self, threshold: int = 1000, max_workers: int = 4):
        self.threshold = threshold
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per pool thread. A subproblem is handed to the pool
        # only while a permit is free, so a submitted task never queues
        # behind workers that are blocked waiting for their own children.
        self._permits = threading.BoundedSemaphore(max_workers)

    def _solve_and_release(self, *solve_args):
        """Run ``solve`` on a pool thread and give the permit back after."""
        try:
            return self.solve(*solve_args)
        finally:
            self._permits.release()

    def solve(
        self,
        problem: T,
        is_base_case: Callable[[T], bool],
        solve_base: Callable[[T], U],
        divide: Callable[[T], List[T]],
        combine: Callable[[List[U]], U]
    ) -> U:
        """Solve ``problem`` by recursive division.

        Args:
            problem: the instance to solve.
            is_base_case: returns True when ``problem`` is solved directly.
            solve_base: solves a base-case problem.
            divide: splits a problem into subproblems.
            combine: merges the subproblem results, given in the order
                ``divide`` produced the subproblems.

        Subproblems run on the pool while a worker is available and in the
        calling thread otherwise, so recursion deeper than the pool size
        cannot exhaust the pool.
        """
        if is_base_case(problem):
            return solve_base(problem)

        # Divide
        subproblems = list(divide(problem))
        strategy = (is_base_case, solve_base, divide, combine)

        # Conquer: submit where a permit is free, mark the rest with None.
        futures = []
        for sub in subproblems:
            if self._permits.acquire(blocking=False):
                try:
                    futures.append(
                        self.executor.submit(self._solve_and_release, sub, *strategy)
                    )
                except BaseException:
                    # The task never started, so return its permit here.
                    self._permits.release()
                    raise
            else:
                futures.append(None)

        # Solve the unsubmitted subproblems here first, so this thread
        # works while the pool handles the submitted ones.
        inline = {
            index: self.solve(sub, *strategy)
            for index, (sub, future) in enumerate(zip(subproblems, futures))
            if future is None
        }

        # Combine, preserving subproblem order.
        results = [
            inline[index] if future is None else future.result()
            for index, future in enumerate(futures)
        ]
        return combine(results)
# Example: Parallel merge sort
def parallel_merge_sort(arr: List[int]) -> List[int]:
    """Return a sorted copy of ``arr`` using parallel divide and conquer.

    Lists no longer than the framework's threshold (100) are sorted
    directly; longer lists are halved, sorted recursively and merged.
    """
    # Local import: merge is only needed by this example.
    import heapq

    pdc = ParallelDivideConquer(threshold=100, max_workers=4)

    def is_base(a):
        # Use the configured threshold rather than repeating the literal.
        return len(a) <= pdc.threshold

    def solve_base(a):
        return sorted(a)

    def divide(a):
        mid = len(a) // 2
        return [a[:mid], a[mid:]]

    def combine(sorted_halves):
        # heapq.merge keeps equal elements in input order (left run first)
        # and also covers zero or one run.
        return list(heapq.merge(*sorted_halves))

    try:
        return pdc.solve(arr, is_base, solve_base, divide, combine)
    finally:
        # Release the pool's threads; the executor is private to this call.
        pdc.executor.shutdown(wait=True)
6. Synchronization Patterns
Double-Checked Locking
class Singleton:
    """Class whose constructor always returns the same instance.

    The instance is created on first use. The check is done once without
    the lock and once more while holding it.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Fast path: the instance already exists, no lock needed.
        if cls._instance is not None:
            return cls._instance
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the first check and acquiring the lock.
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance
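# Python-specific: implement a singleton with __new__ (as above) or simply use a module-level instance.
# Double-checked locking is tricky to get right, so prefer the simpler option when possible!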
Read-Copy-Update (RCU)
class RCUData:
    """Dict holder that updates by replacing the dict, never mutating it.

    Readers receive the current dict without taking the lock. Writers take
    the lock, copy the dict, change the copy and rebind the reference.
    """

    def __init__(self, initial_data: dict):
        self._lock = threading.Lock()
        self._data = initial_data.copy()

    def _publish(self, changes: dict):
        """Apply ``changes`` to a copy and make the copy current."""
        with self._lock:
            replacement = self._data.copy()
            replacement.update(changes)
            self._data = replacement

    def read(self) -> dict:
        """Return the current dict without locking.

        Later updates rebind the internal reference to a new dict, so a
        dict returned earlier is not changed by them.
        """
        return self._data

    def update(self, key: str, value: Any):
        """Set ``key`` to ``value`` via copy, modify and replace."""
        self._publish({key: value})

    def bulk_update(self, updates: dict):
        """Apply all of ``updates`` in a single copy-and-replace step."""
        self._publish(updates)
Compare-and-Swap Pattern
import threading
class CASValue:
    """Value cell offering compare-and-swap, simulated with a lock."""

    def __init__(self, initial: Any):
        self._lock = threading.Lock()
        self._value = initial

    def get(self) -> Any:
        """Return the current value; the lock is not taken."""
        return self._value

    def compare_and_swap(self, expected: Any, new_value: Any) -> bool:
        """Set the value to ``new_value`` if it currently equals ``expected``.

        The comparison and the assignment happen under the lock.

        Returns:
            True if the swap occurred, False otherwise.
        """
        with self._lock:
            if not (self._value == expected):
                return False
            self._value = new_value
            return True
class LockFreeCounter:
    """Counter that increments through a compare-and-swap retry loop."""

    def __init__(self):
        self._value = CASValue(0)

    def get(self) -> int:
        """Return the current count."""
        return self._value.get()

    def increment(self) -> int:
        """Add one to the counter and return the new value.

        Retries until the compare-and-swap succeeds.
        """
        swapped = False
        while not swapped:
            observed = self._value.get()
            updated = observed + 1
            # Fails, and the loop retries, if the value changed since get().
            swapped = self._value.compare_and_swap(observed, updated)
        return updated
Exercises
Basic
Implement a producer-consumer with multiple producers and consumers.
Create a thread pool that can be resized at runtime.
Implement a simple future with timeout support.
Intermediate
Build an actor system with supervision (restart failed actors).
Implement a parallel map-reduce for computing average.
Create a work-stealing scheduler and compare to static partitioning.
Advanced
Implement a concurrent cache with read-copy-update semantics.
Build a reactive streams processor with backpressure.
Design a distributed actor system across multiple processes.
Summary
- Producer-consumer decouples data generation from processing
- Thread pools amortize thread creation cost
- Work stealing balances load dynamically
- Futures represent pending computations
- Actors encapsulate state and communicate via messages
- Map-reduce and scatter-gather parallelize data processing