Parallel Computing Fundamentals

Parallel computing uses multiple processing elements simultaneously to solve problems faster. As single-core performance gains have slowed, parallelism has become essential for achieving higher performance.

1. Why Parallel Computing?

The End of the Free Lunch

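From roughly 1970 to 2005, single-core performance rose steadily, at its peak doubling about every 18 months. Two trends drove this: Moore's Law (transistor counts doubling roughly every two years) and Dennard scaling (as transistors shrank, power density stayed roughly constant, so clock speeds could keep rising). Since about 2005: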

  • Dennard Scaling ended (power wall)
  • Clock speeds plateaued (~3-4 GHz)
  • Transistor counts still grow, but the extra transistors go into more cores rather than faster single cores
# Historical context
# 1990: 25 MHz single core
# 2000: 1 GHz single core
# 2005: 3 GHz single core (plateau)
# 2024: 3.5 GHz but 16+ cores

# To use modern hardware efficiently, we must parallelize

Types of Parallelism

1. Instruction-Level Parallelism (ILP)

  • CPU executes multiple instructions per cycle
  • Pipelining, superscalar, out-of-order execution
  • Handled by hardware automatically

2. Data Parallelism

  • Same operation on multiple data elements
  • SIMD (Single Instruction, Multiple Data)
  • GPUs excel at this

3. Task Parallelism

  • Different operations on different data
  • Multiple independent tasks
  • Thread pools, work stealing
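import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Data parallelism example (conceptually)
def data_parallel_add(a: list, b: list) -> list:
    """Add two arrays element-wise - naturally parallel"""
    # Each element can be computed independently, so the work could be
    # split across SIMD lanes or cores. This list comprehension itself
    # runs sequentially; it only illustrates the independence.
    return [x + y for x, y in zip(a, b)]
    # With NumPy (compiled loop, uses SIMD internally where available):
    # return (np.array(a) + np.array(b)).tolist()

# Hypothetical placeholder tasks standing in for real work
def compute_statistics(data):
    return {"mean": sum(data) / len(data)}

def generate_report(data):
    return f"{len(data)} records"

def backup_data(data):
    return list(data)  # e.g. copy to safe storage

# Task parallelism example
def task_parallel_pipeline(data):
    """Different tasks on the same data - can run concurrently"""
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Three different tasks submitted concurrently. In CPython the GIL
        # keeps CPU-bound threads from running truly in parallel; use
        # ProcessPoolExecutor for CPU-bound work.
        future1 = executor.submit(compute_statistics, data)
        future2 = executor.submit(generate_report, data)
        future3 = executor.submit(backup_data, data)

        return future1.result(), future2.result(), future3.result()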

2. Theoretical Limits

Amdahl's Law

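For a fixed problem size, the speedup from parallelization is limited by the sequential portion: no matter how many processors are added, the sequential part still takes the same time.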
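Written out with the same symbols the code uses (p is the parallelizable fraction, n the number of processors), Amdahl's Law and its limit are:

```latex
S(n) = \frac{1}{(1 - p) + \frac{p}{n}}, \qquad
\lim_{n \to \infty} S(n) = \frac{1}{1 - p}
```

For example, with p = 0.9 the sequential 10% caps the speedup at 1 / 0.1 = 10x, however many processors are used.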

def amdahls_law(p: float, n: int) -> float:
    """
    Calculate maximum speedup.

    Args:
        p: Fraction of program that can be parallelized (0 to 1)
        n: Number of processors

    Returns:
        Maximum speedup factor
    """
    # Speedup = 1 / ((1-p) + p/n)
    sequential_part = 1 - p
    parallel_part = p / n
    return 1 / (sequential_part + parallel_part)

# Examples
print(f"90% parallel, 4 cores: {amdahls_law(0.9, 4):.2f}x")   # 3.08x
print(f"90% parallel, 16 cores: {amdahls_law(0.9, 16):.2f}x") # 6.40x
print(f"90% parallel, ∞ cores: {amdahls_law(0.9, float('inf')):.2f}x")  # 10x max!

print(f"99% parallel, 100 cores: {amdahls_law(0.99, 100):.2f}x")  # 50.25x
print(f"99% parallel, ∞ cores: {amdahls_law(0.99, float('inf')):.2f}x")  # 100x max

# Key insight: Even 1% sequential limits speedup to 100x regardless of cores!

Gustafson's Law

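Gustafson's Law takes a different view: when the problem size grows with the number of processors, the parallel portion often grows while the sequential portion stays roughly fixed, so the achievable speedup keeps rising.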
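One way to see where the formula in the code comes from: normalize the run time on n processors to 1, split into a sequential part 1 - p and a parallel part p. A single processor would need the same sequential time plus n times the parallel work, so the scaled speedup is the ratio of the two:

```latex
T_n = (1 - p) + p = 1, \qquad T_1 = (1 - p) + p\,n
\quad\Longrightarrow\quad
S(n) = \frac{T_1}{T_n} = (1 - p) + p\,n = n - (1 - p)(n - 1)
```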

def gustafsons_law(p: float, n: int) -> float:
    """
    Calculate scaled speedup.

    Instead of fixed problem size, assume we scale the problem
    to use available processors.

    Args:
        p: Fraction of parallel work (with n processors)
        n: Number of processors

    Returns:
        Scaled speedup
    """
    # Speedup = n - (1-p) * (n-1)
    # Or equivalently: (1-p) + p*n
    return (1 - p) + p * n

# With Gustafson's view, more cores = bigger problems
print(f"90% parallel, 16 cores: {gustafsons_law(0.9, 16):.1f}x")  # 14.5x
print(f"90% parallel, 100 cores: {gustafsons_law(0.9, 100):.1f}x")  # 90.1x

# More optimistic for scaling large problems

Efficiency and Scalability

def parallel_efficiency(speedup: float, num_processors: int) -> float:
    """
    Calculate parallel efficiency.

    Efficiency = Speedup / Number of Processors
    Perfect efficiency = 1.0 (100%)
    """
    return speedup / num_processors

def analyze_scalability(serial_time: float, parallel_times: dict):
    """Analyze scalability of a parallel program"""
    print(f"Serial time: {serial_time:.3f}s\n")
    print(f"{'Cores':<8} {'Time':<10} {'Speedup':<10} {'Efficiency':<10}")
    print("-" * 38)

    for cores, time in sorted(parallel_times.items()):
        speedup = serial_time / time
        efficiency = parallel_efficiency(speedup, cores)
        print(f"{cores:<8} {time:<10.3f} {speedup:<10.2f} {efficiency:<10.2%}")

# Example data
analyze_scalability(
    serial_time=10.0,
    parallel_times={
        1: 10.0,
        2: 5.2,
        4: 2.8,
        8: 1.6,
        16: 1.1,
        32: 0.9
    }
)
# Cores    Time       Speedup    Efficiency
# 1        10.000     1.00       100.00%
# 2        5.200      1.92       96.15%
# 4        2.800      3.57       89.29%
# 8        1.600      6.25       78.12%
# 16       1.100      9.09       56.82%
# 32       0.900      11.11      34.72%
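The measurements above can also be run through Amdahl's Law in reverse. Solving 1/S = (1 - p) + p/n for the sequential fraction gives the experimentally determined serial fraction, often called the Karp-Flatt metric. A minimal sketch reusing the example timings; the helper name `estimated_serial_fraction` is illustrative:

```python
def estimated_serial_fraction(speedup: float, n: int) -> float:
    """Solve Amdahl's law for the sequential fraction (1 - p).

    From 1/S = (1 - p) + p/n  it follows that
    (1 - p) = (1/S - 1/n) / (1 - 1/n).  Only meaningful for n > 1.
    """
    return (1 / speedup - 1 / n) / (1 - 1 / n)

# Same example timings as analyze_scalability above
serial_time = 10.0
parallel_times = {2: 5.2, 4: 2.8, 8: 1.6, 16: 1.1, 32: 0.9}

for cores, t in sorted(parallel_times.items()):
    e = estimated_serial_fraction(serial_time / t, cores)
    print(f"{cores:>3} cores: estimated serial fraction = {e:.3f}")
```

For these numbers the estimate stays at about 0.04 up to 8 cores and then rises (roughly 0.05 at 16 cores and 0.06 at 32). A rising estimate suggests the efficiency loss comes from growing overhead, such as communication or synchronization, rather than from a fixed sequential portion.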

3. Flynn's Taxonomy

Classification of computer architectures:

                    Single Data    Multiple Data
Single Instruction    SISD            SIMD
Multiple Instruction  MISD            MIMD

SISD (Single Instruction, Single Data)

Traditional sequential processor.

# SISD: One instruction, one data item at a time
def sisd_sum(arr):
    """Sequential sum - classic SISD"""
    total = 0
    for x in arr:  # One element at a time
        total += x  # One addition at a time
    return total

SIMD (Single Instruction, Multiple Data)

Same operation on multiple data simultaneously.

import numpy as np

# SIMD: One instruction operates on multiple data
def simd_double_concept(arr):
    """
    SIMD processes multiple elements with one instruction.
    NumPy uses SIMD internally via vectorization.
    """
    # Conceptually, instead of:
    # for i in range(len(arr)): result[i] = arr[i] * 2

    # SIMD does (in hardware):
    # Load 4 values into vector register
    # Multiply all 4 by 2 in single instruction
    # Store 4 results

    return np.array(arr) * 2  # Vectorized operation

# Performance comparison
import time

def benchmark_simd():
    size = 10_000_000
    arr_list = list(range(size))
    arr_numpy = np.arange(size)

    # Sequential (Python loop, one interpreted multiply per element)
    start = time.perf_counter()
    result1 = [x * 2 for x in arr_list]
    seq_time = time.perf_counter() - start

    # Vectorized (NumPy: compiled loop, SIMD where the CPU supports it)
    start = time.perf_counter()
    result2 = arr_numpy * 2
    vec_time = time.perf_counter() - start

    print(f"Sequential: {seq_time:.3f}s")
    print(f"Vectorized: {vec_time:.3f}s")
    # Most of this speedup comes from avoiding per-element interpreter
    # overhead; SIMD accounts for only part of it.
    print(f"Speedup: {seq_time/vec_time:.1f}x")
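SIMD hardware only helps when a loop is shaped for it. A vectorizing compiler typically "strip-mines" a loop into a main loop that handles a fixed number of lanes per step, followed by a scalar loop for the leftover elements. The sketch below imitates that shape; `LANES = 4` is an assumed vector width (for example, four 64-bit values in a 256-bit register), and each NumPy slice operation stands in for one vector instruction, so it shows the loop structure, not real SIMD speed:

```python
import numpy as np

LANES = 4  # assumed vector width, e.g. 4 doubles in a 256-bit AVX register

def strip_mined_double(arr: np.ndarray) -> np.ndarray:
    """Multiply every element by 2 using a vector main loop plus a scalar tail."""
    out = np.empty_like(arr)
    n = len(arr)
    main = n - n % LANES  # largest multiple of LANES that fits

    # Main loop: one "vector instruction" handles LANES elements at once
    for i in range(0, main, LANES):
        out[i:i + LANES] = arr[i:i + LANES] * 2

    # Remainder loop: leftover elements are processed one at a time
    for i in range(main, n):
        out[i] = arr[i] * 2
    return out

print(strip_mined_double(np.arange(10)))
```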

MIMD (Multiple Instruction, Multiple Data)

Different processors execute different instructions on different data.

from multiprocessing import Process, Queue

# MIMD: Different processors, different tasks
def task_a(q, data):
    """Processor 1: Compute sum"""
    result = sum(data)
    q.put(('sum', result))

def task_b(q, data):
    """Processor 2: Compute product"""
    result = 1
    for x in data:
        result *= x
    q.put(('product', result))

def task_c(q, data):
    """Processor 3: Find max"""
    result = max(data)
    q.put(('max', result))

def mimd_example():
    """Run different tasks on different processors"""
    data = [1, 2, 3, 4, 5]
    q = Queue()

    # Each process runs different code (Multiple Instruction)
    # on potentially different data (Multiple Data)
    processes = [
        Process(target=task_a, args=(q, data)),
        Process(target=task_b, args=(q, data)),
        Process(target=task_c, args=(q, data))
    ]

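    for p in processes:
        p.start()

    # Collect exactly one result per process *before* joining: Queue.empty()
    # is unreliable across processes, and joining a process that still has
    # queued data can deadlock.
    results = {}
    for _ in processes:
        name, value = q.get()  # Blocks until a result arrives
        results[name] = value

    for p in processes:
        p.join()

    return results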

4. Memory Models

Shared Memory

All processors access the same memory space.

import threading

class SharedMemoryModel:
    """
    Shared memory: All threads see same data.

    Advantages:
    - Easy to program (just share variables)
    - Low latency for small data

    Disadvantages:
    - Need synchronization (locks, etc.)
    - Harder to scale to many processors (memory bandwidth, coherence traffic)
    - Cache coherence overhead
    """

    def __init__(self):
        self.shared_data = {}
        self.lock = threading.Lock()

    def write(self, key, value):
        with self.lock:  # Need synchronization!
            self.shared_data[key] = value

    def read(self, key):
        with self.lock:
            return self.shared_data.get(key)

# Example: Shared counter (problematic without sync)
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        # Race condition: read-modify-write is not atomic
        # Thread 1: reads 5
        # Thread 2: reads 5
        # Thread 1: writes 6
        # Thread 2: writes 6 (lost update!)
        self.count += 1

def demonstrate_race_condition():
    """Show why shared memory needs synchronization"""
    counter = Counter()
    threads = []

    def increment_many():
        for _ in range(100000):
            counter.increment()

    for _ in range(4):
        t = threading.Thread(target=increment_many)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

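    print(f"Expected: 400000, Got: {counter.count}")
    # Can be less than 400000: `count += 1` is not atomic. On recent CPython
    # versions the GIL switches threads rarely enough that lost updates may
    # not show up in every run, but the code is still incorrect without a lock.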
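The usual fix is to make the read-modify-write a critical section so only one thread can execute it at a time. A minimal sketch using `threading.Lock`; `LockedCounter` and `demonstrate_fixed_counter` are illustrative names, not part of any library:

```python
import threading

class LockedCounter:
    """Counter whose read-modify-write is protected by a lock."""

    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time can hold the lock, so no update is lost
        with self._lock:
            self.count += 1

def demonstrate_fixed_counter(num_threads=4, per_thread=100_000):
    counter = LockedCounter()

    def increment_many():
        for _ in range(per_thread):
            counter.increment()

    threads = [threading.Thread(target=increment_many) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.count

print(demonstrate_fixed_counter())  # 400000
```

The lock makes the result correct but serializes every increment; in practice each thread would often accumulate a local count and add it to the shared total once at the end.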

Distributed Memory

Each processor has its own memory; communicate via messages.

from multiprocessing import Process, Pipe

class DistributedMemoryModel:
    """
    Distributed memory: Each process has private memory.
    Communication via message passing.

    Advantages:
    - Scales to many processors
    - No cache coherence issues
    - Can span multiple machines

    Disadvantages:
    - Explicit communication (more programming effort)
    - Higher latency for communication
    - Need to partition data
    """
    pass

def _sum_worker(conn, chunk):
    """Each worker sums its chunk and sends the result back.

    Defined at module level (not nested) so it can be pickled when
    multiprocessing uses the 'spawn' start method (Windows, macOS).
    """
    local_sum = sum(chunk)
    conn.send(local_sum)
    conn.close()

def distributed_sum(data, num_workers):
    """Sum using distributed memory (message passing)"""

    # Partition data; ceiling division so no leftover elements are dropped
    chunk_size = max(1, -(-len(data) // num_workers))
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    # Create workers with communication pipes
    processes = []
    parent_conns = []

    for chunk in chunks:
        parent_conn, child_conn = Pipe()
        p = Process(target=_sum_worker, args=(child_conn, chunk))
        processes.append(p)
        parent_conns.append(parent_conn)
        p.start()

    # Collect results (message passing)
    total = 0
    for conn in parent_conns:
        total += conn.recv()  # Receive message from worker

    for p in processes:
        p.join()

    return total

Cache Coherence

In shared memory systems, each processor typically has its own private cache. The problem: what happens to the other caches' copies when one processor modifies data they also hold?

"""
Cache Coherence Problem:

CPU 0 Cache: x = 5
CPU 1 Cache: x = 5
Memory:      x = 5

CPU 0 writes x = 10:
- CPU 0 Cache: x = 10
- CPU 1 Cache: x = 5  (stale!)
- Memory:      x = 5  (stale!)

Without a coherence protocol, CPU 1 reads a stale value!
"""

# MESI Protocol (common solution)
class CacheLine:
    """Simplified cache line with MESI states"""

    # States
    MODIFIED = 'M'   # Only this cache has it, it's dirty
    EXCLUSIVE = 'E'  # Only this cache has it, it's clean
    SHARED = 'S'     # Multiple caches have it, all clean
    INVALID = 'I'    # Cache line is invalid

    def __init__(self):
        self.state = self.INVALID
        self.data = None

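    def read(self, others_have_copy: bool = False):
        """Read request from local CPU"""
        if self.state == self.INVALID:
            # Miss: fetch from memory or another cache (data fetch omitted).
            # Another cache holds it -> SHARED; nobody else -> EXCLUSIVE.
            self.state = self.SHARED if others_have_copy else self.EXCLUSIVE
        return self.data

    def write(self, value):
        """Write request from local CPU"""
        if self.state in (self.SHARED, self.INVALID):
            # Must broadcast an invalidation so other caches drop their
            # copies (the bus transaction itself is omitted here)
            pass
        # EXCLUSIVE -> MODIFIED needs no bus traffic: no one else has a copy
        self.data = value
        self.state = self.MODIFIED

    def snoop_read(self):
        """Another CPU is reading this address"""
        if self.state == self.MODIFIED:
            # Must supply / write back the dirty data, then share it
            self.state = self.SHARED
        elif self.state == self.EXCLUSIVE:
            self.state = self.SHARED

    def snoop_write(self):
        """Another CPU is writing this address"""
        # If MODIFIED, the dirty data must be written back first (omitted);
        # in every case our copy is now stale
        self.state = self.INVALID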
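The class above tracks one cache's state in isolation. To watch the transitions play out between caches, here is a minimal simulation sketch in which a hypothetical `Bus` object owns memory and a list of caches and performs the snooping itself. It assumes a single cache line, write-back caches, and an atomic bus, and it leaves out many details of real protocols (for example, a write miss is modeled without a separate read-for-ownership step):

```python
# Minimal MESI simulation: a sketch, not a hardware-accurate model.
M, E, S, I = "M", "E", "S", "I"

class Cache:
    def __init__(self, name):
        self.name = name
        self.state = I
        self.data = None

class Bus:
    """Connects caches to memory and broadcasts snoop events."""

    def __init__(self, num_caches, initial_value=0):
        self.memory = initial_value
        self.caches = [Cache(f"CPU{i}") for i in range(num_caches)]

    def _others(self, cpu):
        return [c for i, c in enumerate(self.caches) if i != cpu]

    def read(self, cpu):
        me = self.caches[cpu]
        if me.state == I:
            sharers = [c for c in self._others(cpu) if c.state != I]
            for c in sharers:
                if c.state == M:
                    self.memory = c.data  # write back dirty data first
                c.state = S               # snooped read: M/E/S -> S
            me.data = self.memory
            me.state = S if sharers else E
        return me.data                    # hits in M/E/S need no bus traffic

    def write(self, cpu, value):
        me = self.caches[cpu]
        if me.state in (S, I):
            for c in self._others(cpu):
                if c.state == M:
                    self.memory = c.data  # write back before invalidating
                c.state = I               # invalidate every other copy
        me.data = value
        me.state = M                      # E -> M is silent (no bus traffic)

    def states(self):
        return [c.state for c in self.caches]


bus = Bus(num_caches=2, initial_value=5)
bus.read(0)
print(bus.states())                           # ['E', 'I']
bus.read(1)
print(bus.states())                           # ['S', 'S']
bus.write(0, 10)
print(bus.states())                           # ['M', 'I']
print(bus.read(1), bus.states(), bus.memory)  # 10 ['S', 'S'] 10
```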

False Sharing

False sharing causes cache coherence overhead even when threads touch unrelated data:

import threading
import time

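class FalseSharingDemo:
    """
    False sharing: Different data on same cache line.

    Cache lines are typically 64 bytes.
    If two threads on different cores repeatedly write different variables
    that live on the same line, every write invalidates the other core's
    copy, so the line bounces between caches even though no data is shared.

    Note: this class only mirrors a C struct such as
    `struct { long counter_a; long counter_b; }`. In standard CPython the
    GIL lets only one thread run bytecode at a time, so the effect is not
    measurable from pure Python.
    """

    def __init__(self):
        # In the C equivalent, these two fields would share a cache line
        self.counter_a = 0
        self.counter_b = 0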

    def increment_a(self, iterations):
        for _ in range(iterations):
            self.counter_a += 1

    def increment_b(self, iterations):
        for _ in range(iterations):
            self.counter_b += 1

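class PaddedCounters:
    """Avoid false sharing with padding (conceptual in Python).

    In C/C++ you would give each counter its own cache line, e.g. with
    alignas(64) or explicit padding fields. In CPython, attributes are
    stored as pointers to separate int objects, so these padding lists do
    NOT control where the counters live in memory; they only mirror the idea.
    """

    def __init__(self):
        self.counter_a = 0
        # In C: 8-byte counter + 15 * 8 = 120 bytes of padding = 128 bytes,
        # enough to keep counter_b off counter_a's 64-byte line
        self._pad_a = [0] * 15
        self.counter_b = 0
        self._pad_b = [0] * 15

    def increment_a(self, iterations):
        for _ in range(iterations):
            self.counter_a += 1

    def increment_b(self, iterations):
        for _ in range(iterations):
            self.counter_b += 1

def _time_two_threads(obj, iterations):
    """Run obj.increment_a and obj.increment_b concurrently; return seconds."""
    start = time.perf_counter()
    t1 = threading.Thread(target=obj.increment_a, args=(iterations,))
    t2 = threading.Thread(target=obj.increment_b, args=(iterations,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    return time.perf_counter() - start

def benchmark_false_sharing():
    """Compare the unpadded and padded layouts.

    Caveat: in standard CPython the GIL lets only one thread run bytecode
    at a time, so both timings are dominated by interpreter overhead and
    will be similar. A measurable false-sharing effect needs truly parallel
    native code (for example C, C++, or Rust).
    """
    iterations = 10_000_000
    time_shared = _time_two_threads(FalseSharingDemo(), iterations)
    time_padded = _time_two_threads(PaddedCounters(), iterations)

    print(f"Unpadded: {time_shared:.3f}s")
    print(f"Padded:   {time_padded:.3f}s")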
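Because CPython does not let you control object layout, a more faithful illustration uses a NumPy `int64` array as a stand-in for a C array of per-thread counters and simply computes which cache line each counter lands on. This is a sketch: the 64-byte line size is an assumption (common on x86-64), and `cache_lines` is an illustrative helper, not a library function:

```python
import numpy as np

CACHE_LINE = 64                        # assumed cache-line size in bytes
ITEM = np.dtype(np.int64).itemsize     # 8 bytes per counter

def cache_lines(counters: np.ndarray, stride: int) -> list:
    """Cache-line index of every `stride`-th element of `counters`."""
    base = counters.ctypes.data        # memory address of element 0
    return [(base + i * ITEM) // CACHE_LINE
            for i in range(0, len(counters), stride)]

num_threads = 4

# Packed: counters are 8 bytes apart, so 4 of them span only 32 bytes
packed = np.zeros(num_threads, dtype=np.int64)

# Padded: counters are CACHE_LINE bytes apart (8 elements of padding each)
stride = CACHE_LINE // ITEM
padded = np.zeros(num_threads * stride, dtype=np.int64)

print(len(set(cache_lines(packed, 1))))       # 1 or 2 distinct lines
print(len(set(cache_lines(padded, stride))))  # 4: one line per counter
```

Threads updating `packed[0]` through `packed[3]` would fight over one or two lines, while each of `padded[0]`, `padded[8]`, `padded[16]` and `padded[24]` sits on its own line regardless of where the array starts.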

5. Parallelization Patterns

Embarrassingly Parallel

No communication needed between tasks.

from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    """Check if n is prime"""
    if n < 2:
        return False
    for i in range(2, math.isqrt(n) + 1):  # integer sqrt avoids float rounding
        if n % i == 0:
            return False
    return True

def find_primes_parallel(start, end, num_workers=4):
    """
    Embarrassingly parallel: Each number checked independently.
    No communication between tasks needed.
    """
    numbers = range(start, end)

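    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # chunksize batches numbers per task; with the default of 1, each
        # number is sent to a worker individually and IPC dominates
        results = executor.map(is_prime, numbers, chunksize=1000)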

    return [n for n, is_p in zip(numbers, results) if is_p]

Map-Reduce Pattern

from functools import reduce
from concurrent.futures import ProcessPoolExecutor

def parallel_map_reduce(data, map_fn, reduce_fn, num_workers=4):
    """
    Map-Reduce:
    1. Map: Apply function to each element (parallel)
    2. Reduce: Combine results (often sequential or tree-based)
    """
    # Parallel map
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_fn, data))

    # Sequential reduce
    return reduce(reduce_fn, mapped)

# Example: Word count
def count_words_in_chunk(text_chunk):
    """Map: Count words in one chunk"""
    words = text_chunk.lower().split()
    counts = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    return counts

def merge_counts(counts1, counts2):
    """Reduce: Merge two count dictionaries"""
    result = counts1.copy()
    for word, count in counts2.items():
        result[word] = result.get(word, 0) + count
    return result

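# Usage (the __main__ guard is required when worker processes are started
# with 'spawn', the default on Windows and macOS)
if __name__ == "__main__":
    text_chunks = ["hello world hello", "world foo bar", "hello bar baz"]
    word_counts = parallel_map_reduce(text_chunks, count_words_in_chunk, merge_counts)
    print(word_counts)  # {'hello': 3, 'world': 2, 'foo': 1, 'bar': 2, 'baz': 1}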
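The docstring of `parallel_map_reduce` mentions tree-based reduction as an alternative to the sequential `reduce`. A minimal sketch of the idea, assuming the combine function is associative (`merge_counts` is, because addition is); `tree_reduce` is an illustrative name. It runs sequentially here, but every combine within one level is independent and could be dispatched to a pool, so n items need about log2(n) levels instead of n - 1 dependent steps:

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")

def tree_reduce(items: List[T], combine: Callable[[T, T], T]) -> T:
    """Pairwise (tree) reduction; assumes `combine` is associative.

    Each level combines neighbours (0,1), (2,3), ... in order, so the
    result matches a left-to-right reduce even if `combine` is not
    commutative.
    """
    if not items:
        raise ValueError("tree_reduce() of empty sequence")
    level = list(items)
    while len(level) > 1:
        # All combines in this list comprehension are independent of each other
        next_level = [combine(level[i], level[i + 1])
                      for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # odd element carries over
        level = next_level
    return level[0]

print(tree_reduce([1, 2, 3, 4, 5], lambda a, b: a + b))  # 15
```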

Fork-Join Pattern

from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Callable, TypeVar

T = TypeVar('T')

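class ForkJoinPool:
    """
    Fork-Join: Divide problem, solve parts in parallel, combine.

    Note: in standard CPython, ThreadPoolExecutor threads share the GIL, so
    CPU-bound subtasks interleave rather than run truly in parallel. This
    class shows the structure of the pattern.
    """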

    def __init__(self, num_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=num_workers)

    def invoke(self, task: 'RecursiveTask[T]') -> T:
        """Execute a fork-join task"""
        return task.compute(self)

    def shutdown(self):
        self.executor.shutdown()

class RecursiveTask:
    """Base class for fork-join tasks"""

    def compute(self, pool: ForkJoinPool):
        raise NotImplementedError

    def fork(self, pool: ForkJoinPool) -> Future:
        """Submit task for async execution"""
        return pool.executor.submit(self.compute, pool)

class ParallelMergeSort(RecursiveTask):
    """Merge sort using fork-join"""

    THRESHOLD = 1000  # Switch to sequential below this size

    def __init__(self, arr: List[int]):
        self.arr = arr

    def compute(self, pool: ForkJoinPool) -> List[int]:
        if len(self.arr) <= self.THRESHOLD:
            return sorted(self.arr)  # Sequential sort

        mid = len(self.arr) // 2

        # Fork: Create subtasks
        left_task = ParallelMergeSort(self.arr[:mid])
        right_task = ParallelMergeSort(self.arr[mid:])

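        # Execute right asynchronously, left here
        right_future = right_task.fork(pool)
        left_result = left_task.compute(pool)

        # Join: if the forked task is still queued, cancel it and run it here.
        # Blocking on a queued task could starve the fixed-size pool (every
        # worker waiting on work that no free worker can pick up).
        if right_future.cancel():
            right_result = right_task.compute(pool)
        else:
            right_result = right_future.result()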

        # Combine
        return self._merge(left_result, right_result)

    def _merge(self, left: List[int], right: List[int]) -> List[int]:
        result = []
        i = j = 0
        while i < len(left) and j < len(right):
            if left[i] <= right[j]:
                result.append(left[i])
                i += 1
            else:
                result.append(right[j])
                j += 1
        result.extend(left[i:])
        result.extend(right[j:])
        return result

Exercises

Basic

  1. Calculate the maximum speedup for a program that is 95% parallelizable using 64 cores.

  2. Classify the following as SIMD or MIMD:

    • GPU rendering
    • Web server handling requests
    • Matrix multiplication on CPU with AVX
  3. Explain why false sharing causes performance problems.

Intermediate

  1. Implement a parallel sum using both shared memory and message passing approaches.

  2. Measure the scalability of a parallel program and plot speedup vs. number of cores.

  3. Identify the parallelizable portions of a given algorithm and estimate speedup.

Advanced

  1. Implement a work-stealing scheduler for load balancing.

  2. Design a cache-friendly parallel algorithm that minimizes false sharing.

  3. Analyze the cache coherence traffic for a given parallel program.


Summary

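  • Parallel computing is essential now that single-core performance scaling has largely stalled
  • Amdahl's Law limits fixed-size speedup based on the sequential fraction; Gustafson's Law is more optimistic when the problem grows with the core count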
  • Flynn's taxonomy: SISD, SIMD, MISD, MIMD
  • Shared memory is easier but requires synchronization
  • Distributed memory scales better but needs explicit communication
  • Cache coherence maintains consistency but has overhead

Next Reading

Threading and Synchronization →