# Parallel Computing Fundamentals

Parallel computing uses multiple processing elements simultaneously to solve problems faster. As single-core performance gains have slowed, parallelism has become essential for further performance improvements.

## 1. Why Parallel Computing?
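### The End of the Free Lunch

From roughly 1970 to 2005, single-core performance doubled about every 18 to 24 months, driven by Moore's Law (growing transistor counts) together with Dennard scaling (constant power density as transistors shrink). Since about 2005:

- Dennard scaling ended (the power wall)
- Clock speeds plateaued (~3-4 GHz)
- Transistor counts still grow, but the extra transistors go into more cores rather than faster ones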
# Historical context
# 1990: 25 MHz single core
# 2000: 1 GHz single core
# 2005: 3 GHz single core (plateau)
# 2024: 3.5 GHz but 16+ cores
# To use modern hardware efficiently, we must parallelize
### Types of Parallelism

1. Instruction-Level Parallelism (ILP)
   - CPU executes multiple instructions per cycle
   - Pipelining, superscalar, out-of-order execution
   - Handled by hardware automatically
2. Data Parallelism
   - Same operation on multiple data elements
   - SIMD (Single Instruction, Multiple Data)
   - GPUs excel at this
3. Task Parallelism
   - Different operations on different data
   - Multiple independent tasks
   - Thread pools, work stealing
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp

# Data parallelism example (conceptually)
def data_parallel_add(a: list, b: list) -> list:
    """Add two arrays element-wise - naturally parallel"""
    # Each element can be computed independently
    return [x + y for x, y in zip(a, b)]
    # With NumPy (vectorized; typically uses SIMD internally):
    # return (np.array(a) + np.array(b)).tolist()

# Placeholder tasks so the example below runs; substitute real work here
def compute_statistics(data):
    return {"count": len(data), "mean": sum(data) / len(data)}

def generate_report(data):
    return f"{len(data)} items, min={min(data)}, max={max(data)}"

def backup_data(data):
    return list(data)

# Task parallelism example
def task_parallel_pipeline(data):
    """Different tasks on the same data - can run concurrently"""
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Three different tasks submitted to run concurrently
        future1 = executor.submit(compute_statistics, data)
        future2 = executor.submit(generate_report, data)
        future3 = executor.submit(backup_data, data)
        return future1.result(), future2.result(), future3.result()
## 2. Theoretical Limits

### Amdahl's Law

The speedup from parallelization is limited by the sequential portion of the program.

def amdahls_law(p: float, n: float) -> float:
    """
    Calculate maximum speedup.

    Args:
        p: Fraction of program that can be parallelized (0 to 1)
        n: Number of processors (float('inf') gives the limiting case)

    Returns:
        Maximum speedup factor
    """
    # Speedup = 1 / ((1-p) + p/n)
    sequential_part = 1 - p
    parallel_part = p / n
    return 1 / (sequential_part + parallel_part)

# Examples
print(f"90% parallel, 4 cores: {amdahls_law(0.9, 4):.2f}x")    # 3.08x
print(f"90% parallel, 16 cores: {amdahls_law(0.9, 16):.2f}x")  # 6.40x
print(f"90% parallel, ∞ cores: {amdahls_law(0.9, float('inf')):.2f}x")  # 10.00x max!
print(f"99% parallel, 100 cores: {amdahls_law(0.99, 100):.2f}x")  # 50.25x
print(f"99% parallel, ∞ cores: {amdahls_law(0.99, float('inf')):.2f}x")  # 100.00x max
# Key insight: Even 1% sequential limits speedup to 100x regardless of cores!
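The 10x and 100x ceilings follow directly from the formula in the code comment. Written out, with $S(n)$ as an assumed name for the speedup on $n$ processors and $p$ the parallel fraction:

```latex
S(n) = \frac{1}{(1 - p) + \dfrac{p}{n}},
\qquad
\lim_{n \to \infty} S(n) = \frac{1}{1 - p}
```

For $p = 0.9$ the limit is $1 / 0.1 = 10$, and for $p = 0.99$ it is $1 / 0.01 = 100$, which matches the two "∞ cores" examples.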
### Gustafson's Law

As the problem size grows, the parallel portion often grows with it, so the sequential fraction shrinks.

def gustafsons_law(p: float, n: int) -> float:
    """
    Calculate scaled speedup.

    Instead of a fixed problem size, assume we scale the problem
    to use the available processors.

    Args:
        p: Fraction of parallel work (with n processors)
        n: Number of processors

    Returns:
        Scaled speedup
    """
    # Speedup = n - (1-p) * (n-1)
    # Or equivalently: (1-p) + p*n
    return (1 - p) + p * n

# With Gustafson's view, more cores = bigger problems
print(f"90% parallel, 16 cores: {gustafsons_law(0.9, 16):.1f}x")    # 14.5x
print(f"90% parallel, 100 cores: {gustafsons_law(0.9, 100):.1f}x")  # 90.1x
# More optimistic for scaling large problems
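To see how far the two views diverge, the formulas can be tabulated side by side. This is a minimal sketch; the helper names `amdahl`, `gustafson`, and `compare` are hypothetical, and it glosses over the fact that Gustafson's p is measured on the parallel run while Amdahl's p is measured on the serial run.

```python
def amdahl(p, n):
    """Fixed-size speedup: 1 / ((1-p) + p/n)."""
    return 1.0 / ((1.0 - p) + p / n)

def gustafson(p, n):
    """Scaled speedup: (1-p) + p*n."""
    return (1.0 - p) + p * n

def compare(p, core_counts):
    """Return (cores, fixed-size speedup, scaled speedup) for each core count."""
    return [(n, amdahl(p, n), gustafson(p, n)) for n in core_counts]

for n, fixed, scaled in compare(0.9, [1, 4, 16, 64]):
    print(f"{n:>3} cores: fixed-size {fixed:6.2f}x, scaled {scaled:6.2f}x")
```

Both formulas agree at one core (speedup 1), and the gap widens as the core count grows because the fixed-size speedup saturates at 1/(1-p).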
### Efficiency and Scalability

def parallel_efficiency(speedup: float, num_processors: int) -> float:
    """
    Calculate parallel efficiency.

    Efficiency = Speedup / Number of Processors
    Perfect efficiency = 1.0 (100%)
    """
    return speedup / num_processors

def analyze_scalability(serial_time: float, parallel_times: dict):
    """Analyze scalability of a parallel program"""
    print(f"Serial time: {serial_time:.3f}s\n")
    print(f"{'Cores':<8} {'Time':<10} {'Speedup':<10} {'Efficiency':<10}")
    print("-" * 38)
    # `elapsed` rather than `time`, to avoid shadowing the time module
    for cores, elapsed in sorted(parallel_times.items()):
        speedup = serial_time / elapsed
        efficiency = parallel_efficiency(speedup, cores)
        print(f"{cores:<8} {elapsed:<10.3f} {speedup:<10.2f} {efficiency:<10.2%}")

# Example data
analyze_scalability(
    serial_time=10.0,
    parallel_times={
        1: 10.0,
        2: 5.2,
        4: 2.8,
        8: 1.6,
        16: 1.1,
        32: 0.9
    }
)
# Cores    Time       Speedup    Efficiency
# 1        10.000     1.00       100.00%
# 2        5.200      1.92       96.15%
# 4        2.800      3.57       89.29%
# 8        1.600      6.25       78.12%
# 16       1.100      9.09       56.82%
# 32       0.900      11.11      34.72%
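A measured speedup can also be turned around to estimate how parallel the program effectively is. The sketch below inverts Amdahl's formula, S = 1 / ((1-p) + p/n), to solve for p; the function name `estimated_parallel_fraction` is hypothetical, and the result lumps communication and synchronization overhead into the "sequential" part.

```python
def estimated_parallel_fraction(speedup: float, n: int) -> float:
    """Invert Amdahl's Law: the p that would yield `speedup` on n processors.

    From 1/S = (1-p) + p/n it follows that p = (1 - 1/S) / (1 - 1/n).
    """
    if n <= 1:
        raise ValueError("need at least 2 processors to estimate p")
    return (1.0 - 1.0 / speedup) / (1.0 - 1.0 / n)

# Same example timings as above (serial time 10.0 s)
timings = {2: 5.2, 4: 2.8, 8: 1.6, 16: 1.1, 32: 0.9}
for cores, elapsed in sorted(timings.items()):
    p = estimated_parallel_fraction(10.0 / elapsed, cores)
    print(f"{cores:>2} cores: estimated p = {p:.3f}")
```

If the estimate stays flat as cores are added, the program behaves as Amdahl's model predicts; an estimate that drifts downward suggests overhead that grows with the core count.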
## 3. Flynn's Taxonomy

Classification of computer architectures:

|                      | Single Data | Multiple Data |
|----------------------|-------------|---------------|
| Single Instruction   | SISD        | SIMD          |
| Multiple Instruction | MISD        | MIMD          |
### SISD (Single Instruction, Single Data)

Traditional sequential processor.

# SISD: One instruction, one data item at a time
def sisd_sum(arr):
    """Sequential sum - classic SISD"""
    total = 0
    for x in arr:       # One element at a time
        total += x      # One addition at a time
    return total
### SIMD (Single Instruction, Multiple Data)

Same operation on multiple data simultaneously.

import time
import numpy as np

# SIMD: One instruction operates on multiple data
def simd_double_concept(arr):
    """
    SIMD processes multiple elements with one instruction.
    NumPy's vectorized operations run in compiled loops that can use SIMD.
    """
    # Conceptually, instead of:
    #   for i in range(len(arr)): result[i] = arr[i] * 2
    # SIMD does (in hardware):
    #   Load 4 values into vector register
    #   Multiply all 4 by 2 in single instruction
    #   Store 4 results
    return np.array(arr) * 2  # Vectorized operation

# Performance comparison
def benchmark_simd():
    size = 10_000_000
    arr_list = list(range(size))
    arr_numpy = np.arange(size)
    # Sequential (Python loop)
    start = time.perf_counter()
    result1 = [x * 2 for x in arr_list]
    seq_time = time.perf_counter() - start
    # Vectorized (NumPy)
    start = time.perf_counter()
    result2 = arr_numpy * 2
    vec_time = time.perf_counter() - start
    # Note: much of the gap comes from avoiding interpreter overhead,
    # not from SIMD alone
    print(f"Sequential: {seq_time:.3f}s")
    print(f"Vectorized: {vec_time:.3f}s")
    print(f"Speedup: {seq_time/vec_time:.1f}x")
### MIMD (Multiple Instruction, Multiple Data)

Different processors execute different instructions on different data.

from multiprocessing import Process, Queue

# MIMD: Different processors, different tasks
def task_a(q, data):
    """Processor 1: Compute sum"""
    result = sum(data)
    q.put(('sum', result))

def task_b(q, data):
    """Processor 2: Compute product"""
    result = 1
    for x in data:
        result *= x
    q.put(('product', result))

def task_c(q, data):
    """Processor 3: Find max"""
    result = max(data)
    q.put(('max', result))

def mimd_example():
    """Run different tasks on different processors"""
    data = [1, 2, 3, 4, 5]
    q = Queue()
    # Each process runs different code (Multiple Instruction)
    # on potentially different data (Multiple Data)
    processes = [
        Process(target=task_a, args=(q, data)),
        Process(target=task_b, args=(q, data)),
        Process(target=task_c, args=(q, data))
    ]
    for p in processes:
        p.start()
    # Collect one result per process BEFORE joining: a child that has put
    # data on a Queue may not exit until that data is consumed, and
    # q.empty() is not a reliable signal that all results have arrived
    results = {}
    for _ in processes:
        name, value = q.get()
        results[name] = value
    for p in processes:
        p.join()
    return results
## 4. Memory Models

### Shared Memory

All processors access the same memory space.

import threading

class SharedMemoryModel:
    """
    Shared memory: All threads see same data.

    Advantages:
    - Easy to program (just share variables)
    - Low latency for small data

    Disadvantages:
    - Need synchronization (locks, etc.)
    - Doesn't scale to many processors
    - Cache coherence overhead
    """
    def __init__(self):
        self.shared_data = {}
        self.lock = threading.Lock()

    def write(self, key, value):
        with self.lock:  # Need synchronization!
            self.shared_data[key] = value

    def read(self, key):
        with self.lock:
            return self.shared_data.get(key)
# Example: Shared counter (problematic without sync)
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        # Race condition: read-modify-write is not atomic
        # Thread 1: reads 5
        # Thread 2: reads 5
        # Thread 1: writes 6
        # Thread 2: writes 6 (lost update!)
        self.count += 1

def demonstrate_race_condition():
    """Show why shared memory needs synchronization"""
    counter = Counter()
    threads = []

    def increment_many():
        for _ in range(100000):
            counter.increment()

    for _ in range(4):
        t = threading.Thread(target=increment_many)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"Expected: 400000, Got: {counter.count}")
    # May be less than 400000 due to lost updates. How often that happens
    # depends on the interpreter and version: some CPython releases rarely
    # switch threads mid-increment, but the code is still not thread-safe.
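The usual fix for this kind of lost update is to make the read-modify-write a critical section. Below is a minimal sketch using `threading.Lock`; the names `LockedCounter` and `run_locked` are hypothetical and chosen only for this illustration.

```python
import threading

class LockedCounter:
    """Counter whose increment is protected by a lock."""
    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time can execute the read-modify-write
        with self._lock:
            self.count += 1

def run_locked(num_threads=4, increments=100_000):
    """Increment one shared LockedCounter from several threads."""
    counter = LockedCounter()

    def work():
        for _ in range(increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.count

print(run_locked())  # 400000 on every run
```

The lock trades some throughput for correctness: the increments are now serialized, so this counter is safe but gains nothing from extra threads.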
### Distributed Memory

Each processor has its own memory; processors communicate via messages.

from multiprocessing import Process, Pipe

class DistributedMemoryModel:
    """
    Distributed memory: Each process has private memory.
    Communication via message passing.

    Advantages:
    - Scales to many processors
    - No cache coherence issues
    - Can span multiple machines

    Disadvantages:
    - Explicit communication (more programming effort)
    - Higher latency for communication
    - Need to partition data
    """
    pass

# Defined at module level so it can be pickled when processes are spawned
def _sum_worker(conn, chunk):
    """Each worker sums its chunk and sends the result"""
    local_sum = sum(chunk)
    conn.send(local_sum)
    conn.close()

def distributed_sum(data, num_workers):
    """Sum using distributed memory (message passing)"""
    # Partition data; the last chunk absorbs any remainder
    chunk_size = len(data) // num_workers
    chunks = [data[i*chunk_size:(i+1)*chunk_size] for i in range(num_workers - 1)]
    chunks.append(data[(num_workers - 1) * chunk_size:])
    # Create workers with communication pipes
    processes = []
    parent_conns = []
    for chunk in chunks:
        parent_conn, child_conn = Pipe()
        p = Process(target=_sum_worker, args=(child_conn, chunk))
        processes.append(p)
        parent_conns.append(parent_conn)
        p.start()
    # Collect results (message passing)
    total = 0
    for conn in parent_conns:
        total += conn.recv()  # Receive message from worker
    for p in processes:
        p.join()
    return total
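When the data length is not a multiple of the number of workers, the leftover elements have to go somewhere. One option, sketched here with a hypothetical helper named `partition`, is to spread the remainder across the first few chunks so that no worker gets more than one extra element.

```python
def partition(data, num_parts):
    """Split data into num_parts contiguous chunks whose sizes differ by at most 1."""
    if num_parts < 1:
        raise ValueError("num_parts must be at least 1")
    base, extra = divmod(len(data), num_parts)
    chunks = []
    start = 0
    for i in range(num_parts):
        # The first `extra` chunks each take one additional element
        size = base + (1 if i < extra else 0)
        chunks.append(data[start:start + size])
        start += size
    return chunks

print(partition(list(range(10)), 3))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Balanced chunks matter because the overall runtime is set by the slowest worker; a single oversized chunk delays the final result.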
### Cache Coherence

In shared memory systems, each processor has its own cache. The problem: what happens when one processor modifies data that another processor has cached?

"""
Cache Coherence Problem:

    CPU 0 Cache: x = 5
    CPU 1 Cache: x = 5
    Memory:      x = 5

CPU 0 writes x = 10:
    - CPU 0 Cache: x = 10
    - CPU 1 Cache: x = 5 (stale!)
    - Memory:      x = 5 (stale!)

Without coherence protocol, CPU 1 sees wrong value!
"""

# MESI Protocol (common solution)
class CacheLine:
    """Simplified cache line with MESI states"""
    # States
    MODIFIED = 'M'   # Only this cache has it, it's dirty
    EXCLUSIVE = 'E'  # Only this cache has it, it's clean
    SHARED = 'S'     # Multiple caches have it, all clean
    INVALID = 'I'    # Cache line is invalid

    def __init__(self):
        self.state = self.INVALID
        self.data = None

    def read(self):
        """Read request from local CPU"""
        if self.state == self.INVALID:
            # Must fetch from memory or other cache
            # Transition to SHARED or EXCLUSIVE
            pass
        return self.data

    def write(self, value):
        """Write request from local CPU"""
        if self.state != self.MODIFIED:
            # Must invalidate other caches
            # Transition to MODIFIED
            pass
        self.data = value
        self.state = self.MODIFIED

    def snoop_read(self):
        """Another CPU is reading this address"""
        if self.state == self.MODIFIED:
            # Must provide data, transition to SHARED
            self.state = self.SHARED
        elif self.state == self.EXCLUSIVE:
            self.state = self.SHARED

    def snoop_write(self):
        """Another CPU is writing this address"""
        # Must invalidate our copy
        self.state = self.INVALID
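The `CacheLine` class shows one cache in isolation. To watch the states change across caches, here is a toy simulation of a single address shared by several caches. `ToyBus` and its methods are hypothetical names for this sketch, and it ignores bus arbitration, timing, and everything else a real MESI implementation has to handle.

```python
from enum import Enum

class State(Enum):
    MODIFIED = "M"
    EXCLUSIVE = "E"
    SHARED = "S"
    INVALID = "I"

class ToyBus:
    """Toy model: one memory word and one cache line per CPU for that word."""
    def __init__(self, num_caches, initial_value=0):
        self.memory = initial_value
        self.states = [State.INVALID] * num_caches
        self.values = [None] * num_caches

    def read(self, cpu):
        if self.states[cpu] is State.INVALID:
            holders = [i for i, s in enumerate(self.states)
                       if i != cpu and s is not State.INVALID]
            for i in holders:
                # A MODIFIED holder writes back; every holder drops to SHARED
                if self.states[i] is State.MODIFIED:
                    self.memory = self.values[i]
                self.states[i] = State.SHARED
            self.values[cpu] = self.memory
            self.states[cpu] = State.SHARED if holders else State.EXCLUSIVE
        return self.values[cpu]

    def write(self, cpu, value):
        # Invalidate every other copy, then take ownership of the line
        for i in range(len(self.states)):
            if i != cpu:
                self.states[i] = State.INVALID
                self.values[i] = None
        self.values[cpu] = value
        self.states[cpu] = State.MODIFIED

    def trace(self):
        """State letters for all caches, e.g. 'MI' for two CPUs."""
        return "".join(s.value for s in self.states)

bus = ToyBus(2, initial_value=5)
bus.read(0);      print(bus.trace())  # EI
bus.read(1);      print(bus.trace())  # SS
bus.write(0, 10); print(bus.trace())  # MI
print(bus.read(1), bus.trace())       # 10 SS
```

The last line is the scenario from the coherence problem above: after CPU 0 writes 10, CPU 1's next read sees 10 rather than the stale 5, because its copy was invalidated and refetched.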
### False Sharing

Cache coherence traffic can slow threads down even when they touch unrelated data:

import threading
import time

class FalseSharingDemo:
    """
    False sharing: Different data on same cache line.
    Cache lines are typically 64 bytes.
    If two threads modify different variables on the same line,
    they cause unnecessary invalidations.

    Note: this class only illustrates the access pattern. CPython does not
    expose object memory layout and the GIL serializes bytecode, so the
    timings below will not show a real false-sharing effect. In C, C++, or
    Rust the two counters would be adjacent fields of one struct.
    """
    def __init__(self):
        # In a compiled language these might be on the same cache line!
        self.counter_a = 0
        self.counter_b = 0

    def increment_a(self, iterations):
        for _ in range(iterations):
            self.counter_a += 1

    def increment_b(self, iterations):
        for _ in range(iterations):
            self.counter_b += 1

class PaddedCounters(FalseSharingDemo):
    """Avoid false sharing with padding (conceptual in Python)"""
    def __init__(self):
        # In a compiled language, padding pushes counter_b onto its own
        # 64-byte cache line. A Python list attribute does not do that;
        # it only marks where the padding would go.
        self.counter_a = 0
        self._pad_a = [0] * 15  # stands in for 15 * 8 = 120 bytes of padding
        self.counter_b = 0
        self._pad_b = [0] * 15

def _time_two_threads(counters, iterations):
    """Run increment_a and increment_b on two threads; return elapsed seconds"""
    start = time.perf_counter()
    t1 = threading.Thread(target=counters.increment_a, args=(iterations,))
    t2 = threading.Thread(target=counters.increment_b, args=(iterations,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    return time.perf_counter() - start

def benchmark_false_sharing():
    """Compare with and without padding"""
    iterations = 10_000_000
    time_shared = _time_two_threads(FalseSharingDemo(), iterations)
    time_padded = _time_two_threads(PaddedCounters(), iterations)
    print(f"With false sharing: {time_shared:.3f}s")
    print(f"With padding:       {time_padded:.3f}s")
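Padding only means something when the memory layout is under the programmer's control. As a sketch of what that looks like, `ctypes.Structure` lays out fields like a C struct, so field offsets can be inspected directly. The struct names, the helper `cache_line_index`, and the 64-byte line size are assumptions for illustration, and the line index is only meaningful if the struct itself starts on a cache-line boundary.

```python
import ctypes

CACHE_LINE = 64  # bytes; assumed, typical on x86-64

class PackedPair(ctypes.Structure):
    """Two counters side by side: both fall in the same 64-byte span."""
    _fields_ = [
        ("counter_a", ctypes.c_int64),
        ("counter_b", ctypes.c_int64),
    ]

class PaddedPair(ctypes.Structure):
    """Same counters with explicit padding between them."""
    _fields_ = [
        ("counter_a", ctypes.c_int64),
        ("_pad", ctypes.c_char * (CACHE_LINE - 8)),  # fill out the first line
        ("counter_b", ctypes.c_int64),
    ]

def cache_line_index(struct_type, field_name):
    """Which 64-byte span (relative to the struct start) holds the field."""
    return getattr(struct_type, field_name).offset // CACHE_LINE

for struct_type in (PackedPair, PaddedPair):
    print(struct_type.__name__,
          "counter_b offset:", struct_type.counter_b.offset,
          "line:", cache_line_index(struct_type, "counter_b"))
```

In `PaddedPair` the two counters are 64 bytes apart, so they can never share a 64-byte line regardless of where the struct is placed; in `PackedPair` they are 8 bytes apart and will usually share one.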
## 5. Parallelization Patterns

### Embarrassingly Parallel

No communication needed between tasks.

from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    """Check if n is prime"""
    if n < 2:
        return False
    # math.isqrt avoids floating-point rounding for large n
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True

def find_primes_parallel(start, end, num_workers=4):
    """
    Embarrassingly parallel: Each number checked independently.
    No communication between tasks needed.
    """
    numbers = range(start, end)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # chunksize batches numbers per task to cut inter-process overhead
        results = executor.map(is_prime, numbers, chunksize=1000)
        return [n for n, is_p in zip(numbers, results) if is_p]
### Map-Reduce Pattern

from functools import reduce
from concurrent.futures import ProcessPoolExecutor

def parallel_map_reduce(data, map_fn, reduce_fn, num_workers=4):
    """
    Map-Reduce:
    1. Map: Apply function to each element (parallel)
    2. Reduce: Combine results (often sequential or tree-based)
    """
    # Parallel map
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_fn, data))
    # Sequential reduce
    return reduce(reduce_fn, mapped)

# Example: Word count
def count_words_in_chunk(text_chunk):
    """Map: Count words in one chunk"""
    words = text_chunk.lower().split()
    counts = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    return counts

def merge_counts(counts1, counts2):
    """Reduce: Merge two count dictionaries"""
    result = counts1.copy()
    for word, count in counts2.items():
        result[word] = result.get(word, 0) + count
    return result

# Usage (the __main__ guard is needed when worker processes are spawned
# by re-importing this module, as on Windows and macOS)
if __name__ == "__main__":
    text_chunks = ["hello world hello", "world foo bar", "hello bar baz"]
    word_counts = parallel_map_reduce(text_chunks, count_words_in_chunk, merge_counts)
    print(word_counts)  # {'hello': 3, 'world': 2, 'foo': 1, 'bar': 2, 'baz': 1}
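The docstring mentions that the reduce step can be tree-based instead of sequential. A minimal sketch of that shape follows: items are combined pairwise, level by level, so the combines within one level are independent of each other and could run in parallel. The name `tree_reduce` is hypothetical, and this version runs each level sequentially to keep the structure easy to follow.

```python
def tree_reduce(items, combine):
    """Reduce items pairwise, level by level, preserving left-to-right order."""
    level = list(items)
    if not level:
        raise ValueError("tree_reduce() of empty sequence")
    while len(level) > 1:
        next_level = []
        # Pairs (0,1), (2,3), ... do not depend on each other
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        # An odd element out is carried up unchanged
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        level = next_level
    return level[0]

print(tree_reduce([1, 2, 3, 4, 5, 6, 7, 8], lambda a, b: a + b))  # 36
```

With n items this takes about log2(n) levels rather than n - 1 sequential steps, which pays off only when `combine` is associative and each level's pairs actually run in parallel.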
### Fork-Join Pattern

from concurrent.futures import ThreadPoolExecutor, Future
from typing import List

class ForkJoinPool:
    """
    Fork-Join: Divide problem, solve parts in parallel, combine.

    Note: in CPython, threads will not speed up CPU-bound sorting because
    of the GIL; this class illustrates the structure of the pattern.
    """
    def __init__(self, num_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=num_workers)
        # Forking only at depths below max_fork_depth creates at most
        # 2**max_fork_depth - 1 < num_workers pooled tasks per invoke(),
        # so a task blocked in result() never starves its child of a
        # thread (unbounded forking can deadlock a fixed-size pool)
        self.max_fork_depth = num_workers.bit_length() - 1

    def invoke(self, task: 'RecursiveTask'):
        """Execute a fork-join task (one invoke at a time)"""
        return task.compute(self)

    def shutdown(self):
        self.executor.shutdown()

class RecursiveTask:
    """Base class for fork-join tasks"""
    def compute(self, pool: ForkJoinPool):
        raise NotImplementedError

    def fork(self, pool: ForkJoinPool) -> Future:
        """Submit task for async execution"""
        return pool.executor.submit(self.compute, pool)

class ParallelMergeSort(RecursiveTask):
    """Merge sort using fork-join"""
    THRESHOLD = 1000  # Switch to sequential below this size

    def __init__(self, arr: List[int], depth: int = 0):
        self.arr = arr
        self.depth = depth  # Recursion depth, used to bound forking

    def compute(self, pool: ForkJoinPool) -> List[int]:
        if len(self.arr) <= self.THRESHOLD:
            return sorted(self.arr)  # Sequential sort
        mid = len(self.arr) // 2
        # Fork: Create subtasks
        left_task = ParallelMergeSort(self.arr[:mid], self.depth + 1)
        right_task = ParallelMergeSort(self.arr[mid:], self.depth + 1)
        if self.depth < pool.max_fork_depth:
            # Execute right in parallel, left here
            right_future = right_task.fork(pool)
            left_result = left_task.compute(pool)
            # Join: Wait for results
            right_result = right_future.result()
        else:
            # Pool is saturated: recurse in the current thread
            left_result = left_task.compute(pool)
            right_result = right_task.compute(pool)
        # Combine
        return self._merge(left_result, right_result)

    def _merge(self, left: List[int], right: List[int]) -> List[int]:
        result = []
        i = j = 0
        while i < len(left) and j < len(right):
            if left[i] <= right[j]:
                result.append(left[i])
                i += 1
            else:
                result.append(right[j])
                j += 1
        result.extend(left[i:])
        result.extend(right[j:])
        return result
## Exercises

### Basic

1. Calculate the maximum speedup for a program that is 95% parallelizable using 64 cores.
2. Classify the following as SIMD or MIMD:
   - GPU rendering
   - Web server handling requests
   - Matrix multiplication on CPU with AVX
3. Explain why false sharing causes performance problems.

### Intermediate

1. Implement a parallel sum using both shared memory and message passing approaches.
2. Measure the scalability of a parallel program and plot speedup vs. number of cores.
3. Identify the parallelizable portions of a given algorithm and estimate speedup.

### Advanced

1. Implement a work-stealing scheduler for load balancing.
2. Design a cache-friendly parallel algorithm that minimizes false sharing.
3. Analyze the cache coherence traffic for a given parallel program.
## Summary

- Parallel computing is essential now that single-core performance scaling has largely stalled
- Amdahl's Law limits speedup based on sequential fraction
- Flynn's taxonomy: SISD, SIMD, MISD, MIMD
- Shared memory is easier but requires synchronization
- Distributed memory scales better but needs explicit communication
- Cache coherence maintains consistency but has overhead