Replication and Consistency
Replication keeps copies of data on multiple nodes for fault tolerance, lower latency, and higher throughput.

## 1. Replication Strategies
Single-Leader (Primary-Backup)
One node handles all writes; followers replicate asynchronously or synchronously.
class SingleLeaderReplication:
    """Primary-backup replication node.

    A node is either the leader, which applies writes and pushes them to
    every entry in ``followers``, or a follower, which forwards writes to
    the node stored in ``leader``.
    """

    def __init__(self, node_id: str, is_leader: bool):
        self.node_id = node_id
        self.is_leader = is_leader
        self.data = {}  # this node's local copy of the key/value store
        self.followers = []  # nodes the leader replicates to
        self.leader = None  # set on followers so writes can be forwarded

    def write(self, key: str, value: object) -> bool:
        """Apply a write on the leader and replicate it to all followers.

        On a follower the write is forwarded to ``self.leader`` and that
        call's result is returned.

        Raises:
            RuntimeError: if this node is a follower and no leader has been
                assigned, so there is nowhere to forward the write.
        """
        if not self.is_leader:
            if self.leader is None:
                # Fail with a clear message instead of an AttributeError on
                # ``None.write``.
                raise RuntimeError(
                    f"node {self.node_id!r} is not the leader and has no "
                    "leader to forward writes to"
                )
            return self.leader.write(key, value)

        # Leader path: apply locally, then push to each follower in turn.
        self.data[key] = value
        for follower in self.followers:
            follower.replicate(key, value)
        return True

    def read(self, key: str) -> object:
        """Return this node's local value for ``key`` (``None`` if absent).

        Reads are served from local state, so a follower can return a stale
        value if it has not yet received the latest replication call.
        """
        return self.data.get(key)

    def replicate(self, key: str, value: object):
        """Receive a replicated write from the leader and store it locally."""
        self.data[key] = value
Multi-Leader
Multiple nodes can accept writes; used for multi-datacenter deployments.
class MultiLeaderReplication:
    """Multi-leader replication with vector-clock conflict detection.

    Every node accepts writes locally and pushes them to each entry in
    ``peers``. Incoming updates are compared against the stored version by
    vector clock: a strictly newer update replaces the stored one, an older
    or duplicate update is ignored, and concurrent updates go through
    ``resolve_conflict``.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.data = {}  # key -> (value, vector_clock)
        self.peers = []
        # VectorClock is not defined in this snippet; it is used here through
        # its tick(), receive() and .clock (a dict) members.
        self.clock = VectorClock(node_id, [])

    def write(self, key: str, value: object):
        """Apply a local write and push it to every peer."""
        self.clock.tick()
        self.data[key] = (value, self.clock.clock.copy())
        # Each peer gets its own copy of the clock so later local ticks
        # cannot alter what was sent.
        for peer in self.peers:
            peer.receive_update(key, value, self.clock.clock.copy(), self.node_id)

    def receive_update(self, key: str, value: object, sender_clock: dict, sender_id: str):
        """Integrate an update that a peer replicated to this node."""
        if key not in self.data:
            self.data[key] = (value, sender_clock)
        else:
            existing_value, existing_clock = self.data[key]
            if self.concurrent(existing_clock, sender_clock):
                # Neither version has seen the other: a genuine conflict.
                self.data[key] = self.resolve_conflict(
                    existing_value, value, existing_clock, sender_clock
                )
            elif self.happened_after(sender_clock, existing_clock):
                self.data[key] = (value, sender_clock)
            # Otherwise the update is older than, or identical to, what is
            # stored and is dropped.

        # Fold the sender's clock into the local one on every receive, not
        # only when the update is applied; otherwise later local writes
        # look concurrent with events this node has already seen.
        # NOTE(review): assumes VectorClock.receive() merges the given clock.
        self.clock.receive(sender_clock)

    def happened_after(self, c1: dict, c2: dict) -> bool:
        """Return True if clock ``c1`` strictly dominates ``c2``.

        Missing node entries count as 0.
        """
        nodes = set(c1) | set(c2)
        ahead = any(c1.get(n, 0) > c2.get(n, 0) for n in nodes)
        behind = any(c1.get(n, 0) < c2.get(n, 0) for n in nodes)
        return ahead and not behind

    def concurrent(self, c1: dict, c2: dict) -> bool:
        """Return True if neither clock dominates the other and they differ."""
        nodes = set(c1) | set(c2)
        ahead = any(c1.get(n, 0) > c2.get(n, 0) for n in nodes)
        behind = any(c1.get(n, 0) < c2.get(n, 0) for n in nodes)
        return ahead and behind

    def resolve_conflict(self, v1, v2, c1, c2):
        """Pick a winner between two concurrent versions.

        The version whose clock has the larger counter sum wins. Ties are
        broken by comparing the sorted clock entries, so every node picks the
        same winner regardless of which version it saw first. Returns the
        winning ``(value, clock)`` pair.
        """
        rank1 = (sum(c1.values()), sorted(c1.items()))
        rank2 = (sum(c2.values()), sorted(c2.items()))
        if rank1 > rank2:
            return (v1, c1)
        return (v2, c2)
Leaderless (Dynamo-Style)
Any node can accept writes; uses quorums for consistency.
class LeaderlessReplication:
    """Dynamo-style leaderless replication node.

    The same class acts as coordinator (``write``/``read`` fan out to
    ``replicas``) and as storage replica (``store``/``fetch`` work on the
    local ``data`` dict).
    """

    def __init__(self, node_id: str, replicas: list, n: int, w: int, r: int):
        """
        n: number of replicas
        w: write quorum
        r: read quorum
        For consistency: w + r > n
        """
        self.node_id = node_id
        self.replicas = replicas
        self.n = n
        self.w = w
        self.r = r
        self.data = {}  # key -> (value, version)

    def write(self, key: str, value: object) -> bool:
        """Send the write to every replica; succeed if at least ``w`` accept.

        The version is the current wall-clock time in milliseconds, so a
        replica rejects a second write to the same key within the same
        millisecond (see ``store``).
        """
        import time  # local import keeps this snippet self-contained

        version = int(time.time() * 1000)
        # Always contact every replica. Stopping at the w-th ack would leave
        # the remaining replicas without the value.
        acks = sum(1 for replica in self.replicas if replica.store(key, value, version))
        return acks >= self.w

    def read(self, key: str) -> object:
        """Return the value with the highest version among up to ``r`` replies.

        Replicas that have no entry for ``key`` do not count as a reply.
        Returns ``None`` when no replica holds the key.
        """
        responses = []
        for replica in self.replicas:
            result = replica.fetch(key)
            if result is not None:
                responses.append(result)
            if len(responses) >= self.r:
                break
        if not responses:
            return None
        # Each response is (value, version); the highest version wins.
        best = max(responses, key=lambda x: x[1])
        return best[0]

    def store(self, key: str, value: object, version: int) -> bool:
        """Store the value if ``version`` is strictly newer than the stored one.

        Returns True when the value was stored, False when it was rejected.
        """
        if key not in self.data or self.data[key][1] < version:
            self.data[key] = (value, version)
            return True
        return False

    def fetch(self, key: str):
        """Return the local ``(value, version)`` pair, or ``None`` if absent."""
        return self.data.get(key)
2. Quorums
Read and Write Quorums
class QuorumSystem:
    """Quorum-size presets for a cluster of ``n`` replicas.

    Every preset returns a ``(w, r)`` tuple: the write quorum followed by
    the read quorum.
    """

    def __init__(self, n: int):
        self.n = n

    def _majority(self) -> int:
        """Smallest node count that is more than half of ``n``."""
        return self.n // 2 + 1

    def strong_consistency(self):
        """Majority for both reads and writes, so w + r > n and quorums overlap."""
        majority = self._majority()
        return majority, majority

    def write_heavy(self):
        """Fast writes (one ack), slow reads (all replicas)."""
        return 1, self.n

    def read_heavy(self):
        """Fast reads (one reply), slow writes (all replicas)."""
        return self.n, 1

    def balanced(self):
        """Majority quorum on both sides."""
        majority = self._majority()
        return majority, majority
# Example: quorum sizes for a five-replica cluster.
# Each preset returns a (w, r) tuple, so it is unpacked into both
# placeholders rather than printed as a tuple after "W=".
qs = QuorumSystem(5)
print("Strong consistency: W={}, R={}".format(*qs.strong_consistency()))  # W=3, R=3
print("Write heavy: W={}, R={}".format(*qs.write_heavy()))  # W=1, R=5
print("Read heavy: W={}, R={}".format(*qs.read_heavy()))  # W=5, R=1
Sloppy Quorums
Allow writes to "nearby" nodes when preferred nodes are unavailable:
class SloppyQuorum:
    """Sloppy quorum with hinted handoff.

    When some of a key's preferred nodes are down, other available nodes
    accept the write on their behalf, tagged with a hint naming the node the
    data is meant for.
    """

    def __init__(self, all_nodes: list, n: int, w: int):
        self.all_nodes = all_nodes
        self.n = n
        self.w = w
        self.hints = {}  # node -> list of hinted writes

    def write(self, key: str, value: object, preferred_nodes: list):
        """Write to the preferred nodes, falling back to stand-ins.

        Each stand-in covers exactly one unavailable preferred node and is
        handed that node as its hint. Returns True when at least ``w`` nodes
        accepted the write.
        """
        acks = 0
        unavailable = []  # preferred nodes that were down, in encounter order

        # Try preferred nodes first, stopping once the quorum is met.
        for node in preferred_nodes:
            if node.is_available():
                node.store(key, value)
                acks += 1
            else:
                unavailable.append(node)
            if acks >= self.w:
                break

        if acks < self.w:
            # Pair each stand-in with a distinct down node. Hinting every
            # stand-in at the first down node would leave the others without
            # the data after handoff, and indexing an empty list would raise.
            pending = iter(unavailable)
            for node in self.all_nodes:
                if node in preferred_nodes or not node.is_available():
                    continue
                target = next(pending, None)
                if target is None:
                    break  # no down preferred node left to stand in for
                node.store_with_hint(key, value, target)
                acks += 1
                if acks >= self.w:
                    break

        return acks >= self.w

    def handoff(self, from_node, to_node):
        """Transfer hinted data to ``to_node`` once it has recovered."""
        # Snapshot the hints first: delete_hint() may mutate the collection
        # that get_hints() returned.
        for key, value in list(from_node.get_hints(to_node)):
            to_node.store(key, value)
            from_node.delete_hint(key, to_node)
3. Conflict Resolution
Last-Writer-Wins (LWW)
class LWWRegister:
    """Register whose value is the one written with the highest timestamp.

    A write or merge only takes effect when its timestamp is strictly
    greater than the one currently held; the register starts at timestamp 0
    with value ``None``.
    """

    def __init__(self):
        self.timestamp = 0
        self.value = None

    def write(self, value: any, timestamp: int):
        """Adopt ``value`` if ``timestamp`` is newer than the stored one."""
        if timestamp <= self.timestamp:
            return  # not newer: keep the current value
        self.value, self.timestamp = value, timestamp

    def merge(self, other: 'LWWRegister'):
        """Take over ``other``'s state when its timestamp is newer."""
        if self.timestamp < other.timestamp:
            self.timestamp = other.timestamp
            self.value = other.value
Multi-Value (Siblings)
class MultiValueRegister:
    """Register that keeps every concurrent value (siblings).

    Each stored value carries the vector clock of the write that produced
    it. A value is discarded only when another write's clock strictly
    dominates it.
    """

    def __init__(self):
        self.values = []  # [(value, vector_clock)]

    def happened_before(self, c1: dict, c2: dict) -> bool:
        """Return True if clock ``c1`` is strictly dominated by ``c2``.

        Missing node entries count as 0. Equal clocks and concurrent clocks
        both yield False.
        """
        nodes = set(c1) | set(c2)
        none_newer = all(c1.get(n, 0) <= c2.get(n, 0) for n in nodes)
        some_older = any(c1.get(n, 0) < c2.get(n, 0) for n in nodes)
        return none_newer and some_older

    def write(self, value: object, clock: dict):
        """Add ``value`` and drop every sibling this write supersedes."""
        survivors = [
            (v, c) for v, c in self.values
            if not self.happened_before(c, clock)
        ]
        # Store a copy so later mutation of the caller's dict cannot change
        # the recorded clock.
        survivors.append((value, dict(clock)))
        self.values = survivors

    def read(self):
        """Return the single value, or the list of siblings if not exactly one.

        With several siblings the application must resolve them. An empty
        register returns an empty list.
        """
        if len(self.values) == 1:
            return self.values[0][0]
        return [v for v, _ in self.values]

    def merge(self, other: 'MultiValueRegister'):
        """Merge ``other``'s siblings into this register.

        Dominated values are removed and identical (value, clock) pairs are
        kept once, so merging the same state again changes nothing.
        """
        combined = self.values + other.values
        result = []
        for v, c in combined:
            if any(self.happened_before(c, c2) for _, c2 in combined):
                continue  # a newer write supersedes this value
            if any(v == kept_v and c == kept_c for kept_v, kept_c in result):
                continue  # same write already kept
            result.append((v, c))
        self.values = result
4. CRDTs (Conflict-free Replicated Data Types)
G-Counter (Grow-only Counter)
class GCounter:
    """Grow-only counter CRDT.

    Each node increments only its own slot; the counter's value is the sum
    of all slots, and merging takes the per-node maximum.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts = {}  # node_id -> count

    def increment(self, amount: int = 1):
        """Add ``amount`` to this node's slot.

        Raises:
            ValueError: if ``amount`` is negative. A slot that shrinks would
                be undone by ``merge``, which keeps the per-node maximum.
        """
        if amount < 0:
            raise ValueError("GCounter can only grow; amount must be >= 0")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        """Return the total across all nodes."""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """Fold ``other``'s state into this counter (per-node maximum)."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)
# Example: two replicas count independently, then exchange state
counter_a, counter_b = GCounter("A"), GCounter("B")
counter_a.increment(5)
counter_b.increment(3)
# Merging in both directions leaves both replicas with the same total
counter_a.merge(counter_b)
counter_b.merge(counter_a)
print(f"A: {counter_a.value()}")  # 8
print(f"B: {counter_b.value()}")  # 8
PN-Counter (Increment and Decrement)
class PNCounter:
    """Positive-Negative counter CRDT.

    Built from two grow-only counters: ``p`` accumulates increments and
    ``n`` accumulates decrements. The value is their difference.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.p = GCounter(node_id)  # Increments
        self.n = GCounter(node_id)  # Decrements

    @staticmethod
    def _check_amount(amount: int) -> None:
        """Reject negative amounts, which would shrink a grow-only component."""
        if amount < 0:
            raise ValueError(
                "amount must be >= 0; use the opposite operation to go the other way"
            )

    def increment(self, amount: int = 1):
        """Increase the counter by ``amount``.

        Raises:
            ValueError: if ``amount`` is negative.
        """
        self._check_amount(amount)
        self.p.increment(amount)

    def decrement(self, amount: int = 1):
        """Decrease the counter by ``amount``.

        Raises:
            ValueError: if ``amount`` is negative.
        """
        self._check_amount(amount)
        self.n.increment(amount)

    def value(self) -> int:
        """Return total increments minus total decrements."""
        return self.p.value() - self.n.value()

    def merge(self, other: 'PNCounter'):
        """Merge both component counters with ``other``'s."""
        self.p.merge(other.p)
        self.n.merge(other.n)
LWW-Element-Set
class LWWElementSet:
    """Last-Writer-Wins element set CRDT.

    Tracks the latest add timestamp and the latest remove timestamp for
    each element. An element is a member only while its add timestamp is
    strictly greater than its remove timestamp, so a tie goes to the remove.
    """

    def __init__(self):
        self.add_set = {}  # element -> timestamp
        self.remove_set = {}  # element -> timestamp

    @staticmethod
    def _record(table: dict, element, timestamp: int) -> None:
        """Keep the larger of the stored and the given timestamp."""
        table[element] = max(table.get(element, 0), timestamp)

    def add(self, element, timestamp: int):
        """Record an add of ``element`` at ``timestamp``."""
        self._record(self.add_set, element, timestamp)

    def remove(self, element, timestamp: int):
        """Record a remove of ``element`` at ``timestamp``."""
        self._record(self.remove_set, element, timestamp)

    def contains(self, element) -> bool:
        """Return True if the latest add is newer than the latest remove."""
        added_at = self.add_set.get(element, 0)
        removed_at = self.remove_set.get(element, 0)
        return added_at > removed_at

    def elements(self) -> set:
        """Return the set of elements currently present."""
        return set(filter(self.contains, self.add_set))

    def merge(self, other: 'LWWElementSet'):
        """Fold ``other``'s add and remove timestamps into this set."""
        for element, stamp in other.add_set.items():
            self._record(self.add_set, element, stamp)
        for element, stamp in other.remove_set.items():
            self._record(self.remove_set, element, stamp)
5. Anti-Entropy
Read Repair
class ReadRepair:
    """Repair stale replicas during reads"""

    def read_with_repair(self, key: str, replicas: list):
        """Read ``key`` from every replica and bring lagging ones up to date.

        Each replica's ``fetch(key)`` is expected to return a
        ``(value, version)`` pair, or ``None`` when it has no entry. The
        value with the highest version is returned, and it is written back
        via ``store(key, value, version)`` to every replica that returned an
        older version or nothing at all.

        Returns ``None`` when ``replicas`` is empty or no replica holds the
        key.
        """
        responses = [(replica, replica.fetch(key)) for replica in replicas]

        # Only replicas that actually hold the key can supply the answer.
        known = [result for _, result in responses if result is not None]
        if not known:
            return None

        latest_value, latest_version = max(known, key=lambda pair: pair[1])

        # Repair replicas that are behind, including those missing the key.
        for replica, result in responses:
            if result is None or result[1] < latest_version:
                replica.store(key, latest_value, latest_version)
        return latest_value
Merkle Trees
import hashlib
class MerkleTree:
    """Merkle tree for efficient sync.

    ``tree`` is a list of levels: ``tree[0]`` holds the leaf hashes and
    ``tree[-1]`` holds the single root hash. An empty item list gives an
    empty tree.
    """

    def __init__(self, items: list):
        self.leaves = [self.hash_item(item) for item in items]
        self.tree = self.build_tree(self.leaves)

    def hash_item(self, item) -> str:
        """Return the SHA-256 hex digest of ``str(item)``."""
        return hashlib.sha256(str(item).encode()).hexdigest()

    def build_tree(self, leaves: list) -> list:
        """Build all levels bottom-up and return them as a list of lists.

        A node without a sibling is re-hashed on its own to form its parent.
        """
        if not leaves:
            return []
        # A single leaf is still wrapped as one level, so root() finds the
        # hash at tree[-1][0] like for every other size.
        tree = [leaves]
        current = leaves
        while len(current) > 1:
            current = [
                self.hash_item("".join(current[i:i + 2]))
                for i in range(0, len(current), 2)
            ]
            tree.append(current)
        return tree

    def root(self) -> str:
        """Return the root hash, or ``""`` for an empty tree."""
        return self.tree[-1][0] if self.tree else ""

    def diff(self, other: 'MerkleTree') -> list:
        """Return the leaf indices at which the two trees differ.

        Comparison is positional. An index present in only one of the trees
        counts as a difference. Indices are returned in ascending order.
        """
        if self.root() == other.root():
            return []
        if len(self.tree) != len(other.tree):
            # Trees of different height have no matching levels to walk, so
            # compare the leaves directly.
            return self._diff_leaves(other)
        diffs = []
        self._diff_level(self.tree, other.tree, len(self.tree) - 1, 0, diffs)
        return diffs

    def _diff_leaves(self, other: 'MerkleTree') -> list:
        """Compare leaf hashes position by position, without using the tree."""
        mine, theirs = self.leaves, other.leaves
        diffs = []
        for index in range(max(len(mine), len(theirs))):
            h1 = mine[index] if index < len(mine) else None
            h2 = theirs[index] if index < len(theirs) else None
            if h1 != h2:
                diffs.append(index)
        return diffs

    def _diff_level(self, tree1, tree2, level, index, diffs):
        """Descend into subtrees whose hashes differ, collecting leaf indices.

        Both trees must have the same number of levels.
        """
        h1 = tree1[level][index] if index < len(tree1[level]) else None
        h2 = tree2[level][index] if index < len(tree2[level]) else None
        if h1 == h2:
            return  # identical subtree, or the position is absent from both
        if level == 0:
            diffs.append(index)
            return
        self._diff_level(tree1, tree2, level - 1, index * 2, diffs)
        self._diff_level(tree1, tree2, level - 1, index * 2 + 1, diffs)
Summary
- Single-leader is simple, but the leader is a single point of failure
- Multi-leader enables multi-datacenter but has conflicts
- Leaderless uses quorums for availability
- CRDTs provide automatic conflict resolution
- Anti-entropy keeps replicas synchronized
Module Complete
This completes the Distributed Systems module! You've learned fundamental concepts for building and reasoning about distributed applications.