Parallel Computing Fundamentals

Introduction

Parallel computing uses multiple processing elements simultaneously to solve problems faster. As single-core performance gains have slowed, parallelism has become the main route to further speedups. This reading covers parallel architectures, theoretical limits, and foundational concepts.

Learning Objectives

By the end of this reading, you will be able to:

  • Understand different types of parallelism
  • Apply Amdahl's Law and Gustafson's Law
  • Classify parallel architectures (Flynn's taxonomy)
  • Identify parallelization opportunities
  • Understand memory models and cache coherence

1. Why Parallel Computing?

The End of Free Lunch

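From roughly 1970 to 2005, single-core performance is often quoted as doubling about every 18 months, driven by Moore's Law (more transistors per chip) together with Dennard scaling (smaller transistors running faster at the same power density). Since ~2005: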

  • Dennard Scaling ended (power wall)
  • Clock speeds plateaued (~3-4 GHz)
  • Transistor counts still grow, but the extra transistors go into more cores rather than faster ones
# Historical context
# 1990: 25 MHz single core
# 2000: 1 GHz single core
# 2005: 3 GHz single core (plateau)
# 2024: 3.5 GHz but 16+ cores

# To use modern hardware efficiently, we must parallelize

Types of Parallelism

1. Instruction-Level Parallelism (ILP)

  • CPU executes multiple instructions per cycle
  • Pipelining, superscalar, out-of-order execution
  • Handled by hardware automatically

2. Data Parallelism

  • Same operation on multiple data elements
  • SIMD (Single Instruction, Multiple Data)
  • GPUs excel at this

3. Task Parallelism

  • Different operations on different data
  • Multiple independent tasks
  • Thread pools, work stealing
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp

# Data parallelism example (conceptually)
def data_parallel_add(a: list, b: list) -> list:
    """Add two arrays element-wise - naturally parallel"""
    # Each element can be computed independently.
    # With NumPy (vectorized, may use SIMD internally):
    #     return np.array(a) + np.array(b)
    return [x + y for x, y in zip(a, b)]

# Task parallelism example
# compute_statistics, generate_report, and backup_data are placeholders
# for application-specific functions; they are not defined here.
def task_parallel_pipeline(data):
    """Different tasks on data - can run concurrently"""
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Three different tasks submitted concurrently.
        # Note: in standard CPython builds the GIL lets threads overlap
        # I/O but not CPU-bound Python code; use processes for the latter.
        future1 = executor.submit(compute_statistics, data)
        future2 = executor.submit(generate_report, data)
        future3 = executor.submit(backup_data, data)

        return future1.result(), future2.result(), future3.result()
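
The pipeline above calls three helpers that this reading does not define. A minimal self-contained sketch, using hypothetical stand-in implementations of those helpers (their bodies are assumptions made only so the example runs), might look like this:

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

# Hypothetical stand-ins for the helpers named in the reading.
def compute_statistics(data):
    return {"mean": mean(data), "max": max(data)}

def generate_report(data):
    return f"{len(data)} records processed"

def backup_data(data):
    return list(data)  # pretend that a copy is the backup

def run_pipeline(data):
    """Submit three unrelated tasks, then wait for all of them."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        stats = executor.submit(compute_statistics, data)
        report = executor.submit(generate_report, data)
        backup = executor.submit(backup_data, data)
        return stats.result(), report.result(), backup.result()

stats, report, backup = run_pipeline([3, 1, 4, 1, 5])
print(stats, report, backup)
```

The three tasks share no intermediate results, which is what makes them candidates for task parallelism.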

2. Theoretical Limits

Amdahl's Law

The speedup from parallelization is limited by the sequential portion.

def amdahls_law(p: float, n: float) -> float:
    """
    Calculate maximum speedup.

    Args:
        p: Fraction of program that can be parallelized (0 to 1)
        n: Number of processors (float('inf') gives the limiting case)

    Returns:
        Maximum speedup factor
    """
    # Speedup = 1 / ((1-p) + p/n)
    sequential_part = 1 - p
    parallel_part = p / n
    return 1 / (sequential_part + parallel_part)

# Examples
print(f"90% parallel, 4 cores: {amdahls_law(0.9, 4):.2f}x")   # 3.08x
print(f"90% parallel, 16 cores: {amdahls_law(0.9, 16):.2f}x") # 6.40x
print(f"90% parallel, ∞ cores: {amdahls_law(0.9, float('inf')):.2f}x")  # 10x max!

print(f"99% parallel, 100 cores: {amdahls_law(0.99, 100):.2f}x")  # 50.25x
print(f"99% parallel, ∞ cores: {amdahls_law(0.99, float('inf')):.2f}x")  # 100x max

# Key insight: Even 1% sequential limits speedup to 100x regardless of cores!
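
Amdahl's Law can also be read in reverse: given a target speedup and a core count, how parallel must the program be? The sketch below solves the formula for p. The function names are hypothetical and chosen for this example only.

```python
def amdahl_speedup(p: float, n: float) -> float:
    """Amdahl's Law: speedup for parallel fraction p on n processors."""
    return 1.0 / ((1.0 - p) + p / n)

def required_parallel_fraction(target_speedup: float, n: int) -> float:
    """
    Invert Amdahl's Law.

    From S = 1 / ((1 - p) + p/n) it follows that
    p = (1 - 1/S) / (1 - 1/n).
    """
    if n < 2:
        raise ValueError("need at least 2 processors")
    if not 1.0 <= target_speedup <= n:
        raise ValueError("target speedup must lie between 1 and n")
    return (1.0 - 1.0 / target_speedup) / (1.0 - 1.0 / n)

p = required_parallel_fraction(8.0, 16)
print(f"8x on 16 cores needs p = {p:.4f}")
print(f"check: {amdahl_speedup(p, 16):.2f}x")
```

Reaching 8x on 16 cores requires roughly 93% of the work to be parallel, which shows how quickly the sequential part starts to dominate.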

Gustafson's Law

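Amdahl's Law assumes a fixed problem size. Gustafson's Law instead assumes the problem grows with the number of processors: the parallel work scales up while the sequential part stays roughly constant, so the achievable speedup keeps growing.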

def gustafsons_law(p: float, n: int) -> float:
    """
    Calculate scaled speedup.

    Instead of fixed problem size, assume we scale the problem
    to use available processors.

    Args:
        p: Fraction of parallel work (with n processors)
        n: Number of processors

    Returns:
        Scaled speedup
    """
    # Speedup = n - (1-p) * (n-1)
    # Or equivalently: (1-p) + p*n
    return (1 - p) + p * n

# With Gustafson's view, more cores = bigger problems
print(f"90% parallel, 16 cores: {gustafsons_law(0.9, 16):.1f}x")  # 14.5x
print(f"90% parallel, 100 cores: {gustafsons_law(0.9, 100):.1f}x")  # 90.1x

# More optimistic for scaling large problems
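
The two forms given in the code comment of gustafsons_law are the same expression. Expanding the first one step at a time, with p the parallel fraction and n the number of processors as defined there, gives the second:

```latex
\begin{aligned}
S_{\text{scaled}}(n) &= n - (1 - p)(n - 1) \\
                     &= n - (n - 1) + p\,(n - 1) \\
                     &= 1 + p\,n - p \\
                     &= (1 - p) + p\,n
\end{aligned}
```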

Efficiency and Scalability

def parallel_efficiency(speedup: float, num_processors: int) -> float:
    """
    Calculate parallel efficiency.

    Efficiency = Speedup / Number of Processors
    Perfect efficiency = 1.0 (100%)
    """
    return speedup / num_processors

def analyze_scalability(serial_time: float, parallel_times: dict):
    """Analyze scalability of a parallel program"""
    print(f"Serial time: {serial_time:.3f}s\n")
    print(f"{'Cores':<8} {'Time':<10} {'Speedup':<10} {'Efficiency':<10}")
    print("-" * 38)

    for cores, time in sorted(parallel_times.items()):
        speedup = serial_time / time
        efficiency = parallel_efficiency(speedup, cores)
        print(f"{cores:<8} {time:<10.3f} {speedup:<10.2f} {efficiency:<10.2%}")

# Example data
analyze_scalability(
    serial_time=10.0,
    parallel_times={
        1: 10.0,
        2: 5.2,
        4: 2.8,
        8: 1.6,
        16: 1.1,
        32: 0.9
    }
)
# Cores    Time       Speedup    Efficiency
# 1        10.000     1.00       100.00%
# 2        5.200      1.92       96.15%
# 4        2.800      3.57       89.29%
# 8        1.600      6.25       78.12%
# 16       1.100      9.09       56.82%
# 32       0.900      11.11      34.72%
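
A table like this is often used to decide how many cores are worth using. As a rough sketch, the hypothetical helper below returns the largest measured core count whose efficiency stays at or above a chosen threshold. The 75% default is an arbitrary assumption, not a rule from this reading.

```python
def max_cores_for_efficiency(serial_time, parallel_times, min_efficiency=0.75):
    """Largest measured core count with efficiency >= min_efficiency, or None."""
    best = None
    for cores, elapsed in sorted(parallel_times.items()):
        efficiency = (serial_time / elapsed) / cores
        if efficiency >= min_efficiency:
            best = cores
    return best

# Same measurements as in the example above
timings = {1: 10.0, 2: 5.2, 4: 2.8, 8: 1.6, 16: 1.1, 32: 0.9}
print(max_cores_for_efficiency(10.0, timings))        # 8
print(max_cores_for_efficiency(10.0, timings, 0.9))   # 2
```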

3. Flynn's Taxonomy

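Flynn's taxonomy classifies computer architectures by how many instruction streams and data streams they process at once. MISD is rarely built in practice, so the sections below cover the other three classes.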

                    Single Data    Multiple Data
Single Instruction    SISD            SIMD
Multiple Instruction  MISD            MIMD

SISD (Single Instruction, Single Data)

Traditional sequential processor.

# SISD: One instruction, one data item at a time
def sisd_sum(arr):
    """Sequential sum - classic SISD"""
    total = 0
    for x in arr:  # One element at a time
        total += x  # One addition at a time
    return total

SIMD (Single Instruction, Multiple Data)

Same operation on multiple data simultaneously.

import numpy as np

# SIMD: One instruction operates on multiple data
def simd_double_concept(arr):
    """
    SIMD processes multiple elements with one instruction.
    NumPy's vectorized operations can use SIMD internally.
    This example doubles every element.
    """
    # Conceptually, instead of:
    # for i in range(len(arr)): result[i] = arr[i] * 2

    # SIMD does (in hardware):
    # Load 4 values into vector register
    # Multiply all 4 by 2 in single instruction
    # Store 4 results

    return np.array(arr) * 2  # Vectorized operation

# Performance comparison
import time

def benchmark_simd():
    size = 10_000_000
    arr_list = list(range(size))
    arr_numpy = np.arange(size)

    # Sequential (Python loop)
    start = time.perf_counter()
    result1 = [x * 2 for x in arr_list]
    seq_time = time.perf_counter() - start

    # Vectorized (NumPy)
    start = time.perf_counter()
    result2 = arr_numpy * 2
    vec_time = time.perf_counter() - start

    # Note: much of this speedup comes from avoiding per-element Python
    # interpreter overhead; SIMD instructions are only part of it.
    print(f"Sequential: {seq_time:.3f}s")
    print(f"Vectorized: {vec_time:.3f}s")
    print(f"Speedup: {seq_time/vec_time:.1f}x")
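
The load, multiply, store sequence described in the SIMD comments can be modeled in plain Python. This sketch only imitates the control flow of a 4-lane vector unit; it does not execute real SIMD instructions, and the function name and lane width are assumptions for illustration.

```python
def simd_style_double(values, width=4):
    """Model a `width`-lane vector unit: one conceptual instruction per group."""
    result = []
    vector_ops = 0
    for start in range(0, len(values), width):
        lanes = values[start:start + width]    # "load" up to `width` values
        result.extend(x * 2 for x in lanes)    # one conceptual vector multiply
        vector_ops += 1                        # "store" happens via extend
    return result, vector_ops

doubled, ops = simd_style_double(list(range(10)))
print(doubled)
print(f"{ops} vector operations instead of 10 scalar ones")
```

With 10 elements and 4 lanes the model issues 3 operations; the last one is only partially filled, which mirrors how real vector code needs special handling for leftover elements.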

MIMD (Multiple Instruction, Multiple Data)

Different processors execute different instructions on different data.

from multiprocessing import Process, Queue
import os

# MIMD: Different processors, different tasks
def task_a(q, data):
    """Processor 1: Compute sum"""
    result = sum(data)
    q.put(('sum', result))

def task_b(q, data):
    """Processor 2: Compute product"""
    result = 1
    for x in data:
        result *= x
    q.put(('product', result))

def task_c(q, data):
    """Processor 3: Find max"""
    result = max(data)
    q.put(('max', result))

def mimd_example():
    """Run different tasks on different processors"""
    data = [1, 2, 3, 4, 5]
    q = Queue()

    # Each process runs different code (Multiple Instruction)
    # on potentially different data (Multiple Data)
    processes = [
        Process(target=task_a, args=(q, data)),
        Process(target=task_b, args=(q, data)),
        Process(target=task_c, args=(q, data))
    ]

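    for p in processes:
        p.start()

    # Collect one result per process BEFORE joining. A child that has put
    # data on a Queue may not exit until that data is consumed, and
    # q.empty() is not reliable across processes.
    results = {}
    for _ in processes:
        name, value = q.get()
        results[name] = value

    for p in processes:
        p.join()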

    return results

4. Memory Models

Shared Memory

All processors access the same memory space.

import threading

class SharedMemoryModel:
    """
    Shared memory: All threads see same data.

    Advantages:
    - Easy to program (just share variables)
    - Low latency for small data

    Disadvantages:
    - Need synchronization (locks, etc.)
    - Doesn't scale to many processors
    - Cache coherence overhead
    """

    def __init__(self):
        self.shared_data = {}
        self.lock = threading.Lock()

    def write(self, key, value):
        with self.lock:  # Need synchronization!
            self.shared_data[key] = value

    def read(self, key):
        with self.lock:
            return self.shared_data.get(key)

# Example: Shared counter (problematic without sync)
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        # Race condition: read-modify-write is not atomic
        # Thread 1: reads 5
        # Thread 2: reads 5
        # Thread 1: writes 6
        # Thread 2: writes 6 (lost update!)
        self.count += 1

def demonstrate_race_condition():
    """Show why shared memory needs synchronization"""
    counter = Counter()
    threads = []

    def increment_many():
        for _ in range(100000):
            counter.increment()

    for _ in range(4):
        t = threading.Thread(target=increment_many)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"Expected: 400000, Got: {counter.count}")
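    # May be less than 400000 because of lost updates. How often this
    # shows up depends on the interpreter version and on timing; a correct
    # total on one run does not make the code thread-safe.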
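
One way to remove the race in Counter is to guard the read-modify-write with a lock. The sketch below uses a hypothetical SafeCounter class and a smaller iteration count so that it runs quickly.

```python
import threading

class SafeCounter:
    """Counter whose increment is protected by a lock."""

    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time may execute the read-modify-write
        with self._lock:
            self.count += 1

def run_safe_counter(num_threads=4, increments=10_000):
    counter = SafeCounter()

    def work():
        for _ in range(increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.count

print(run_safe_counter())  # 40000
```

The lock makes the result deterministic at the cost of serializing every increment, which is the synchronization overhead listed among the disadvantages of shared memory.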

Distributed Memory

Each processor has its own memory; communicate via messages.

from multiprocessing import Process, Pipe

class DistributedMemoryModel:
    """
    Distributed memory: Each process has private memory.
    Communication via message passing.

    Advantages:
    - Scales to many processors
    - No cache coherence issues
    - Can span multiple machines

    Disadvantages:
    - Explicit communication (more programming effort)
    - Higher latency for communication
    - Need to partition data
    """
    pass

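def worker(conn, chunk):
    """Each worker sums its chunk and sends result"""
    local_sum = sum(chunk)
    conn.send(local_sum)
    conn.close()

def distributed_sum(data, num_workers):
    """Sum using distributed memory (message passing)"""

    # worker() lives at module level so it can be pickled when processes
    # are started with the "spawn" method (Windows, macOS).

    # Partition data; ceiling division so trailing elements are not dropped
    chunk_size = max(1, -(-len(data) // num_workers))
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]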

    # Create workers with communication pipes
    processes = []
    parent_conns = []

    for chunk in chunks:
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn, chunk))
        processes.append(p)
        parent_conns.append(parent_conn)
        p.start()

    # Collect results (message passing)
    total = 0
    for conn in parent_conns:
        total += conn.recv()  # Receive message from worker

    for p in processes:
        p.join()

    return total

Cache Coherence

In shared memory systems, each processor has a cache. Problem: what happens when one processor modifies cached data?

"""
Cache Coherence Problem:

CPU 0 Cache: x = 5
CPU 1 Cache: x = 5
Memory:      x = 5

CPU 0 writes x = 10:
- CPU 0 Cache: x = 10
- CPU 1 Cache: x = 5  (stale!)
- Memory:      x = 5  (stale!)

Without coherence protocol, CPU 1 sees wrong value!
"""

# MESI Protocol (common solution)
class CacheLine:
    """Simplified cache line with MESI states"""

    # States
    MODIFIED = 'M'   # Only this cache has it, it's dirty
    EXCLUSIVE = 'E'  # Only this cache has it, it's clean
    SHARED = 'S'     # Multiple caches have it, all clean
    INVALID = 'I'    # Cache line is invalid

    def __init__(self):
        self.state = self.INVALID
        self.data = None

    def read(self):
        """Read request from local CPU"""
        if self.state == self.INVALID:
            # Must fetch from memory or other cache
            # Transition to SHARED or EXCLUSIVE
            pass
        return self.data

    def write(self, value):
        """Write request from local CPU"""
        if self.state != self.MODIFIED:
            # Must invalidate other caches
            # Transition to MODIFIED
            pass
        self.data = value
        self.state = self.MODIFIED

    def snoop_read(self):
        """Another CPU is reading this address"""
        if self.state == self.MODIFIED:
            # Must provide data, transition to SHARED
            self.state = self.SHARED
        elif self.state == self.EXCLUSIVE:
            self.state = self.SHARED

    def snoop_write(self):
        """Another CPU is writing this address"""
        # Must invalidate our copy
        self.state = self.INVALID
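
To see the transitions in action, the following sketch wires two simplified caches to a shared bus and replays the stale-read scenario. It is an illustration separate from the CacheLine class: the names State, Cache, and Bus are assumptions made for this example, and the model tracks a single address only.

```python
from enum import Enum

class State(Enum):
    MODIFIED = "M"
    EXCLUSIVE = "E"
    SHARED = "S"
    INVALID = "I"

class Bus:
    """Connects the caches to one memory word."""

    def __init__(self, memory_value):
        self.memory = memory_value
        self.caches = []

    def read_miss(self, requester):
        """Serve a read miss; report whether another cache holds the line."""
        others_have_copy = False
        for cache in self.caches:
            if cache is requester or cache.state is State.INVALID:
                continue
            if cache.state is State.MODIFIED:
                self.memory = cache.value      # write back dirty data
            cache.state = State.SHARED
            others_have_copy = True
        return self.memory, others_have_copy

    def invalidate_others(self, requester):
        for cache in self.caches:
            if cache is not requester and cache.state is not State.INVALID:
                if cache.state is State.MODIFIED:
                    self.memory = cache.value  # write back before dropping
                cache.state = State.INVALID
                cache.value = None

class Cache:
    def __init__(self, bus):
        self.bus = bus
        self.state = State.INVALID
        self.value = None
        bus.caches.append(self)

    def read(self):
        if self.state is State.INVALID:
            self.value, shared = self.bus.read_miss(self)
            self.state = State.SHARED if shared else State.EXCLUSIVE
        return self.value

    def write(self, value):
        if self.state in (State.SHARED, State.INVALID):
            self.bus.invalidate_others(self)
        self.value = value
        self.state = State.MODIFIED

bus = Bus(memory_value=5)
cpu0, cpu1 = Cache(bus), Cache(bus)
cpu0.read()          # cpu0: EXCLUSIVE
cpu1.read()          # both: SHARED
cpu0.write(10)       # cpu0: MODIFIED, cpu1: INVALID
print(cpu1.read())   # 10, served after cpu0 writes the line back
```

Because the write invalidates the other copy, the second read on cpu1 misses and fetches the new value instead of returning the stale 5.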

False Sharing

Cache coherence works on whole cache lines, so it can add overhead even when threads touch unrelated data:

import threading
import time

class FalseSharingDemo:
    """
    False sharing: Different data on same cache line.

    Cache lines are typically 64 bytes.
    If two threads modify different variables on the same line,
    they cause unnecessary invalidations.
    """

    def __init__(self):
        # These might be on the same cache line!
        self.counter_a = 0
        self.counter_b = 0

    def increment_a(self, iterations):
        for _ in range(iterations):
            self.counter_a += 1

    def increment_b(self, iterations):
        for _ in range(iterations):
            self.counter_b += 1

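class PaddedCounters:
    """
    Avoid false sharing with padding (conceptual only).

    Caveat: CPython gives no control over memory layout. Attribute
    values are separately allocated objects, so the lists below do not
    really move the counters onto different cache lines. In C, C++, or
    Rust you would align each counter to the 64-byte line size instead.
    """

    def __init__(self):
        self.counter_a = 0
        self._pad_a = [0] * 15  # stand-in for padding between counters
        self.counter_b = 0
        self._pad_b = [0] * 15

    def increment_a(self, iterations):
        for _ in range(iterations):
            self.counter_a += 1

    def increment_b(self, iterations):
        for _ in range(iterations):
            self.counter_b += 1

def benchmark_false_sharing():
    """Compare with and without padding.

    In standard CPython the GIL serializes these threads, so expect
    little or no difference; the effect shows up in languages whose
    threads run truly in parallel.
    """
    iterations = 10_000_000

    # With false sharing
    demo = FalseSharingDemo()
    start = time.perf_counter()
    t1 = threading.Thread(target=demo.increment_a, args=(iterations,))
    t2 = threading.Thread(target=demo.increment_b, args=(iterations,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    time_shared = time.perf_counter() - start

    # With padding (reduced false sharing)
    padded = PaddedCounters()
    start = time.perf_counter()
    t1 = threading.Thread(target=padded.increment_a, args=(iterations,))
    t2 = threading.Thread(target=padded.increment_b, args=(iterations,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    time_padded = time.perf_counter() - start

    print(f"With false sharing: {time_shared:.3f}s")
    print(f"With padding:       {time_padded:.3f}s")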
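
Whether two variables can suffer from false sharing comes down to whether their addresses fall on the same cache line. The arithmetic can be sketched as follows, assuming the 64-byte line size mentioned above and byte offsets measured from a line-aligned base address. The helper names are hypothetical.

```python
CACHE_LINE_BYTES = 64  # typical value; the real size depends on the hardware

def cache_line_index(byte_offset, line_size=CACHE_LINE_BYTES):
    """Index of the cache line that contains the given byte offset."""
    return byte_offset // line_size

def share_cache_line(offset_a, offset_b, line_size=CACHE_LINE_BYTES):
    """True if both offsets fall on the same cache line."""
    return cache_line_index(offset_a, line_size) == cache_line_index(offset_b, line_size)

# Two adjacent 8-byte counters: offsets 0 and 8
print(share_cache_line(0, 8))    # True: writes to one invalidate the other
# Second counter padded out to the next line: offsets 0 and 64
print(share_cache_line(0, 64))   # False: no false sharing between them
```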

5. Parallelization Patterns

Embarrassingly Parallel

No communication needed between tasks.

from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    """Check if n is prime"""
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def find_primes_parallel(start, end, num_workers=4):
    """
    Embarrassingly parallel: Each number checked independently.
    No communication between tasks needed.
    """
    numbers = range(start, end)

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = executor.map(is_prime, numbers)

    return [n for n, is_p in zip(numbers, results) if is_p]
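
Submitting one number per task makes each task tiny compared with the cost of sending it to a worker process. A common refinement is to give each worker a contiguous block of the range instead. The hypothetical helper below shows only the partitioning step; each resulting sub-range could then be passed to a worker as a single task.

```python
def split_range(start, end, parts):
    """Split [start, end) into contiguous, nearly equal sub-ranges."""
    total = max(0, end - start)
    parts = max(1, min(parts, total)) if total else 1
    base, extra = divmod(total, parts)

    chunks = []
    lo = start
    for i in range(parts):
        # The first `extra` chunks take one additional element
        hi = lo + base + (1 if i < extra else 0)
        chunks.append(range(lo, hi))
        lo = hi
    return chunks

for chunk in split_range(0, 10, 4):
    print(list(chunk))
```

Because the chunks are independent and cover the range exactly once, the problem stays embarrassingly parallel while the number of inter-process messages drops from one per number to one per chunk.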

Map-Reduce Pattern

from functools import reduce
from concurrent.futures import ProcessPoolExecutor

def parallel_map_reduce(data, map_fn, reduce_fn, num_workers=4):
    """
    Map-Reduce:
    1. Map: Apply function to each element (parallel)
    2. Reduce: Combine results (often sequential or tree-based)
    """
    # Parallel map
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        mapped = list(executor.map(map_fn, data))

    # Sequential reduce
    return reduce(reduce_fn, mapped)

# Example: Word count
def count_words_in_chunk(text_chunk):
    """Map: Count words in one chunk"""
    words = text_chunk.lower().split()
    counts = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    return counts

def merge_counts(counts1, counts2):
    """Reduce: Merge two count dictionaries"""
    result = counts1.copy()
    for word, count in counts2.items():
        result[word] = result.get(word, 0) + count
    return result

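# Usage (the __main__ guard is needed when worker processes are started
# with the "spawn" method, the default on Windows and macOS)
if __name__ == "__main__":
    text_chunks = ["hello world hello", "world foo bar", "hello bar baz"]
    word_counts = parallel_map_reduce(text_chunks, count_words_in_chunk, merge_counts)
    print(word_counts)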
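
The docstring of parallel_map_reduce notes that the reduce step can be tree-based. A minimal sketch of that idea follows, using a hypothetical tree_reduce helper. It runs sequentially here, but the combinations within one level are independent of each other and could run in parallel. It assumes the combine function is associative.

```python
from operator import add

def tree_reduce(items, combine):
    """Combine items pairwise, level by level, until one value remains."""
    level = list(items)
    if not level:
        raise ValueError("tree_reduce() of empty sequence")

    while len(level) > 1:
        next_level = []
        # Pairs (0,1), (2,3), ... are independent of each other
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        if len(level) % 2:
            next_level.append(level[-1])  # odd element moves up unchanged
        level = next_level
    return level[0]

print(tree_reduce([1, 2, 3, 4, 5], add))  # 15
```

With n inputs the tree has about log2(n) levels, compared with n - 1 dependent steps for a left-to-right reduce.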

Fork-Join Pattern

from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Callable, TypeVar

T = TypeVar('T')

class ForkJoinPool:
    """
    Fork-Join: Divide problem, solve parts in parallel, combine.
    """

    def __init__(self, num_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=num_workers)

    def invoke(self, task: 'RecursiveTask[T]') -> T:
        """Execute a fork-join task"""
        return task.compute(self)

    def shutdown(self):
        self.executor.shutdown()

class RecursiveTask:
    """Base class for fork-join tasks"""

    def compute(self, pool: ForkJoinPool):
        raise NotImplementedError

    def fork(self, pool: ForkJoinPool) -> Future:
        """Submit task for async execution"""
        return pool.executor.submit(self.compute, pool)

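class ParallelMergeSort(RecursiveTask):
    """Merge sort using fork-join"""

    THRESHOLD = 1000    # Switch to sequential below this size
    MAX_FORK_DEPTH = 2  # Fork at depths 0 and 1 only: at most 3 pool tasks

    def __init__(self, arr: List[int], depth: int = 0):
        self.arr = arr
        self.depth = depth

    def compute(self, pool: ForkJoinPool) -> List[int]:
        # A forked task holds a pool thread while it waits on its child,
        # so unbounded forking into a fixed-size ThreadPoolExecutor can
        # deadlock. The depth limit assumes the pool has at least 3
        # workers (the default here is 4).
        if len(self.arr) <= self.THRESHOLD or self.depth >= self.MAX_FORK_DEPTH:
            return sorted(self.arr)  # Sequential sort

        mid = len(self.arr) // 2

        # Fork: Create subtasks
        left_task = ParallelMergeSort(self.arr[:mid], self.depth + 1)
        right_task = ParallelMergeSort(self.arr[mid:], self.depth + 1)

        # Execute right in parallel, left here
        right_future = right_task.fork(pool)
        left_result = left_task.compute(pool)

        # Join: Wait for results
        right_result = right_future.result()

        # Combine
        return self._merge(left_result, right_result)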

    def _merge(self, left: List[int], right: List[int]) -> List[int]:
        result = []
        i = j = 0
        while i < len(left) and j < len(right):
            if left[i] <= right[j]:
                result.append(left[i])
                i += 1
            else:
                result.append(right[j])
                j += 1
        result.extend(left[i:])
        result.extend(right[j:])
        return result

Exercises

Basic

  1. Calculate the maximum speedup for a program that is 95% parallelizable using 64 cores.

  2. Classify the following as SIMD or MIMD:

    • GPU rendering
    • Web server handling requests
    • Matrix multiplication on CPU with AVX

  3. Explain why false sharing causes performance problems.

Intermediate

  1. Implement a parallel sum using both shared memory and message passing approaches.

  2. Measure the scalability of a parallel program and plot speedup vs. number of cores.

  3. Identify the parallelizable portions of a given algorithm and estimate speedup.

Advanced

  1. Implement a work-stealing scheduler for load balancing.

  2. Design a cache-friendly parallel algorithm that minimizes false sharing.

  3. Analyze the cache coherence traffic for a given parallel program.


Summary

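  • Parallel computing is essential now that single-core performance gains have slowed
  • Amdahl's Law limits speedup based on sequential fraction; Gustafson's Law is more optimistic when the problem size scales with the processor count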
  • Flynn's taxonomy: SISD, SIMD, MISD, MIMD
  • Shared memory is easier but requires synchronization
  • Distributed memory scales better but needs explicit communication
  • Cache coherence maintains consistency but has overhead

Next Reading

Threading and Synchronization →