Randomized Algorithms
Introduction
Randomized algorithms use random numbers as part of their logic. They often achieve better expected performance or simpler implementations, and can efficiently solve problems for which no efficient deterministic algorithm is known. Understanding randomized algorithms requires probability theory and is essential for modern computer science.
Learning Objectives
By the end of this reading, you will be able to:
- Distinguish Las Vegas and Monte Carlo algorithms
- Analyze expected running time of randomized algorithms
- Understand and implement randomized QuickSort
- Apply randomization to data structures (skip lists, hash tables)
- Use Monte Carlo methods for estimation
- Understand probabilistic analysis of algorithms
1. Types of Randomized Algorithms
Las Vegas Algorithms
Always correct, but running time is random.
- Output is always correct
- Running time varies based on random choices
- Example: Randomized QuickSort
import random
def las_vegas_example(arr, target):
"""
Las Vegas search: randomly shuffle and search
Always finds target if present, time varies
"""
indices = list(range(len(arr)))
random.shuffle(indices)
for i in indices:
if arr[i] == target:
return i
return -1
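To make the Las Vegas property concrete, the sketch below (an illustrative counting variant of the function above) records how many probes each run makes: the answer is always correct, but the amount of work varies with the random shuffle.

```python
import random

def las_vegas_search_counting(arr, target):
    """Shuffled linear search; returns (index, number of probes)."""
    indices = list(range(len(arr)))
    random.shuffle(indices)
    for probes, i in enumerate(indices, start=1):
        if arr[i] == target:
            return i, probes
    return -1, len(arr)

random.seed(42)
arr = list(range(100))
probe_counts = []
for _ in range(20):
    idx, probes = las_vegas_search_counting(arr, 7)
    assert arr[idx] == 7          # Output is always correct...
    probe_counts.append(probes)   # ...but the work per run varies
print(f"probes: min={min(probe_counts)}, max={max(probe_counts)}")
```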
Monte Carlo Algorithms
Always fast, but may be incorrect.
- Running time is bounded/predictable
- Output may be incorrect with some probability
- Can often reduce error probability by repetition
def monte_carlo_primality(n, k=10):
"""
Miller-Rabin primality test (Monte Carlo)
If n is prime: always returns True
    If n is composite: incorrectly returns True with probability ≤ 4^(-k)
"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
# Write n-1 as 2^r * d
r, d = 0, n - 1
while d % 2 == 0:
r += 1
d //= 2
def witness(a):
"""Test if a is a witness for compositeness"""
x = pow(a, d, n)
if x == 1 or x == n - 1:
return False
for _ in range(r - 1):
x = pow(x, 2, n)
if x == n - 1:
return False
return True
for _ in range(k):
a = random.randrange(2, n)
if witness(a):
return False
return True # Probably prime
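The error bound shrinks geometrically with k, which is exactly the "reduce error probability by repetition" idea. The condensed, self-contained restatement below (mirroring the function above) is exercised on a strong pseudoprime: 2047 = 23 × 89 passes the single-base test with base 2, but a handful of random bases expose it.

```python
import random

def is_probably_prime(n, k=10):
    """Miller-Rabin: False means certainly composite; True means prime w.h.p."""
    if n < 2:
        return False
    if n in (2, 3):
        return True
    if n % 2 == 0:
        return False
    # Write n-1 as 2^r * d with d odd
    r, d = 0, n - 1
    while d % 2 == 0:
        r += 1
        d //= 2
    for _ in range(k):
        a = random.randrange(2, n - 1)
        x = pow(a, d, n)
        if x in (1, n - 1):
            continue
        for _ in range(r - 1):
            x = pow(x, 2, n)
            if x == n - 1:
                break
        else:
            return False  # a is a witness for compositeness
    return True

random.seed(0)
# 2047 = 23 * 89 is a strong pseudoprime to base 2, yet random bases catch it
print(is_probably_prime(2047, k=10), is_probably_prime(97, k=10))
```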
2. Randomized QuickSort
Algorithm
def randomized_quicksort(arr):
"""
QuickSort with random pivot selection
Expected time: O(n log n)
Worst case: O(n²) but very unlikely
"""
if len(arr) <= 1:
return arr
# Random pivot selection
pivot_idx = random.randrange(len(arr))
pivot = arr[pivot_idx]
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
return randomized_quicksort(left) + middle + randomized_quicksort(right)
def randomized_quicksort_inplace(arr, low=0, high=None):
"""In-place randomized QuickSort"""
if high is None:
high = len(arr) - 1
if low < high:
pivot_idx = randomized_partition(arr, low, high)
randomized_quicksort_inplace(arr, low, pivot_idx - 1)
randomized_quicksort_inplace(arr, pivot_idx + 1, high)
def randomized_partition(arr, low, high):
"""Partition with random pivot"""
# Choose random pivot and swap to end
pivot_idx = random.randint(low, high)
arr[pivot_idx], arr[high] = arr[high], arr[pivot_idx]
pivot = arr[high]
i = low - 1
for j in range(low, high):
if arr[j] <= pivot:
i += 1
arr[i], arr[j] = arr[j], arr[i]
arr[i + 1], arr[high] = arr[high], arr[i + 1]
return i + 1
Analysis
def analyze_quicksort():
"""
Expected number of comparisons in randomized QuickSort
Let Xij = 1 if elements ranked i and j are compared
E[comparisons] = Σᵢ Σⱼ>ᵢ E[Xij] = Σᵢ Σⱼ>ᵢ P(i and j compared)
Key insight: i and j are compared iff one is chosen as pivot
before any element between them.
P(i,j compared) = 2/(j-i+1)
E[X] = Σᵢ Σⱼ>ᵢ 2/(j-i+1)
= Σᵢ Σₖ₌₂ⁿ⁻ⁱ⁺¹ 2/k
    ≤ 2n × Hₙ   (Hₙ = n-th harmonic number)
≈ 2n ln n
= O(n log n)
"""
import math
def expected_comparisons(n):
"""Calculate expected comparisons"""
total = 0
for i in range(1, n + 1):
for j in range(i + 1, n + 1):
total += 2 / (j - i + 1)
return total
def harmonic(n):
return sum(1/k for k in range(1, n + 1))
for n in [10, 100, 1000]:
exact = expected_comparisons(n)
approx = 2 * n * harmonic(n)
print(f"n={n}: exact={exact:.1f}, approx={approx:.1f}, 2n ln n={2*n*math.log(n):.1f}")
analyze_quicksort()
3. Randomized Selection
QuickSelect
Find the k-th smallest element in expected O(n) time.
def quickselect(arr, k):
"""
Find k-th smallest element (0-indexed)
Expected time: O(n)
Worst case: O(n²)
"""
if len(arr) == 1:
return arr[0]
pivot = random.choice(arr)
lows = [x for x in arr if x < pivot]
highs = [x for x in arr if x > pivot]
pivots = [x for x in arr if x == pivot]
if k < len(lows):
return quickselect(lows, k)
elif k < len(lows) + len(pivots):
return pivot
else:
return quickselect(highs, k - len(lows) - len(pivots))
# Find median
arr = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
median = quickselect(arr.copy(), len(arr) // 2)
print(f"Median: {median}")
Analysis of QuickSelect
"""
Expected running time analysis:
Let T(n) = expected time on n elements
Call a pivot "good" if it leaves at most 3n/4 elements on the larger side
P(good pivot) ≥ 1/2
T(n) ≤ cn + (1/2)T(n) + (1/2)T(3n/4)
Rearranging: T(n) ≤ 2cn + T(3n/4)
Unrolling this geometric recursion gives T(n) = O(n)
"""
4. Skip Lists
A randomized data structure for ordered sets.
import random
class SkipListNode:
def __init__(self, value, level):
self.value = value
self.forward = [None] * (level + 1)
class SkipList:
"""
Probabilistic balanced search structure
Expected search/insert/delete: O(log n)
Space: O(n) expected
"""
def __init__(self, max_level=16, p=0.5):
self.max_level = max_level
self.p = p
self.level = 0
self.header = SkipListNode(None, max_level)
def random_level(self):
"""Generate random level with geometric distribution"""
level = 0
while random.random() < self.p and level < self.max_level:
level += 1
return level
def search(self, value):
"""Search for value, return node or None"""
current = self.header
# Start from highest level
for i in range(self.level, -1, -1):
while current.forward[i] and current.forward[i].value < value:
current = current.forward[i]
current = current.forward[0]
if current and current.value == value:
return current
return None
def insert(self, value):
"""Insert value into skip list"""
update = [None] * (self.max_level + 1)
current = self.header
# Find insert position at each level
for i in range(self.level, -1, -1):
while current.forward[i] and current.forward[i].value < value:
current = current.forward[i]
update[i] = current
current = current.forward[0]
# Value already exists
if current and current.value == value:
return False
# Random level for new node
new_level = self.random_level()
if new_level > self.level:
for i in range(self.level + 1, new_level + 1):
update[i] = self.header
self.level = new_level
# Create and insert new node
new_node = SkipListNode(value, new_level)
for i in range(new_level + 1):
new_node.forward[i] = update[i].forward[i]
update[i].forward[i] = new_node
return True
def delete(self, value):
"""Delete value from skip list"""
update = [None] * (self.max_level + 1)
current = self.header
for i in range(self.level, -1, -1):
while current.forward[i] and current.forward[i].value < value:
current = current.forward[i]
update[i] = current
current = current.forward[0]
if current and current.value == value:
for i in range(self.level + 1):
if update[i].forward[i] != current:
break
update[i].forward[i] = current.forward[i]
# Reduce level if needed
while self.level > 0 and self.header.forward[self.level] is None:
self.level -= 1
return True
return False
def display(self):
"""Display skip list structure"""
for i in range(self.level, -1, -1):
print(f"Level {i}: ", end="")
node = self.header.forward[i]
while node:
print(f"{node.value} -> ", end="")
node = node.forward[i]
print("None")
# Example
sl = SkipList()
for x in [3, 6, 7, 9, 12, 19, 17, 26, 21, 25]:
sl.insert(x)
sl.display()
print(f"\nSearch 19: {sl.search(19) is not None}")
print(f"Search 15: {sl.search(15) is not None}")
5. Universal Hashing
Problem with Deterministic Hashing
An adversary who knows the hash function can construct inputs that all hash to the same bucket, degrading operations to O(n).
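A quick illustration of the attack, using the deliberately naive fixed hash h(x) = x mod m: the adversary submits multiples of m, and every key lands in bucket 0. The same keys spread out once the hash parameters are chosen at random (the constants below are arbitrary choices for the demo).

```python
import random
from collections import Counter

m = 101                                  # number of buckets
keys = [m * i for i in range(1, 201)]    # adversarial input: multiples of m

# Deterministic hashing: every key collides in bucket 0
fixed_buckets = Counter(x % m for x in keys)
print("fixed hash, largest bucket:", fixed_buckets.most_common(1)[0][1])

# Random-parameter hashing: h(x) = ((a*x + b) mod p) mod m
random.seed(7)
p = 1000000007
a = random.randint(1, p - 1)
b = random.randint(0, p - 1)
rand_buckets = Counter(((a * x + b) % p) % m for x in keys)
print("random hash, largest bucket:", rand_buckets.most_common(1)[0][1])
```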
Universal Hash Families
class UniversalHashTable:
"""
Hash table with universal hashing
Guarantees expected O(1) operations regardless of input
"""
def __init__(self, size=1009):
self.size = size
self.table = [[] for _ in range(size)]
# Choose random parameters for hash function
# h(x) = ((ax + b) mod p) mod m
self.p = 1000000007 # Large prime
self.a = random.randint(1, self.p - 1)
self.b = random.randint(0, self.p - 1)
def _hash(self, key):
"""Universal hash function"""
if isinstance(key, str):
# Convert string to number
x = sum(ord(c) * (31 ** i) for i, c in enumerate(key))
else:
x = key
return ((self.a * x + self.b) % self.p) % self.size
def insert(self, key, value):
"""Insert key-value pair"""
idx = self._hash(key)
for i, (k, v) in enumerate(self.table[idx]):
if k == key:
self.table[idx][i] = (key, value)
return
self.table[idx].append((key, value))
def search(self, key):
"""Search for key"""
idx = self._hash(key)
for k, v in self.table[idx]:
if k == key:
return v
return None
def delete(self, key):
"""Delete key"""
idx = self._hash(key)
for i, (k, v) in enumerate(self.table[idx]):
if k == key:
del self.table[idx][i]
return True
return False
Analysis
"""
Universal Hashing Property:
For hash family H, for any x ≠ y:
P_{h∈H}[h(x) = h(y)] ≤ 1/m
Consequence:
- Expected bucket size ≤ 1 + n/m
- With n ≤ m, expected O(1) operations
- Works for ANY input distribution
"""
6. Monte Carlo Methods
Estimating Pi
def estimate_pi(n_samples):
"""
Estimate π using Monte Carlo
Points uniformly in unit square
Fraction in quarter circle = π/4
"""
inside = 0
for _ in range(n_samples):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
inside += 1
return 4 * inside / n_samples
# Estimate with increasing samples
for n in [100, 1000, 10000, 100000, 1000000]:
estimate = estimate_pi(n)
error = abs(estimate - 3.14159265359)
print(f"n={n:>7}: π ≈ {estimate:.6f}, error = {error:.6f}")
Monte Carlo Integration
def monte_carlo_integrate(f, a, b, n=10000):
"""
Estimate ∫_a^b f(x) dx using Monte Carlo
Based on: ∫f(x)dx = (b-a) × E[f(X)] where X ~ Uniform(a,b)
"""
total = sum(f(random.uniform(a, b)) for _ in range(n))
return (b - a) * total / n
import math
# Estimate ∫_0^1 e^x dx = e - 1 ≈ 1.718
estimate = monte_carlo_integrate(math.exp, 0, 1)
exact = math.e - 1
print(f"∫_0^1 e^x dx ≈ {estimate:.4f} (exact: {exact:.4f})")
# Estimate ∫_0^π sin(x) dx = 2
estimate = monte_carlo_integrate(math.sin, 0, math.pi)
print(f"∫_0^π sin(x) dx ≈ {estimate:.4f} (exact: 2.0000)")
Importance Sampling
def importance_sampling(f, p_pdf, q_pdf, q_sample, n=10000):
    """
    Estimate E_p[f(X)] using samples drawn from a proposal distribution q:
    E_p[f(X)] = E_q[f(X) × p(X)/q(X)]
    Useful when p is hard to sample from directly, or when q concentrates
    samples where |f(x)|·p(x) is large (variance reduction)
    Requires q(x) > 0 wherever f(x)·p(x) ≠ 0
    """
    total = 0.0
    for _ in range(n):
        x = q_sample()                  # Sample from proposal distribution q
        weight = p_pdf(x) / q_pdf(x)    # Importance weight p(x)/q(x)
        total += f(x) * weight
    return total / n
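As a concrete worked example (self-contained, with hand-picked distributions): estimating the tiny tail probability P(Z > 4) for a standard normal. Naive sampling almost never lands in the tail, but an Exp(1) proposal shifted to start at 4 puts every sample there, and the importance weights correct for the mismatch. The true value is about 3.167 × 10⁻⁵.

```python
import math
import random

def normal_pdf(x):
    """Standard normal density p(x)."""
    return math.exp(-x * x / 2) / math.sqrt(2 * math.pi)

def shifted_exp_pdf(x, shift=4.0):
    """Proposal density q(x): Exp(1) shifted to start at `shift`."""
    return math.exp(-(x - shift)) if x >= shift else 0.0

random.seed(3)
n = 200000
total = 0.0
for _ in range(n):
    x = 4.0 + random.expovariate(1.0)       # sample from proposal q
    # f(x) = 1 (indicator of the tail, always true here); weight = p(x)/q(x)
    total += normal_pdf(x) / shifted_exp_pdf(x)
estimate = total / n
print(f"P(Z > 4) ≈ {estimate:.3e}")
```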
7. Randomized Min-Cut
Karger's Algorithm
def karger_min_cut(graph):
"""
Randomized algorithm to find minimum cut
Contracts random edges until 2 vertices remain
Repeat O(n² log n) times for high probability of success
"""
import copy
def contract(g):
"""Contract graph until 2 vertices remain"""
vertices = set(g.keys())
while len(vertices) > 2:
# Pick random edge
edges = []
for u in vertices:
for v in g[u]:
if u < v: # Avoid duplicates
edges.append((u, v))
if not edges:
break
u, v = random.choice(edges)
# Contract edge (u, v) by merging v into u
g[u].extend(g[v])
for w in g[v]:
g[w] = [u if x == v else x for x in g[w]]
# Remove self-loops
g[u] = [x for x in g[u] if x != u]
del g[v]
vertices.remove(v)
        # The edges between the two remaining supernodes form the cut
        u = next(iter(vertices))
        return len(g[u])
# Run multiple times, return minimum
min_cut = float('inf')
n = len(graph)
iterations = n * n * int(math.log(n) + 1)
for _ in range(min(iterations, 100)): # Cap iterations for demo
g = copy.deepcopy(graph)
cut = contract(g)
if cut and cut < min_cut:
min_cut = cut
return min_cut
# Example graph (adjacency list with parallel edges)
graph = {
1: [2, 2, 3],
2: [1, 1, 3, 4],
3: [1, 2, 4],
4: [2, 3]
}
min_cut = karger_min_cut(graph)
print(f"Minimum cut: {min_cut}")
Analysis
"""
Karger's Algorithm Analysis:
P(a fixed min-cut survives one contraction, with i vertices left) ≥ 1 - 2/i
P(survives all n-2 contractions) ≥ Π_{i=3}^{n} (1 - 2/i)
                                 = Π_{i=3}^{n} (i-2)/i
                                 = 2/(n(n-1))        (telescoping)
                                 ≥ 2/n²
With n² ln n independent repetitions:
P(fail all) ≤ (1 - 2/n²)^{n² ln n}
            ≤ e^{-2 ln n}
            = 1/n²
So high probability of success!
"""
8. Probabilistic Data Structures
Bloom Filters
import hashlib
class BloomFilter:
"""
Space-efficient probabilistic set membership
- No false negatives: if element was added, always returns True
- False positives possible: may return True for non-added elements
"""
def __init__(self, size, num_hashes):
self.size = size
self.num_hashes = num_hashes
self.bit_array = [False] * size
def _hashes(self, item):
"""Generate k hash values for item"""
hashes = []
for i in range(self.num_hashes):
h = hashlib.md5(f"{item}{i}".encode()).hexdigest()
hashes.append(int(h, 16) % self.size)
return hashes
def add(self, item):
"""Add item to filter"""
for h in self._hashes(item):
self.bit_array[h] = True
def contains(self, item):
"""Check if item might be in set"""
return all(self.bit_array[h] for h in self._hashes(item))
@staticmethod
def optimal_parameters(n, fp_rate):
"""Calculate optimal size and hash count"""
# m = -n ln(p) / (ln 2)²
m = int(-n * math.log(fp_rate) / (math.log(2) ** 2))
# k = (m/n) ln 2
        k = max(1, round((m / n) * math.log(2)))
return m, k
# Example
n = 1000 # Expected elements
fp_rate = 0.01 # 1% false positive rate
m, k = BloomFilter.optimal_parameters(n, fp_rate)
print(f"Optimal: size={m}, hashes={k}")
bf = BloomFilter(m, k)
for i in range(n):
bf.add(f"element_{i}")
# Check false positive rate
false_positives = sum(1 for i in range(n, 2*n) if bf.contains(f"element_{i}"))
print(f"False positive rate: {false_positives/n:.2%}")
Count-Min Sketch
class CountMinSketch:
"""
Probabilistic frequency estimation
Space: O(ε⁻¹ log δ⁻¹)
Estimate ≥ true count, ≤ true count + εN with probability 1-δ
"""
def __init__(self, width, depth):
self.width = width
self.depth = depth
self.table = [[0] * width for _ in range(depth)]
def _hashes(self, item):
"""Generate d hash values"""
hashes = []
for i in range(self.depth):
h = hash(f"{item}{i}") % self.width
hashes.append(h)
return hashes
def add(self, item, count=1):
"""Add count for item"""
for i, h in enumerate(self._hashes(item)):
self.table[i][h] += count
def estimate(self, item):
"""Estimate count for item"""
return min(self.table[i][h]
for i, h in enumerate(self._hashes(item)))
# Example: Stream frequency estimation
cms = CountMinSketch(width=1000, depth=5)
# Simulate stream
stream = ['a'] * 100 + ['b'] * 50 + ['c'] * 10 + ['d'] * 1
random.shuffle(stream)
for item in stream:
cms.add(item)
for item in ['a', 'b', 'c', 'd', 'e']:
print(f"Count of '{item}': estimated={cms.estimate(item)}")
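The one-sided guarantee (estimates may overcount but never undercount) can be checked directly against exact counts. The compact re-implementation below is an illustrative sketch mirroring the class above; the per-row salts are an assumption chosen so the rows hash independently.

```python
import random
from collections import Counter

class MiniCMS:
    """Compact count-min sketch (mirrors the class above)."""
    def __init__(self, width=1000, depth=5):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]
        # Per-row random salts so the rows hash independently
        self.salts = [random.randrange(1 << 30) for _ in range(depth)]

    def add(self, item, count=1):
        for i, s in enumerate(self.salts):
            # Built-in hash is salted per process, which is fine within one run
            self.table[i][hash((item, s)) % self.width] += count

    def estimate(self, item):
        return min(self.table[i][hash((item, s)) % self.width]
                   for i, s in enumerate(self.salts))

random.seed(5)
stream = ['a'] * 100 + ['b'] * 50 + ['c'] * 10 + ['d']
truth = Counter(stream)
cms = MiniCMS()
for item in stream:
    cms.add(item)
# One-sided error: every estimate is at least the true count
for item in truth:
    assert cms.estimate(item) >= truth[item]
print({item: cms.estimate(item) for item in truth})
```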
Exercises
Basic
Implement a randomized algorithm to find an element in an unsorted array. Analyze its expected running time.
Use Monte Carlo simulation to estimate the probability of getting at least one 6 in four dice rolls.
Implement reservoir sampling: maintain a uniform random sample of k elements from a stream.
Intermediate
Implement Karger-Stein algorithm (improved min-cut) and compare with basic Karger's algorithm.
Analyze the expected number of probes for successful/unsuccessful search in a hash table with linear probing.
Implement a HyperLogLog sketch for cardinality estimation.
Advanced
Prove that randomized QuickSort has O(n log n) expected comparisons.
Implement a randomized treap (BST with random priorities) and verify it maintains balance.
Design a randomized algorithm for the MAX-SAT problem and analyze its approximation ratio.
Summary
- Las Vegas algorithms are always correct with random running time
- Monte Carlo algorithms always run fast but may be incorrect with small, controllable probability
- Randomized QuickSort achieves O(n log n) expected time regardless of input
- Universal hashing defeats adversarial inputs
- Skip lists provide simple balanced search with O(log n) expected operations
- Monte Carlo methods estimate quantities through random sampling
- Probabilistic data structures (Bloom filters, sketches) trade accuracy for space
Module Complete
Congratulations on completing the Probability and Statistics module! You've learned:
- Probability fundamentals and Bayes' theorem
- Random variables and distributions
- Statistical inference and hypothesis testing
- Randomized algorithms and probabilistic data structures
These concepts are foundational for machine learning, algorithm analysis, and systems design.