Randomized Algorithms

Introduction

Randomized algorithms use random numbers as part of their logic. They often achieve better expected performance, admit simpler implementations, or solve problems for which no efficient deterministic algorithm is known. Analyzing them requires probability theory, and they are essential tools in modern computer science.

Learning Objectives

By the end of this reading, you will be able to:

  • Distinguish Las Vegas and Monte Carlo algorithms
  • Analyze expected running time of randomized algorithms
  • Understand and implement randomized QuickSort
  • Apply randomization to data structures (skip lists, hash tables)
  • Use Monte Carlo methods for estimation
  • Understand probabilistic analysis of algorithms

1. Types of Randomized Algorithms

Las Vegas Algorithms

Always correct, but running time is random.

  • Output is always correct
  • Running time varies based on random choices
  • Example: Randomized QuickSort
import random

def las_vegas_example(arr, target):
    """
    Las Vegas search: randomly shuffle and search
    Always finds target if present, time varies
    """
    indices = list(range(len(arr)))
    random.shuffle(indices)

    for i in indices:
        if arr[i] == target:
            return i
    return -1

Monte Carlo Algorithms

Always fast, but may be incorrect.

  • Running time is bounded/predictable
  • Output may be incorrect with some probability
  • Can often reduce error probability by repetition
def monte_carlo_primality(n, k=10):
    """
    Miller-Rabin primality test (Monte Carlo)

    If n is prime: always returns True
    If n is composite: returns True with probability ≤ 4^(-k)
    """
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    # Write n-1 as 2^r * d
    r, d = 0, n - 1
    while d % 2 == 0:
        r += 1
        d //= 2

    def witness(a):
        """Test if a is a witness for compositeness"""
        x = pow(a, d, n)
        if x == 1 or x == n - 1:
            return False
        for _ in range(r - 1):
            x = pow(x, 2, n)
            if x == n - 1:
                return False
        return True

    for _ in range(k):
        a = random.randrange(2, n)
        if witness(a):
            return False

    return True  # Probably prime

2. Randomized QuickSort

Algorithm

def randomized_quicksort(arr):
    """
    QuickSort with random pivot selection

    Expected time: O(n log n)
    Worst case: O(n²) but very unlikely
    """
    if len(arr) <= 1:
        return arr

    # Random pivot selection
    pivot_idx = random.randrange(len(arr))
    pivot = arr[pivot_idx]

    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]

    return randomized_quicksort(left) + middle + randomized_quicksort(right)

def randomized_quicksort_inplace(arr, low=0, high=None):
    """In-place randomized QuickSort"""
    if high is None:
        high = len(arr) - 1

    if low < high:
        pivot_idx = randomized_partition(arr, low, high)
        randomized_quicksort_inplace(arr, low, pivot_idx - 1)
        randomized_quicksort_inplace(arr, pivot_idx + 1, high)

def randomized_partition(arr, low, high):
    """Partition with random pivot"""
    # Choose random pivot and swap to end
    pivot_idx = random.randint(low, high)
    arr[pivot_idx], arr[high] = arr[high], arr[pivot_idx]

    pivot = arr[high]
    i = low - 1

    for j in range(low, high):
        if arr[j] <= pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]

    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1

Analysis

def analyze_quicksort():
    """
    Expected number of comparisons in randomized QuickSort

    Let Xij = 1 if elements ranked i and j are compared

    E[comparisons] = Σᵢ Σⱼ>ᵢ E[Xij] = Σᵢ Σⱼ>ᵢ P(i and j compared)

    Key insight: i and j are compared iff one is chosen as pivot
    before any element between them.

    P(i,j compared) = 2/(j-i+1)

    E[X] = Σᵢ Σⱼ>ᵢ 2/(j-i+1)
         = Σᵢ Σₖ₌₂ⁿ⁻ⁱ⁺¹ 2/k
         ≤ 2n × Hₙ
         ≈ 2n ln n
         = O(n log n)
    """
    import math

    def expected_comparisons(n):
        """Calculate expected comparisons"""
        total = 0
        for i in range(1, n + 1):
            for j in range(i + 1, n + 1):
                total += 2 / (j - i + 1)
        return total

    def harmonic(n):
        return sum(1/k for k in range(1, n + 1))

    for n in [10, 100, 1000]:
        exact = expected_comparisons(n)
        approx = 2 * n * harmonic(n)
        print(f"n={n}: exact={exact:.1f}, approx={approx:.1f}, 2n ln n={2*n*math.log(n):.1f}")

analyze_quicksort()

3. Randomized Selection

QuickSelect

Find the k-th smallest element in expected O(n) time.

def quickselect(arr, k):
    """
    Find k-th smallest element (0-indexed)

    Expected time: O(n)
    Worst case: O(n²)
    """
    if len(arr) == 1:
        return arr[0]

    pivot = random.choice(arr)

    lows = [x for x in arr if x < pivot]
    highs = [x for x in arr if x > pivot]
    pivots = [x for x in arr if x == pivot]

    if k < len(lows):
        return quickselect(lows, k)
    elif k < len(lows) + len(pivots):
        return pivot
    else:
        return quickselect(highs, k - len(lows) - len(pivots))

# Find median
arr = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
median = quickselect(arr.copy(), len(arr) // 2)
print(f"Median: {median}")

Analysis of QuickSelect

"""
Expected running time analysis:

Let T(n) = expected time on n elements

Good pivot: splits into at most 3n/4 elements on larger side
P(good pivot) ≥ 1/2

T(n) ≤ cn + (1/2)T(n) + (1/2)T(3n/4)
     ≤ cn + (1/2)T(n) + (1/2)T(3n/4)

Solving: T(n) = O(n)
"""

4. Skip Lists

A randomized data structure for ordered sets.

import random

class SkipListNode:
    def __init__(self, value, level):
        self.value = value
        self.forward = [None] * (level + 1)

class SkipList:
    """
    Probabilistic balanced search structure

    Expected search/insert/delete: O(log n)
    Space: O(n) expected
    """

    def __init__(self, max_level=16, p=0.5):
        self.max_level = max_level
        self.p = p
        self.level = 0
        self.header = SkipListNode(None, max_level)

    def random_level(self):
        """Generate random level with geometric distribution"""
        level = 0
        while random.random() < self.p and level < self.max_level:
            level += 1
        return level

    def search(self, value):
        """Search for value, return node or None"""
        current = self.header

        # Start from highest level
        for i in range(self.level, -1, -1):
            while current.forward[i] and current.forward[i].value < value:
                current = current.forward[i]

        current = current.forward[0]

        if current and current.value == value:
            return current
        return None

    def insert(self, value):
        """Insert value into skip list"""
        update = [None] * (self.max_level + 1)
        current = self.header

        # Find insert position at each level
        for i in range(self.level, -1, -1):
            while current.forward[i] and current.forward[i].value < value:
                current = current.forward[i]
            update[i] = current

        current = current.forward[0]

        # Value already exists
        if current and current.value == value:
            return False

        # Random level for new node
        new_level = self.random_level()

        if new_level > self.level:
            for i in range(self.level + 1, new_level + 1):
                update[i] = self.header
            self.level = new_level

        # Create and insert new node
        new_node = SkipListNode(value, new_level)
        for i in range(new_level + 1):
            new_node.forward[i] = update[i].forward[i]
            update[i].forward[i] = new_node

        return True

    def delete(self, value):
        """Delete value from skip list"""
        update = [None] * (self.max_level + 1)
        current = self.header

        for i in range(self.level, -1, -1):
            while current.forward[i] and current.forward[i].value < value:
                current = current.forward[i]
            update[i] = current

        current = current.forward[0]

        if current and current.value == value:
            for i in range(self.level + 1):
                if update[i].forward[i] != current:
                    break
                update[i].forward[i] = current.forward[i]

            # Reduce level if needed
            while self.level > 0 and self.header.forward[self.level] is None:
                self.level -= 1

            return True
        return False

    def display(self):
        """Display skip list structure"""
        for i in range(self.level, -1, -1):
            print(f"Level {i}: ", end="")
            node = self.header.forward[i]
            while node:
                print(f"{node.value} -> ", end="")
                node = node.forward[i]
            print("None")

# Example
sl = SkipList()
for x in [3, 6, 7, 9, 12, 19, 17, 26, 21, 25]:
    sl.insert(x)

sl.display()
print(f"\nSearch 19: {sl.search(19) is not None}")
print(f"Search 15: {sl.search(15) is not None}")

5. Universal Hashing

Problem with Deterministic Hashing

With a fixed, publicly known hash function, an adversary can construct inputs that all hash to the same bucket, degrading every operation to O(n).

Universal Hash Families

class UniversalHashTable:
    """
    Hash table with universal hashing

    Guarantees expected O(1) operations regardless of input
    """

    def __init__(self, size=1009):
        self.size = size
        self.table = [[] for _ in range(size)]

        # Choose random parameters for hash function
        # h(x) = ((ax + b) mod p) mod m
        self.p = 1000000007  # Large prime
        self.a = random.randint(1, self.p - 1)
        self.b = random.randint(0, self.p - 1)

    def _hash(self, key):
        """Universal hash function"""
        if isinstance(key, str):
            # Convert string to number
            x = sum(ord(c) * (31 ** i) for i, c in enumerate(key))
        else:
            x = key
        return ((self.a * x + self.b) % self.p) % self.size

    def insert(self, key, value):
        """Insert key-value pair"""
        idx = self._hash(key)
        for i, (k, v) in enumerate(self.table[idx]):
            if k == key:
                self.table[idx][i] = (key, value)
                return
        self.table[idx].append((key, value))

    def search(self, key):
        """Search for key"""
        idx = self._hash(key)
        for k, v in self.table[idx]:
            if k == key:
                return v
        return None

    def delete(self, key):
        """Delete key"""
        idx = self._hash(key)
        for i, (k, v) in enumerate(self.table[idx]):
            if k == key:
                del self.table[idx][i]
                return True
        return False

Analysis

"""
Universal Hashing Property:

For hash family H, for any x ≠ y:
P_{h∈H}[h(x) = h(y)] ≤ 1/m

Consequence:
- Expected bucket size ≤ 1 + n/m
- With n ≤ m, expected O(1) operations
- Works for ANY input distribution
"""

6. Monte Carlo Methods

Estimating Pi

def estimate_pi(n_samples):
    """
    Estimate π using Monte Carlo

    Points uniformly in unit square
    Fraction in quarter circle = π/4
    """
    inside = 0
    for _ in range(n_samples):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            inside += 1

    return 4 * inside / n_samples

# Estimate with increasing samples
for n in [100, 1000, 10000, 100000, 1000000]:
    estimate = estimate_pi(n)
    error = abs(estimate - 3.14159265359)
    print(f"n={n:>7}: π ≈ {estimate:.6f}, error = {error:.6f}")
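The printed errors shrink slowly: the standard error of a Monte Carlo estimate decays like O(1/√n), so each extra digit of accuracy costs roughly 100× more samples. A quick illustrative check (the estimator is repeated inline so the sketch stands alone): the spread of repeated estimates should roughly halve each time n quadruples.

```python
import random
import statistics

def estimate_pi(n_samples):
    """Same quarter-circle estimator as above, repeated here for self-containment."""
    inside = sum(1 for _ in range(n_samples)
                 if random.random() ** 2 + random.random() ** 2 <= 1)
    return 4 * inside / n_samples

random.seed(0)
spread = {}
for n in [100, 400, 1600]:
    estimates = [estimate_pi(n) for _ in range(200)]
    spread[n] = statistics.stdev(estimates)  # empirical standard error
    print(f"n={n:>4}: std of estimate = {spread[n]:.4f}")
```

Each 4× increase in samples cuts the standard deviation roughly in half, matching the 1/√n rate.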

Monte Carlo Integration

def monte_carlo_integrate(f, a, b, n=10000):
    """
    Estimate ∫_a^b f(x) dx using Monte Carlo

    Based on: ∫f(x)dx = (b-a) × E[f(X)] where X ~ Uniform(a,b)
    """
    total = sum(f(random.uniform(a, b)) for _ in range(n))
    return (b - a) * total / n

import math

# Estimate ∫_0^1 e^x dx = e - 1 ≈ 1.718
estimate = monte_carlo_integrate(math.exp, 0, 1)
exact = math.e - 1
print(f"∫_0^1 e^x dx ≈ {estimate:.4f} (exact: {exact:.4f})")

# Estimate ∫_0^π sin(x) dx = 2
estimate = monte_carlo_integrate(math.sin, 0, math.pi)
print(f"∫_0^π sin(x) dx ≈ {estimate:.4f} (exact: 2.0000)")

Importance Sampling

def importance_sampling(f, p, q, q_sample, n=10000):
    """
    Estimate E_p[f(X)] using samples from a proposal distribution q

    E_p[f(X)] = E_q[f(X) × p(X)/q(X)]

    Useful when p is hard to sample from directly.
    p and q are density functions; q_sample draws from q, whose
    support must cover that of p.
    """
    total = 0
    for _ in range(n):
        x = q_sample()          # sample from the proposal distribution q
        weight = p(x) / q(x)    # importance weight corrects the mismatch
        total += f(x) * weight
    return total / n
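As a concrete, self-contained illustration of the identity E_p[f(X)] = E_q[f(X)·p(X)/q(X)]: estimate E_p[X²] = 1 for a standard normal target p, drawing samples from a Uniform(−5, 5) proposal q instead (both densities are written out inline; the proposal's support covers essentially all of p's mass).

```python
import math
import random

def normal_pdf(x):
    """Target density p: standard normal."""
    return math.exp(-x * x / 2) / math.sqrt(2 * math.pi)

def uniform_pdf(x):
    """Proposal density q: Uniform(-5, 5)."""
    return 1 / 10

random.seed(0)
n = 100_000
total = 0.0
for _ in range(n):
    x = random.uniform(-5, 5)                  # sample from the proposal q
    weight = normal_pdf(x) / uniform_pdf(x)    # importance weight p(x)/q(x)
    total += (x * x) * weight                  # f(x) = x², so E_p[f] = Var(X) = 1
result = total / n
print(f"E_p[X²] ≈ {result:.3f} (exact: 1.000)")
```

Note the trade-off: the estimate is unbiased, but a poorly matched proposal (one that rarely visits regions where p(x)·f(x) is large) inflates the variance of the weights.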

7. Randomized Min-Cut

Karger's Algorithm

def karger_min_cut(graph):
    """
    Randomized algorithm to find minimum cut

    Contracts random edges until 2 vertices remain
    Repeat O(n² log n) times for high probability of success
    """
    import copy

    def contract(g):
        """Contract graph until 2 vertices remain"""
        vertices = set(g.keys())

        while len(vertices) > 2:
            # Pick random edge
            edges = []
            for u in vertices:
                for v in g[u]:
                    if u < v:  # Avoid duplicates
                        edges.append((u, v))

            if not edges:
                break

            u, v = random.choice(edges)

            # Contract edge (u, v) by merging v into u
            g[u].extend(g[v])
            for w in g[v]:
                g[w] = [u if x == v else x for x in g[w]]

            # Remove self-loops
            g[u] = [x for x in g[u] if x != u]

            del g[v]
            vertices.remove(v)

        # Count edges between the two remaining vertices (the cut)
        for u in vertices:
            return len(g[u])  # one list entry per crossing edge

    # Run multiple times, return minimum
    min_cut = float('inf')
    n = len(graph)
    iterations = n * n * int(math.log(n) + 1)

    for _ in range(min(iterations, 100)):  # Cap iterations for demo
        g = copy.deepcopy(graph)
        cut = contract(g)
        if cut and cut < min_cut:
            min_cut = cut

    return min_cut

# Example graph (adjacency list with parallel edges)
graph = {
    1: [2, 2, 3],
    2: [1, 1, 3, 4],
    3: [1, 2, 4],
    4: [2, 3]
}

min_cut = karger_min_cut(graph)
print(f"Minimum cut: {min_cut}")

Analysis

"""
Karger's Algorithm Analysis:

P(specific min-cut survives one iteration) ≥ 2/n

P(survives all n-2 contractions) ≥ Π_{i=2}^{n-1} (1 - 2/i)
                                  = Π (i-2)/i
                                  = 2/(n(n-1))
                                  ≥ 2/n²

With n² log n repetitions:
P(fail all) ≤ (1 - 2/n²)^{n² log n}
            ≤ e^{-2 log n}
            = 1/n²

So high probability of success!
"""

8. Probabilistic Data Structures

Bloom Filters

import hashlib

class BloomFilter:
    """
    Space-efficient probabilistic set membership

    - No false negatives: if element was added, always returns True
    - False positives possible: may return True for non-added elements
    """

    def __init__(self, size, num_hashes):
        self.size = size
        self.num_hashes = num_hashes
        self.bit_array = [False] * size

    def _hashes(self, item):
        """Generate k hash values for item"""
        hashes = []
        for i in range(self.num_hashes):
            h = hashlib.md5(f"{item}{i}".encode()).hexdigest()
            hashes.append(int(h, 16) % self.size)
        return hashes

    def add(self, item):
        """Add item to filter"""
        for h in self._hashes(item):
            self.bit_array[h] = True

    def contains(self, item):
        """Check if item might be in set"""
        return all(self.bit_array[h] for h in self._hashes(item))

    @staticmethod
    def optimal_parameters(n, fp_rate):
        """Calculate optimal size and hash count"""
        # m = -n ln(p) / (ln 2)²
        m = int(-n * math.log(fp_rate) / (math.log(2) ** 2))
        # k = (m/n) ln 2
        k = int((m / n) * math.log(2))
        return m, k

# Example
n = 1000  # Expected elements
fp_rate = 0.01  # 1% false positive rate
m, k = BloomFilter.optimal_parameters(n, fp_rate)
print(f"Optimal: size={m}, hashes={k}")

bf = BloomFilter(m, k)
for i in range(n):
    bf.add(f"element_{i}")

# Check false positive rate
false_positives = sum(1 for i in range(n, 2*n) if bf.contains(f"element_{i}"))
print(f"False positive rate: {false_positives/n:.2%}")

Count-Min Sketch

class CountMinSketch:
    """
    Probabilistic frequency estimation

    Space: O(ε⁻¹ log δ⁻¹)
    Estimate ≥ true count, ≤ true count + εN with probability 1-δ
    """

    def __init__(self, width, depth):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _hashes(self, item):
        """Generate d hash values"""
        hashes = []
        for i in range(self.depth):
            h = hash(f"{item}{i}") % self.width
            hashes.append(h)
        return hashes

    def add(self, item, count=1):
        """Add count for item"""
        for i, h in enumerate(self._hashes(item)):
            self.table[i][h] += count

    def estimate(self, item):
        """Estimate count for item"""
        return min(self.table[i][h]
                   for i, h in enumerate(self._hashes(item)))

# Example: Stream frequency estimation
cms = CountMinSketch(width=1000, depth=5)

# Simulate stream
stream = ['a'] * 100 + ['b'] * 50 + ['c'] * 10 + ['d'] * 1
random.shuffle(stream)

for item in stream:
    cms.add(item)

for item in ['a', 'b', 'c', 'd', 'e']:
    print(f"Count of '{item}': estimated={cms.estimate(item)}")
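The example above picks width and depth by hand. Under the standard Count-Min analysis, a common rule of thumb (stated here as the usual textbook choice, not taken from this text's code) is width ≈ ⌈e/ε⌉ and depth ≈ ⌈ln(1/δ)⌉, which yields the (ε, δ) guarantee in the docstring:

```python
import math

def cms_parameters(epsilon, delta):
    """Width/depth so overcount ≤ εN with probability ≥ 1-δ (standard bound)."""
    width = math.ceil(math.e / epsilon)      # controls per-row error εN
    depth = math.ceil(math.log(1 / delta))   # independent rows drive down δ
    return width, depth

# e.g. overcount at most 0.1% of the stream length, 99% of the time:
print(cms_parameters(0.001, 0.01))  # → (2719, 5)
```

Note how cheap extra confidence is: δ only appears inside a logarithm, so halving the failure probability costs at most one extra row.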

Exercises

Basic

  1. Implement a randomized algorithm to find an element in an unsorted array. Analyze its expected running time.

  2. Use Monte Carlo simulation to estimate the probability of getting at least one 6 in four dice rolls.

  3. Implement reservoir sampling: maintain a uniform random sample of k elements from a stream.

Intermediate

  1. Implement Karger-Stein algorithm (improved min-cut) and compare with basic Karger's algorithm.

  2. Analyze the expected number of probes for successful/unsuccessful search in a hash table with linear probing.

  3. Implement a HyperLogLog sketch for cardinality estimation.

Advanced

  1. Prove that randomized QuickSort has O(n log n) expected comparisons.

  2. Implement a randomized treap (BST with random priorities) and verify it maintains balance.

  3. Design a randomized algorithm for the MAX-SAT problem and analyze its approximation ratio.


Summary

  • Las Vegas algorithms are always correct with random running time
  • Monte Carlo algorithms are always fast with random correctness
  • Randomized QuickSort achieves O(n log n) expected time regardless of input
  • Universal hashing defeats adversarial inputs
  • Skip lists provide simple balanced search with O(log n) expected operations
  • Monte Carlo methods estimate quantities through random sampling
  • Probabilistic data structures (Bloom filters, sketches) trade accuracy for space

Module Complete

Congratulations on completing the Probability and Statistics module! You've learned:

  1. Probability fundamentals and Bayes' theorem
  2. Random variables and distributions
  3. Statistical inference and hypothesis testing
  4. Randomized algorithms and probabilistic data structures

These concepts are foundational for machine learning, algorithm analysis, and systems design.
