Threading and Synchronization

Introduction

Threads are the fundamental unit of concurrent execution within a process. This reading covers thread management, synchronization primitives, and common pitfalls in multithreaded programming.

Learning Objectives

By the end of this reading, you will be able to:

  • Create and manage threads
  • Use locks, semaphores, and condition variables
  • Avoid deadlocks and race conditions
  • Implement thread-safe data structures
  • Understand the Python GIL and its implications

1. Threads vs Processes

Comparison

import threading
import multiprocessing
import os

def show_ids():
    print(f"Process ID: {os.getpid()}, Thread: {threading.current_thread().name}")

# Threads: Share memory, lightweight
def thread_example():
    """Threads share the same process"""
    threads = []
    for i in range(3):
        t = threading.Thread(target=show_ids, name=f"Thread-{i}")
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    # All show same PID, different thread names

# Processes: Separate memory, heavyweight
def process_example():
    """Processes have separate memory"""
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=show_ids, name=f"Process-{i}")
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    # Each shows different PID

When to Use Which

"""
Use THREADS when:
- I/O-bound tasks (network, file I/O)
- Shared state is needed
- Low overhead is important
- Quick context switches needed

Use PROCESSES when:
- CPU-bound tasks (computation)
- Need true parallelism (bypass GIL)
- Memory isolation desired
- Stability (crash doesn't affect others)
"""

# I/O bound - threads work well
import requests  # third-party HTTP client
from concurrent.futures import ThreadPoolExecutor

def download_files(urls):
    """Network I/O - threads wait efficiently"""
    def download(url):
        response = requests.get(url)
        return response.content

    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(download, urls)
    return list(results)

# CPU bound - processes work better (in Python)
from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n):
    """Must be a module-level function so the process pool can pickle it"""
    return sum(i * i for i in range(n))

def compute_heavy(numbers):
    """CPU work - processes avoid GIL"""
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(heavy_computation, numbers)
    return list(results)

2. Thread Creation and Management

Basic Threading

import threading
import time

# Method 1: Function target
def worker(name, duration):
    """Worker function"""
    print(f"{name} starting")
    time.sleep(duration)
    print(f"{name} finished")

thread = threading.Thread(target=worker, args=("Worker-1", 2))
thread.start()
thread.join()  # Wait for completion

# Method 2: Subclass Thread
class WorkerThread(threading.Thread):
    def __init__(self, name, duration):
        super().__init__(name=name)  # let Thread manage the name attribute
        self.duration = duration
        self.result = None

    def run(self):
        """Override run() with thread logic"""
        print(f"{self.name} starting")
        time.sleep(self.duration)
        self.result = f"{self.name} completed"
        print(f"{self.name} finished")

worker = WorkerThread("CustomWorker", 2)
worker.start()
worker.join()
print(worker.result)
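Subclassing works, but when all you need is a return value, the standard library's concurrent.futures gets there with less machinery. A minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def worker(name, duration):
    """Worker that returns a value instead of storing it on the thread"""
    time.sleep(duration)
    return f"{name} completed"

# submit() returns a Future; result() blocks until the value is ready
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(worker, "PoolWorker", 0.1)
    result = future.result()
print(result)  # PoolWorker completed
```

The pool also propagates exceptions: if the worker raises, `future.result()` re-raises in the calling thread, which the subclass approach does not do for you.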

Thread Lifecycle

class ThreadLifecycle:
    """Demonstrate thread states"""

    def __init__(self):
        self.thread = None

    def create_thread(self):
        """Thread is created but not started"""
        self.thread = threading.Thread(target=self.task)
        print(f"Created: alive={self.thread.is_alive()}")  # False

    def start_thread(self):
        """Thread is now running"""
        self.thread.start()
        print(f"Started: alive={self.thread.is_alive()}")  # True

    def join_thread(self):
        """Wait for thread to complete"""
        self.thread.join()
        print(f"Joined: alive={self.thread.is_alive()}")  # False

    def task(self):
        time.sleep(1)

# Daemon threads
def daemon_example():
    """Daemon threads don't prevent program exit"""
    def background_task():
        while True:
            print("Background work...")
            time.sleep(1)

    t = threading.Thread(target=background_task)
    t.daemon = True  # Will be killed when main thread exits
    t.start()

    time.sleep(3)
    print("Main thread exiting - daemon will be killed")

3. Synchronization Primitives

Locks (Mutexes)

import threading

class BankAccount:
    """Thread-safe bank account using locks"""

    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.Lock()

    def deposit(self, amount):
        with self.lock:  # Acquire lock, auto-release
            # Critical section - only one thread at a time
            current = self.balance
            time.sleep(0.001)  # Simulate processing
            self.balance = current + amount

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                current = self.balance
                time.sleep(0.001)
                self.balance = current - amount
                return True
            return False

    def get_balance(self):
        with self.lock:
            return self.balance

# Manual lock management (less preferred)
def manual_lock_example():
    lock = threading.Lock()

    lock.acquire()
    try:
        # Critical section
        pass
    finally:
        lock.release()  # Always release!

# Try-lock (non-blocking)
def try_lock_example():
    lock = threading.Lock()

    if lock.acquire(blocking=False):
        try:
            # Got the lock
            pass
        finally:
            lock.release()
    else:
        # Lock was held by another thread
        pass
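As a quick sanity check, the locked deposit can be hammered from several threads (a self-contained sketch that repeats a minimal BankAccount):

```python
import threading
import time

class BankAccount:
    """Minimal copy of the class above, for a runnable check"""
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.Lock()

    def deposit(self, amount):
        with self.lock:
            current = self.balance
            time.sleep(0.0001)  # simulate processing
            self.balance = current + amount

account = BankAccount()

def deposit_many():
    for _ in range(100):
        account.deposit(1)

threads = [threading.Thread(target=deposit_many) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(account.balance)  # 400: no lost updates
```

Removing the `with self.lock:` line makes the final balance come out short, for the same reason as the race-condition demo later in this reading.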

RLock (Reentrant Lock)

class RecursiveExample:
    """RLock allows same thread to acquire multiple times"""

    def __init__(self):
        self.lock = threading.RLock()  # Reentrant lock
        self.data = []

    def add_item(self, item):
        with self.lock:
            self.data.append(item)
            self.log(f"Added {item}")  # Calls another locked method

    def log(self, message):
        with self.lock:  # Same thread can acquire again!
            print(f"[LOG] {message}, items: {len(self.data)}")

    def add_batch(self, items):
        with self.lock:
            for item in items:
                self.add_item(item)  # Recursive lock acquisition

# With regular Lock, this would deadlock!
# RLock tracks the owning thread and allows reentry
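The difference is easy to observe directly: a same-thread re-acquire succeeds on an RLock but fails on a plain Lock.

```python
import threading

rlock = threading.RLock()
rlock.acquire()
acquired_again = rlock.acquire(blocking=False)  # same thread: succeeds
print(acquired_again)  # True
rlock.release()
rlock.release()  # must release once per acquire

lock = threading.Lock()
lock.acquire()
reacquired = lock.acquire(blocking=False)  # same thread: fails
print(reacquired)  # False (a blocking acquire here would hang forever)
lock.release()
```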

Semaphores

class ConnectionPool:
    """Limit concurrent access using semaphore"""

    def __init__(self, max_connections=5):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []

    def get_connection(self):
        self.semaphore.acquire()  # Block if no permits available
        # In real code, return actual connection
        return f"Connection-{threading.current_thread().name}"

    def release_connection(self, conn):
        self.semaphore.release()  # Release permit

# Bounded semaphore prevents releasing more than acquired
bounded = threading.BoundedSemaphore(5)
# bounded.release() without acquire raises ValueError

# Binary semaphore (acts like a lock, but an extra release()
# silently raises the count instead of raising an error)
binary = threading.Semaphore(1)
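Semaphores also support the with statement, which guarantees the permit is returned even if the guarded code raises. A small sketch (the peak/active counters are only instrumentation for this demo):

```python
import threading
import time

sem = threading.Semaphore(2)  # at most 2 workers inside the guarded section
peak = 0
active = 0
counter_lock = threading.Lock()

def limited_worker():
    global active, peak
    with sem:  # acquired on entry, released on exit, even on exception
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=limited_worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)  # never exceeds 2
```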

Condition Variables

class BoundedBuffer:
    """Producer-consumer buffer using condition variables"""

    def __init__(self, capacity=10):
        self.capacity = capacity
        self.buffer = []
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # Release lock and wait

            self.buffer.append(item)
            self.not_empty.notify()  # Wake up consumer

    def get(self):
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()  # Release lock and wait

            item = self.buffer.pop(0)  # O(n); collections.deque.popleft() is O(1)
            self.not_full.notify()  # Wake up producer
            return item

def producer_consumer_demo():
    buffer = BoundedBuffer(5)

    def producer():
        for i in range(20):
            buffer.put(f"item-{i}")
            print(f"Produced item-{i}")
            time.sleep(0.1)

    def consumer():
        for _ in range(20):
            item = buffer.get()
            print(f"Consumed {item}")
            time.sleep(0.2)

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Events

class WorkCoordinator:
    """Coordinate threads using events"""

    def __init__(self):
        self.ready_event = threading.Event()
        self.shutdown_event = threading.Event()

    def worker(self, worker_id):
        print(f"Worker {worker_id} waiting for ready signal...")
        self.ready_event.wait()  # Block until event is set

        print(f"Worker {worker_id} starting work")
        while not self.shutdown_event.is_set():
            # Do work, then pause; wait() doubles as an interruptible sleep
            if self.shutdown_event.wait(timeout=0.5):
                break

        print(f"Worker {worker_id} shutting down")

    def start_workers(self, num_workers):
        threads = []
        for i in range(num_workers):
            t = threading.Thread(target=self.worker, args=(i,))
            t.start()
            threads.append(t)

        time.sleep(1)
        print("Signaling workers to start...")
        self.ready_event.set()  # All workers start

        time.sleep(3)
        print("Signaling shutdown...")
        self.shutdown_event.set()  # All workers stop

        for t in threads:
            t.join()

Barriers

def barrier_example():
    """Synchronize threads at a point"""
    num_threads = 4
    barrier = threading.Barrier(num_threads)

    def phase_worker(worker_id):
        # Phase 1
        print(f"Worker {worker_id}: Phase 1")
        time.sleep(worker_id * 0.1)  # Different times

        barrier.wait()  # All must reach here before continuing
        print(f"Worker {worker_id}: Phase 2")

        barrier.wait()
        print(f"Worker {worker_id}: Phase 3")

    threads = [
        threading.Thread(target=phase_worker, args=(i,))
        for i in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

4. Common Pitfalls

Race Conditions

class RaceConditionDemo:
    """Demonstrate race conditions"""

    def __init__(self):
        self.counter = 0
        self.lock = threading.Lock()  # used by safe_increment below

    def unsafe_increment(self):
        """Race condition: read-modify-write is not atomic"""
        # This is actually:
        # 1. Read counter into temp
        # 2. Add 1 to temp
        # 3. Write temp to counter
        # Another thread can interleave!
        self.counter += 1

    def safe_increment(self):
        """Use lock to make atomic"""
        with self.lock:
            self.counter += 1

def demonstrate_race():
    demo = RaceConditionDemo()

    def increment_many():
        for _ in range(100000):
            demo.unsafe_increment()

    threads = [threading.Thread(target=increment_many) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(f"Expected: 400000, Got: {demo.counter}")
    # Usually less due to lost updates!

Deadlocks

class DeadlockDemo:
    """Demonstrate deadlock"""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def thread_1_work(self):
        """Acquires A then B"""
        with self.lock_a:
            print("Thread 1: Got lock A")
            time.sleep(0.1)
            with self.lock_b:
                print("Thread 1: Got lock B")

    def thread_2_work(self):
        """Acquires B then A - DEADLOCK!"""
        with self.lock_b:
            print("Thread 2: Got lock B")
            time.sleep(0.1)
            with self.lock_a:  # Waits forever!
                print("Thread 2: Got lock A")

    def run_deadlock(self):
        """This will deadlock!"""
        t1 = threading.Thread(target=self.thread_1_work)
        t2 = threading.Thread(target=self.thread_2_work)
        t1.start()
        t2.start()
        # Neither thread completes!

import random

class DeadlockPrevention:
    """Strategies to prevent deadlock"""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    # Strategy 1: Lock ordering
    def ordered_locks(self):
        """Always acquire locks in same order"""
        # Both threads acquire A before B
        with self.lock_a:
            with self.lock_b:
                pass  # Safe!

    # Strategy 2: Try-lock with backoff
    def try_lock_backoff(self):
        """Try to acquire, back off if can't"""
        while True:
            if self.lock_a.acquire(blocking=False):
                try:
                    if self.lock_b.acquire(blocking=False):
                        try:
                            # Got both locks!
                            return
                        finally:
                            self.lock_b.release()
                finally:
                    self.lock_a.release()
            # Back off and retry
            time.sleep(random.uniform(0.001, 0.01))

    # Strategy 3: Lock timeout
    def timeout_locks(self):
        """Use timeout to detect deadlock"""
        if self.lock_a.acquire(timeout=1.0):
            try:
                if self.lock_b.acquire(timeout=1.0):
                    try:
                        return True
                    finally:
                        self.lock_b.release()
            finally:
                self.lock_a.release()
        return False  # Failed to acquire

Livelocks

class LivelockDemo:
    """Demonstrate livelock - threads active but no progress"""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def polite_thread_1(self):
        """Too polite - keeps yielding"""
        while True:
            self.lock_a.acquire()
            if not self.lock_b.acquire(blocking=False):
                # Can't get B, be "polite" and release A
                self.lock_a.release()
                time.sleep(0.001)  # Small delay
                continue

            # Got both
            try:
                print("Thread 1: Working")
                return
            finally:
                self.lock_b.release()
                self.lock_a.release()

    def polite_thread_2(self):
        """Both threads keep yielding to each other!"""
        while True:
            self.lock_b.acquire()
            if not self.lock_a.acquire(blocking=False):
                self.lock_b.release()
                time.sleep(0.001)
                continue

            try:
                print("Thread 2: Working")
                return
            finally:
                self.lock_a.release()
                self.lock_b.release()

# Solution: Add randomization to backoff
# time.sleep(random.uniform(0.001, 0.01))
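That fix can be packaged as a small helper (a sketch; acquire_both and its parameters are my names, not a standard API):

```python
import random
import threading
import time

def acquire_both(first, second, max_tries=100):
    """Take two locks without livelock: after each failed attempt,
    back off for a random interval so contending threads desynchronize."""
    for _ in range(max_tries):
        first.acquire()
        if second.acquire(blocking=False):
            return True          # caller now holds both locks
        first.release()          # could not get the second; retreat
        time.sleep(random.uniform(0.001, 0.01))  # jitter breaks the symmetry
    return False

lock_a, lock_b = threading.Lock(), threading.Lock()
got = acquire_both(lock_a, lock_b)
print(got)  # True (the uncontended case succeeds on the first try)
if got:
    lock_b.release()
    lock_a.release()
```

Because the backoff interval is random, two threads that collide once are unlikely to collide again on the retry, which is exactly what the fixed 0.001s sleep above fails to guarantee.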

Priority Inversion

"""
Priority Inversion:
- High priority thread H needs lock held by low priority L
- Medium priority thread M preempts L
- H waits for L, which waits for M
- Result: H effectively has lower priority than M!

Solution: Priority inheritance
- When H blocks on L's lock, L inherits H's priority
- L can't be preempted by M
- L finishes quickly, releases lock
- H can continue
"""

5. Thread-Safe Data Structures

Thread-Safe Queue

import queue

def thread_safe_queue_example():
    """Python's queue module is thread-safe"""

    q = queue.Queue(maxsize=10)

    def producer():
        for i in range(20):
            q.put(f"item-{i}")  # Blocks if full
            print(f"Produced item-{i}")

    def consumer():
        while True:
            try:
                item = q.get(timeout=2)  # Blocks if empty
                print(f"Consumed {item}")
                q.task_done()
            except queue.Empty:
                break

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

# Other thread-safe queues
# queue.LifoQueue()  # Stack
# queue.PriorityQueue()  # Heap
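For example, a PriorityQueue releases items in priority order regardless of insertion order:

```python
import queue

pq = queue.PriorityQueue()
# items here are (priority, payload) tuples; lowest priority comes out first
pq.put((3, "low"))
pq.put((1, "urgent"))
pq.put((2, "normal"))

order = [pq.get()[1] for _ in range(3)]
print(order)  # ['urgent', 'normal', 'low']
```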

Lock-Free Counter

class AtomicCounter:
    """Thread-safe counter; the lock makes each increment atomic"""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        """Atomic increment"""
        with self._lock:
            self._value += 1
            return self._value

    def get(self):
        return self._value

# For true lock-free, use atomic operations (C extension or specialized libraries)
# Python's += is not atomic!
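To confirm the counter holds up under contention, a self-contained sketch repeating the class:

```python
import threading

class AtomicCounter:
    """Minimal copy of the lock-based counter above"""
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1
            return self._value

    def get(self):
        return self._value

counter = AtomicCounter()

def bump():
    for _ in range(10_000):
        counter.increment()

threads = [threading.Thread(target=bump) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.get())  # 40000: no lost updates
```

Swapping the locked increment for a bare `self._value += 1` reproduces the lost-update problem from the race-condition section.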

Read-Write Lock

class ReadWriteLock:
    """Allow multiple readers OR a single writer.

    Caveat: writers can starve, since new readers may keep arriving
    while a writer waits. Production versions also track waiting writers.
    """

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        """Acquire read lock (multiple allowed)"""
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        """Release read lock"""
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        """Acquire write lock (exclusive)"""
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        """Release write lock"""
        self._read_ready.release()

class ThreadSafeDict:
    """Dictionary with read-write lock"""

    def __init__(self):
        self._data = {}
        self._lock = ReadWriteLock()

    def get(self, key):
        self._lock.acquire_read()
        try:
            return self._data.get(key)
        finally:
            self._lock.release_read()

    def set(self, key, value):
        self._lock.acquire_write()
        try:
            self._data[key] = value
        finally:
            self._lock.release_write()
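A quick demonstration that readers really do share the lock (a self-contained sketch repeating only the reader half of ReadWriteLock; the counters are demo instrumentation):

```python
import threading
import time

class ReadWriteLock:
    """Reader half of the class above, for a runnable demo"""
    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

rw = ReadWriteLock()
peak_readers = 0
current = 0
count_lock = threading.Lock()

def reader():
    global peak_readers, current
    rw.acquire_read()
    with count_lock:
        current += 1
        peak_readers = max(peak_readers, current)
    time.sleep(0.05)  # hold the read lock for a while
    with count_lock:
        current -= 1
    rw.release_read()

threads = [threading.Thread(target=reader) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak_readers)  # typically 3: all readers held the lock at once
```

With an ordinary Lock in place of the read lock, the peak would be 1, since each reader would have to wait for the previous one.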

6. The Python GIL

Understanding the GIL

"""
Global Interpreter Lock (GIL):
- Python (CPython) has a lock that only allows one thread to execute Python bytecode
- This means Python threads don't run truly in parallel for CPU work
- GIL is released during I/O operations

Implications:
- CPU-bound: Use multiprocessing, not threading
- I/O-bound: Threading works fine (GIL released during I/O)
"""

import threading
import multiprocessing
import time

def cpu_bound_task(n):
    """CPU-intensive work"""
    total = 0
    for i in range(n):
        total += i * i
    return total

def benchmark_cpu_bound():
    """Threading vs multiprocessing for CPU work"""
    n = 10_000_000
    iterations = 4

    # Sequential
    start = time.time()
    for _ in range(iterations):
        cpu_bound_task(n)
    seq_time = time.time() - start

    # Threading (limited by GIL)
    start = time.time()
    threads = [
        threading.Thread(target=cpu_bound_task, args=(n,))
        for _ in range(iterations)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    thread_time = time.time() - start

    # Multiprocessing (bypasses GIL)
    start = time.time()
    with multiprocessing.Pool(iterations) as pool:
        pool.map(cpu_bound_task, [n] * iterations)
    process_time = time.time() - start

    print(f"Sequential:      {seq_time:.2f}s")
    print(f"Threading:       {thread_time:.2f}s")  # Similar to sequential!
    print(f"Multiprocessing: {process_time:.2f}s")  # Actually faster

When Threading Works in Python

import urllib.request
import threading
import time

def io_bound_task(url):
    """I/O-intensive work - GIL is released"""
    with urllib.request.urlopen(url) as response:
        return len(response.read())

def benchmark_io_bound():
    """Threading works well for I/O"""
    urls = ["https://example.com"] * 10

    # Sequential
    start = time.time()
    for url in urls:
        io_bound_task(url)
    seq_time = time.time() - start

    # Threading (GIL released during I/O wait)
    start = time.time()
    threads = [
        threading.Thread(target=io_bound_task, args=(url,))
        for url in urls
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    thread_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading:  {thread_time:.2f}s")  # Much faster!

Exercises

Basic

  1. Create a program that spawns 5 threads, each printing its ID 10 times.

  2. Implement a thread-safe counter using a lock.

  3. Explain the difference between a Semaphore(1) and a Lock.

Intermediate

  1. Implement the dining philosophers problem and solve deadlock using ordered locks.

  2. Create a reader-writer lock and demonstrate with multiple readers and writers.

  3. Implement a thread pool with a fixed number of worker threads.

Advanced

  1. Build a concurrent web crawler that limits concurrent requests.

  2. Implement a lock-free stack using compare-and-swap semantics.

  3. Create a parallel merge sort that switches to sequential for small subarrays.


Summary

  • Threads share memory; processes have separate memory
  • Locks provide mutual exclusion but can cause deadlock
  • Condition variables enable waiting for conditions
  • Avoid deadlocks with lock ordering or try-lock
  • Python's GIL limits CPU parallelism; use multiprocessing for CPU-bound work
  • Use thread-safe queues for producer-consumer patterns

Next Reading

Concurrent Patterns →