Threading and Synchronization
Introduction
Threads are the fundamental unit of concurrent execution within a process. This reading covers thread management, synchronization primitives, and common pitfalls in multithreaded programming.
Learning Objectives
By the end of this reading, you will be able to:
- Create and manage threads
- Use locks, semaphores, and condition variables
- Avoid deadlocks and race conditions
- Implement thread-safe data structures
- Understand the Python GIL and its implications
1. Threads vs Processes
Comparison
import threading
import multiprocessing
import os
def show_ids():
    """Print this process's PID and the current thread's name."""
    pid = os.getpid()
    tname = threading.current_thread().name
    print(f"Process ID: {pid}, Thread: {tname}")
# Threads: Share memory, lightweight
def thread_example():
    """Threads share the same process"""
    # Create three named threads; every one reports the same PID
    # because threads all live inside this one process.
    threads = [
        threading.Thread(target=show_ids, name=f"Thread-{i}")
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # All show same PID, different thread names
# Processes: Separate memory, heavyweight
def process_example():
    """Processes have separate memory"""
    # Each child is a full interpreter with its own address space,
    # so every one prints a different PID.
    procs = [
        multiprocessing.Process(target=show_ids, name=f"Process-{i}")
        for i in range(3)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Each shows different PID
When to Use Which
"""
Use THREADS when:
- I/O-bound tasks (network, file I/O)
- Shared state is needed
- Low overhead is important
- Quick context switches needed
Use PROCESSES when:
- CPU-bound tasks (computation)
- Need true parallelism (bypass GIL)
- Memory isolation desired
- Stability (crash doesn't affect others)
"""
# I/O bound - threads work well
def download_files(urls):
    """Download every URL concurrently and return the response bodies.

    Network waits release the GIL, so a thread pool overlaps the I/O.

    Fix: the original referenced ``ThreadPoolExecutor`` without importing
    it anywhere in the file; it is imported locally so the snippet runs.
    NOTE(review): ``requests`` is a third-party package assumed installed —
    confirm it is a project dependency.
    """
    from concurrent.futures import ThreadPoolExecutor
    import requests

    def download(url):
        # One blocking GET per worker thread.
        response = requests.get(url)
        return response.content

    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(download, urls)
    # The pool has shut down here, so all futures are complete.
    return list(results)
# CPU bound - processes work better (in Python)
def compute_heavy(numbers):
"""CPU work - processes avoid GIL"""
def heavy_computation(n):
return sum(i*i for i in range(n))
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(heavy_computation, numbers)
return list(results)
2. Thread Creation and Management
Basic Threading
import threading
import time
# Method 1: Function target
def worker(name, duration):
    """Announce start, sleep for `duration` seconds, announce finish."""
    print(name + " starting")
    time.sleep(duration)
    print(name + " finished")
# Spawn a single thread running worker("Worker-1", 2) and block until done.
thread = threading.Thread(target=worker, args=("Worker-1", 2))
thread.start()
thread.join()  # Wait for completion
# Method 2: Subclass Thread
class WorkerThread(threading.Thread):
    """Thread subclass that records a completion message in `result`."""

    def __init__(self, name, duration):
        super().__init__()
        self.name = name          # Thread exposes `name` as a settable property
        self.duration = duration
        self.result = None        # filled in by run()

    def run(self):
        """Override run() with thread logic"""
        print(f"{self.name} starting")
        time.sleep(self.duration)
        self.result = self.name + " completed"
        print(f"{self.name} finished")
# Drive the Thread subclass: start, wait, then read the stored result.
worker = WorkerThread("CustomWorker", 2)
worker.start()
worker.join()
print(worker.result)  # "CustomWorker completed"
Thread Lifecycle
class ThreadLifecycle:
    """Walk one thread through created -> started -> joined states."""

    def __init__(self):
        self.thread = None  # no thread object yet

    def create_thread(self):
        """Thread is created but not started"""
        self.thread = threading.Thread(target=self.task)
        alive = self.thread.is_alive()
        print(f"Created: alive={alive}")  # False

    def start_thread(self):
        """Thread is now running"""
        self.thread.start()
        alive = self.thread.is_alive()
        print(f"Started: alive={alive}")  # True

    def join_thread(self):
        """Wait for thread to complete"""
        self.thread.join()
        alive = self.thread.is_alive()
        print(f"Joined: alive={alive}")  # False

    def task(self):
        # Simulated work so the thread stays alive briefly.
        time.sleep(1)
# Daemon threads
def daemon_example():
    """Daemon threads don't prevent program exit"""
    def background_task():
        # Infinite loop: it only ends because the daemon dies with main.
        while True:
            print("Background work...")
            time.sleep(1)

    t = threading.Thread(target=background_task, daemon=True)
    t.start()
    time.sleep(3)
    print("Main thread exiting - daemon will be killed")
3. Synchronization Primitives
Locks (Mutexes)
import threading
class BankAccount:
    """Bank account whose balance is guarded by a single mutex."""

    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.Lock()  # serializes every balance access

    def deposit(self, amount):
        """Atomically add `amount` to the balance."""
        with self.lock:
            # Critical section: the read-modify-write must not interleave.
            snapshot = self.balance
            time.sleep(0.001)  # Simulate processing
            self.balance = snapshot + amount

    def withdraw(self, amount):
        """Atomically remove `amount`; return False when funds are short."""
        with self.lock:
            if self.balance < amount:
                return False
            snapshot = self.balance
            time.sleep(0.001)
            self.balance = snapshot - amount
            return True

    def get_balance(self):
        """Read the balance under the lock for a consistent snapshot."""
        with self.lock:
            return self.balance
# Manual lock management (less preferred)
def manual_lock_example():
    """Acquire/release by hand; the `with` form is usually preferable."""
    lock = threading.Lock()
    lock.acquire()
    try:
        pass  # Critical section
    finally:
        # Release even on exception, or every later acquire blocks forever.
        lock.release()
# Try-lock (non-blocking)
def try_lock_example():
    """Non-blocking acquire: run the critical section only if free."""
    lock = threading.Lock()
    got_it = lock.acquire(blocking=False)
    if not got_it:
        # Lock was held by another thread; skip the critical section.
        return
    try:
        pass  # Got the lock
    finally:
        lock.release()
RLock (Reentrant Lock)
class RecursiveExample:
    """RLock lets the thread that already owns it acquire it again."""

    def __init__(self):
        self.lock = threading.RLock()  # Reentrant lock
        self.data = []

    def add_item(self, item):
        """Append one item, then log while still holding the lock."""
        with self.lock:
            self.data.append(item)
            self.log(f"Added {item}")

    def log(self, message):
        """Re-acquires the same RLock from the same thread - no deadlock."""
        with self.lock:
            print(f"[LOG] {message}, items: {len(self.data)}")

    def add_batch(self, items):
        """Add several items; each add_item re-enters the held lock."""
        with self.lock:
            for item in items:
                self.add_item(item)
# A plain Lock would deadlock on the nested acquires above;
# RLock tracks the owning thread plus a recursion count and allows reentry.
Semaphores
class ConnectionPool:
    """Cap how many connections can be handed out at once."""

    def __init__(self, max_connections=5):
        # Counting semaphore: one permit per allowed connection.
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []

    def get_connection(self):
        """Take a permit (blocking while none are free) and return a token."""
        self.semaphore.acquire()
        # In real code, return actual connection
        name = threading.current_thread().name
        return f"Connection-{name}"

    def release_connection(self, conn):
        """Give the permit back so another caller may proceed."""
        self.semaphore.release()
# Bounded semaphore prevents releasing more than acquired
bounded = threading.BoundedSemaphore(5)
# bounded.release() without acquire raises ValueError
# Binary semaphore (like a lock but different semantics)
# (unlike a Lock, a semaphore has no owner: any thread may release it)
binary = threading.Semaphore(1)
Condition Variables
class BoundedBuffer:
    """Capacity-limited FIFO buffer for producer/consumer threads."""

    def __init__(self, capacity=10):
        self.capacity = capacity
        self.buffer = []
        # Both conditions share one lock so buffer state stays consistent.
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        """Append `item`, blocking while the buffer is at capacity."""
        with self.not_full:
            # Loop, not `if`: wakeups can be spurious or raced.
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()
            self.buffer.append(item)
            self.not_empty.notify()  # a waiting consumer may proceed

    def get(self):
        """Pop the oldest item, blocking while the buffer is empty."""
        with self.not_empty:
            while not self.buffer:
                self.not_empty.wait()
            head = self.buffer.pop(0)
            self.not_full.notify()  # a waiting producer may proceed
            return head
def producer_consumer_demo():
    """Run one producer and one consumer against a small BoundedBuffer."""
    buffer = BoundedBuffer(5)

    def producer():
        # Slightly faster than the consumer, so the buffer fills up.
        for i in range(20):
            buffer.put(f"item-{i}")
            print(f"Produced item-{i}")
            time.sleep(0.1)

    def consumer():
        for _ in range(20):
            item = buffer.get()
            print(f"Consumed {item}")
            time.sleep(0.2)

    workers = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
Events
class WorkCoordinator:
    """Gate worker start and stop with two shared events."""

    def __init__(self):
        self.ready_event = threading.Event()     # set once to release workers
        self.shutdown_event = threading.Event()  # set once to stop workers

    def worker(self, worker_id):
        """Wait for the ready signal, then loop until shutdown is signaled."""
        print(f"Worker {worker_id} waiting for ready signal...")
        self.ready_event.wait()  # blocks until set() is called
        print(f"Worker {worker_id} starting work")
        while True:
            if self.shutdown_event.is_set():
                break
            # Do work
            time.sleep(0.5)
            # wait(timeout) doubles as a responsive shutdown check.
            if self.shutdown_event.wait(timeout=0.1):
                break
        print(f"Worker {worker_id} shutting down")

    def start_workers(self, num_workers):
        """Launch workers, release them together, then stop them all."""
        threads = [threading.Thread(target=self.worker, args=(i,))
                   for i in range(num_workers)]
        for t in threads:
            t.start()
        time.sleep(1)
        print("Signaling workers to start...")
        self.ready_event.set()  # every worker unblocks at once
        time.sleep(3)
        print("Signaling shutdown...")
        self.shutdown_event.set()  # every worker exits its loop
        for t in threads:
            t.join()
Barriers
def barrier_example():
    """Four threads advance through phases in lockstep via a Barrier."""
    num_threads = 4
    barrier = threading.Barrier(num_threads)

    def phase_worker(worker_id):
        print(f"Worker {worker_id}: Phase 1")
        time.sleep(worker_id * 0.1)  # stagger arrival times
        barrier.wait()  # nobody passes until all four arrive
        print(f"Worker {worker_id}: Phase 2")
        barrier.wait()
        print(f"Worker {worker_id}: Phase 3")

    workers = []
    for i in range(num_threads):
        t = threading.Thread(target=phase_worker, args=(i,))
        workers.append(t)
        t.start()
    for t in workers:
        t.join()
4. Common Pitfalls
Race Conditions
class RaceConditionDemo:
    """Demonstrate race conditions on a shared counter.

    Bug fix: the original never created ``self.lock``, so
    ``safe_increment()`` raised AttributeError on first use; the lock
    is now initialized in ``__init__``.
    """

    def __init__(self):
        self.counter = 0
        self.lock = threading.Lock()  # was missing in the original

    def unsafe_increment(self):
        """Race condition: read-modify-write is not atomic"""
        # This is actually:
        # 1. Read counter into temp
        # 2. Add 1 to temp
        # 3. Write temp to counter
        # Another thread can interleave!
        self.counter += 1

    def safe_increment(self):
        """Use lock to make the increment atomic."""
        with self.lock:
            self.counter += 1
def demonstrate_race():
    """Hammer unsafe_increment from four threads and show lost updates."""
    demo = RaceConditionDemo()

    def increment_many():
        for _ in range(100000):
            demo.unsafe_increment()

    workers = []
    for _ in range(4):
        w = threading.Thread(target=increment_many)
        workers.append(w)
        w.start()
    for w in workers:
        w.join()
    print(f"Expected: 400000, Got: {demo.counter}")
    # Usually less due to lost updates!
Deadlocks
class DeadlockDemo:
    """Two threads acquiring the same pair of locks in opposite order."""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def thread_1_work(self):
        """Acquires A then B"""
        with self.lock_a:
            print("Thread 1: Got lock A")
            time.sleep(0.1)  # widen the window for the deadlock
            with self.lock_b:
                print("Thread 1: Got lock B")

    def thread_2_work(self):
        """Acquires B then A - DEADLOCK!"""
        with self.lock_b:
            print("Thread 2: Got lock B")
            time.sleep(0.1)
            # Each thread now waits on the lock the other one holds.
            with self.lock_a:
                print("Thread 2: Got lock A")

    def run_deadlock(self):
        """This will deadlock!"""
        workers = [threading.Thread(target=self.thread_1_work),
                   threading.Thread(target=self.thread_2_work)]
        for w in workers:
            w.start()
        # Neither thread completes - both wait forever.
class DeadlockPrevention:
    """Strategies to prevent deadlock between two locks.

    Bug fix: the original called ``random.uniform`` in
    ``try_lock_backoff`` but ``random`` was never imported anywhere in
    the file, so the backoff path raised NameError; it is now imported.
    """

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    # Strategy 1: Lock ordering
    def ordered_locks(self):
        """Always acquire locks in the same order."""
        # Both threads acquire A before B, so a cycle cannot form.
        with self.lock_a:
            with self.lock_b:
                pass  # Safe!

    # Strategy 2: Try-lock with backoff
    def try_lock_backoff(self):
        """Try to acquire both locks; back off and retry on failure."""
        import random  # needed only for the jittered backoff sleep
        while True:
            if self.lock_a.acquire(blocking=False):
                try:
                    if self.lock_b.acquire(blocking=False):
                        try:
                            # Got both locks!
                            return
                        finally:
                            self.lock_b.release()
                finally:
                    self.lock_a.release()
            # Back off with jitter so competing threads desynchronize.
            time.sleep(random.uniform(0.001, 0.01))

    # Strategy 3: Lock timeout
    def timeout_locks(self):
        """Use timeouts so a deadlock shows up as a False return."""
        if self.lock_a.acquire(timeout=1.0):
            try:
                if self.lock_b.acquire(timeout=1.0):
                    try:
                        return True
                    finally:
                        self.lock_b.release()
            finally:
                self.lock_a.release()
        return False  # Failed to acquire
Livelocks
class LivelockDemo:
    """Livelock: both threads stay busy yet neither makes progress."""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def polite_thread_1(self):
        """Grabs A; if B is busy, gives A back and retries forever."""
        while True:
            self.lock_a.acquire()
            if self.lock_b.acquire(blocking=False):
                # Holds both locks: do the work and stop.
                try:
                    print("Thread 1: Working")
                    return
                finally:
                    self.lock_b.release()
                    self.lock_a.release()
            # Could not get B: be "polite", back off, try again.
            self.lock_a.release()
            time.sleep(0.001)

    def polite_thread_2(self):
        """Mirror image: grabs B first, yields when A is unavailable."""
        while True:
            self.lock_b.acquire()
            if self.lock_a.acquire(blocking=False):
                try:
                    print("Thread 2: Working")
                    return
                finally:
                    self.lock_a.release()
                    self.lock_b.release()
            self.lock_b.release()
            time.sleep(0.001)
# Fix: randomize the backoff so the two threads desynchronize, e.g.
# time.sleep(random.uniform(0.001, 0.01))
Priority Inversion
"""
Priority Inversion:
- High priority thread H needs lock held by low priority L
- Medium priority thread M preempts L
- H waits for L, which waits for M
- Result: H effectively has lower priority than M!
Solution: Priority inheritance
- When H blocks on L's lock, L inherits H's priority
- L can't be preempted by M
- L finishes quickly, releases lock
- H can continue
"""
5. Thread-Safe Data Structures
Thread-Safe Queue
import queue
def thread_safe_queue_example():
    """Python's queue module is thread-safe"""
    q = queue.Queue(maxsize=10)

    def producer():
        # put() blocks whenever the queue is at maxsize.
        for i in range(20):
            q.put(f"item-{i}")
            print(f"Produced item-{i}")

    def consumer():
        # get(timeout=...) raises queue.Empty once the producer is done.
        while True:
            try:
                item = q.get(timeout=2)
            except queue.Empty:
                break
            print(f"Consumed {item}")
            q.task_done()

    workers = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
# Other thread-safe queues
# queue.LifoQueue() # Stack
# queue.PriorityQueue() # Heap
Lock-Free Counter
import ctypes
class AtomicCounter:
    """Counter whose increment is made atomic with a lock.

    CPython's `+=` on an attribute is NOT atomic, hence the lock.
    """

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        """Add one under the lock and return the new value."""
        with self._lock:
            self._value += 1
            return self._value

    def get(self):
        """Unlocked read: a single int snapshot is fine."""
        return self._value
# For true lock-free, use atomic operations (C extension or specialized libraries)
# Python's += is not atomic!
Read-Write Lock
class ReadWriteLock:
    """Many concurrent readers OR one exclusive writer."""

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0  # count of currently active readers

    def acquire_read(self):
        """Acquire read lock (multiple allowed)"""
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        """Release read lock"""
        with self._read_ready:
            self._readers -= 1
            if not self._readers:
                # Last reader out: wake any writer parked in acquire_write.
                self._read_ready.notify_all()

    def acquire_write(self):
        """Acquire write lock (exclusive) - keeps holding the condition's
        underlying lock until release_write, blocking new readers."""
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        """Release write lock"""
        self._read_ready.release()
class ThreadSafeDict:
    """Dict wrapper: concurrent reads, exclusive writes via ReadWriteLock."""

    def __init__(self):
        self._store = {}
        self._rwlock = ReadWriteLock()

    def get(self, key):
        """Look up `key` (None when absent) under a shared read lock."""
        self._rwlock.acquire_read()
        try:
            return self._store.get(key)
        finally:
            self._rwlock.release_read()

    def set(self, key, value):
        """Store `key` -> `value` under the exclusive write lock."""
        self._rwlock.acquire_write()
        try:
            self._store[key] = value
        finally:
            self._rwlock.release_write()
6. The Python GIL
Understanding the GIL
"""
Global Interpreter Lock (GIL):
- Python (CPython) has a lock that only allows one thread to execute Python bytecode
- This means Python threads don't run truly in parallel for CPU work
- GIL is released during I/O operations
Implications:
- CPU-bound: Use multiprocessing, not threading
- I/O-bound: Threading works fine (GIL released during I/O)
"""
import threading
import multiprocessing
import time
def cpu_bound_task(n):
    """Return the sum of squares of 0..n-1 - pure-Python CPU work."""
    return sum(i * i for i in range(n))
def benchmark_cpu_bound():
    """Threading vs multiprocessing for CPU work"""
    n = 10_000_000
    iterations = 4

    # Sequential baseline.
    start = time.time()
    for _ in range(iterations):
        cpu_bound_task(n)
    seq_time = time.time() - start

    # Threading: the GIL serializes the bytecode, so little speedup.
    start = time.time()
    workers = [threading.Thread(target=cpu_bound_task, args=(n,))
               for _ in range(iterations)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    thread_time = time.time() - start

    # Multiprocessing: separate interpreters, true parallelism.
    start = time.time()
    with multiprocessing.Pool(iterations) as pool:
        pool.map(cpu_bound_task, [n] * iterations)
    process_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s")  # Similar to sequential!
    print(f"Multiprocessing: {process_time:.2f}s")  # Actually faster
When Threading Works in Python
import urllib.request
import threading
import time
def io_bound_task(url):
    """Fetch `url` and return the body length.

    The GIL is released while the socket blocks, so threads overlap.
    """
    response = urllib.request.urlopen(url)
    try:
        return len(response.read())
    finally:
        response.close()
def benchmark_io_bound():
    """Threading works well for I/O"""
    urls = ["https://example.com"] * 10

    # Sequential: each request waits for the previous one to finish.
    start = time.time()
    for url in urls:
        io_bound_task(url)
    seq_time = time.time() - start

    # Threaded: the GIL is dropped during socket waits, so all ten
    # requests run concurrently.
    start = time.time()
    workers = [threading.Thread(target=io_bound_task, args=(url,))
               for url in urls]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    thread_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s")  # Much faster!
Exercises
Basic
Create a program that spawns 5 threads, each printing its ID 10 times.
Implement a thread-safe counter using a lock.
Explain the difference between a Semaphore(1) and a Lock.
Intermediate
Implement the dining philosophers problem and solve deadlock using ordered locks.
Create a reader-writer lock and demonstrate with multiple readers and writers.
Implement a thread pool with a fixed number of worker threads.
Advanced
Build a concurrent web crawler that limits concurrent requests.
Implement a lock-free stack using compare-and-swap semantics.
Create a parallel merge sort that switches to sequential for small subarrays.
Summary
- Threads share memory; processes have separate memory
- Locks provide mutual exclusion but can cause deadlock
- Condition variables enable waiting for conditions
- Avoid deadlocks with lock ordering or try-lock
- Python's GIL limits CPU parallelism; use multiprocessing for CPU-bound work
- Use thread-safe queues for producer-consumer patterns