Replication and Consistency
Introduction
Replication keeps copies of data on multiple nodes for fault tolerance, lower latency, and higher throughput. This reading covers replication strategies, consistency protocols, and conflict resolution.
Learning Objectives
By the end of this reading, you will be able to:
- Understand different replication strategies
- Implement quorum-based consistency
- Handle conflicts in replicated data
- Design anti-entropy mechanisms
- Apply CRDTs for conflict-free replication
1. Replication Strategies
Single-Leader (Primary-Backup)
One node handles all writes; followers replicate asynchronously or synchronously.
class SingleLeaderReplication:
    """Single-leader (primary-backup) replica node.

    The leader applies each write locally and pushes it to every node in
    ``followers``; any other node forwards writes to ``self.leader``. Reads
    are always answered from the local copy.
    """

    def __init__(self, node_id: str, is_leader: bool):
        self.node_id = node_id
        self.is_leader = is_leader
        self.leader = None  # None until a leader node is assigned
        self.followers = []  # nodes the leader pushes writes to
        self.data = {}

    def write(self, key: str, value: any) -> bool:
        """Apply a write on the leader, forwarding to it from a follower."""
        if self.is_leader:
            self.data[key] = value
            # Push the new value to every follower before reporting success.
            for node in self.followers:
                node.replicate(key, value)
            return True
        # Not the leader: hand the write over and relay the leader's answer.
        return self.leader.write(key, value)

    def read(self, key: str) -> any:
        """Return the locally stored value for ``key`` (``None`` if absent).

        Any node answers from its own copy, so the result may be stale.
        """
        return self.data.get(key)

    def replicate(self, key: str, value: any):
        """Store a write pushed from the leader in the local copy."""
        self.data[key] = value
Multi-Leader
Multiple nodes can accept writes; used for multi-datacenter deployments.
class MultiLeaderReplication:
    """Multi-leader replica that detects conflicting writes with vector clocks.

    ``data`` maps each key to a ``(value, vector_clock)`` pair, where a vector
    clock is a ``dict`` of node id -> counter. A missing node id counts as 0.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.data = {}  # key -> (value, vector_clock)
        self.peers = []
        # VectorClock is defined outside this class.
        self.clock = VectorClock(node_id, [])

    def write(self, key: str, value: any):
        """Apply a local write and push it to every peer."""
        self.clock.tick()
        self.data[key] = (value, self.clock.clock.copy())
        # Each peer receives its own snapshot so no two nodes share a dict.
        for peer in self.peers:
            peer.receive_update(key, value, self.clock.clock.copy(), self.node_id)

    def receive_update(self, key: str, value: any, sender_clock: dict, sender_id: str):
        """Apply an update replicated from another leader.

        Args:
            key: Key being updated.
            value: Value written by the sender.
            sender_clock: Vector clock attached to the sender's write.
            sender_id: Id of the sending node (currently unused).
        """
        if key not in self.data:
            self.data[key] = (value, sender_clock)
        else:
            existing_value, existing_clock = self.data[key]
            if self.concurrent(existing_clock, sender_clock):
                # Neither write saw the other: pick a winner deterministically.
                self.data[key] = self.resolve_conflict(
                    existing_value, value, existing_clock, sender_clock
                )
            elif self.happened_after(sender_clock, existing_clock):
                self.data[key] = (value, sender_clock)
            # Otherwise the update is stale or a duplicate: keep what we have.
        # Merge the sender's clock exactly once, whichever branch was taken,
        # so later local writes causally follow everything seen so far.
        self.clock.receive(sender_clock)

    @staticmethod
    def happened_after(c1: dict, c2: dict) -> bool:
        """Return True if ``c1`` is causally later than ``c2``.

        That is, every counter in ``c1`` is >= its counterpart in ``c2`` and
        at least one is strictly greater.
        """
        nodes = c1.keys() | c2.keys()
        not_behind = all(c1.get(node, 0) >= c2.get(node, 0) for node in nodes)
        strictly_ahead = any(c1.get(node, 0) > c2.get(node, 0) for node in nodes)
        return not_behind and strictly_ahead

    @staticmethod
    def concurrent(c1: dict, c2: dict) -> bool:
        """Return True if neither clock is causally ordered before the other.

        Equal clocks are not concurrent (they describe the same event).
        """
        nodes = c1.keys() | c2.keys()
        ahead = any(c1.get(node, 0) > c2.get(node, 0) for node in nodes)
        behind = any(c1.get(node, 0) < c2.get(node, 0) for node in nodes)
        return ahead and behind

    def resolve_conflict(self, v1, v2, c1, c2):
        """Pick the winner of two concurrent writes.

        The write whose clock has the higher counter sum wins. Ties are broken
        on the sorted clock entries rather than on argument order, so every
        replica picks the same winner no matter which write it saw first.

        Returns:
            The winning ``(value, clock)`` pair.
        """
        def rank(clock):
            return (sum(clock.values()), sorted(clock.items()))

        if rank(c1) > rank(c2):
            return (v1, c1)
        return (v2, c2)
Leaderless (Dynamo-Style)
Any node can accept writes; uses quorums for consistency.
class LeaderlessReplication:
    """Dynamo-style leaderless replication node.

    The same class acts as coordinator (``write`` / ``read`` fan out to
    ``replicas``) and as storage replica (``store`` / ``fetch`` operate on
    the local ``data`` dict).
    """

    def __init__(self, node_id: str, replicas: list, n: int, w: int, r: int):
        """
        n: number of replicas
        w: write quorum
        r: read quorum
        For consistency: w + r > n
        """
        self.node_id = node_id
        self.replicas = replicas
        self.n = n
        self.w = w
        self.r = r
        self.data = {}  # key -> (value, version)

    def write(self, key: str, value: any) -> bool:
        """Send the write to every replica and report whether ``w`` accepted it.

        The version is the current wall-clock time in milliseconds.

        Returns:
            True if at least ``w`` replicas stored the value.
        """
        # Local import keeps this block self-contained.
        import time

        version = int(time.time() * 1000)
        acks = 0
        # Keep going after the quorum is reached: returning early would leave
        # the remaining replicas without the value.
        for replica in self.replicas:
            if replica.store(key, value, version):
                acks += 1
        return acks >= self.w

    def read(self, key: str) -> any:
        """Return the value with the highest version among the replies.

        Replicas are queried in order until ``r`` of them have returned an
        entry; replicas that do not hold the key are skipped and not counted.

        Returns:
            The newest value seen, or ``None`` if no replica holds the key.
        """
        responses = []
        for replica in self.replicas:
            result = replica.fetch(key)
            if result is not None:
                responses.append(result)
            if len(responses) >= self.r:
                break
        if not responses:
            return None
        # Each response is (value, version); the highest version wins.
        best = max(responses, key=lambda entry: entry[1])
        return best[0]

    def store(self, key: str, value: any, version: int) -> bool:
        """Store ``value`` locally if ``version`` is newer than what is held.

        Returns:
            True if the value was stored, False if it was rejected because
            the local version is the same or newer.
        """
        if key not in self.data or self.data[key][1] < version:
            self.data[key] = (value, version)
            return True
        return False

    def fetch(self, key: str):
        """Return the local ``(value, version)`` pair, or ``None`` if absent."""
        return self.data.get(key)
2. Quorums
Read and Write Quorums
class QuorumSystem:
    """Suggests ``(w, r)`` quorum sizes for a cluster of ``n`` replicas."""

    def __init__(self, n: int):
        self.n = n

    def strong_consistency(self):
        """Return majority quorums; ``w + r > n`` guarantees overlap."""
        majority = self.n // 2 + 1
        return majority, majority

    def write_heavy(self):
        """Return ``(1, n)``: one ack per write, every replica per read."""
        return 1, self.n

    def read_heavy(self):
        """Return ``(n, 1)``: every replica per write, one reply per read."""
        return self.n, 1

    def balanced(self):
        """Return the same majority size for both writes and reads."""
        majority = self.n // 2 + 1
        return majority, majority
# Example: each helper returns a (w, r) tuple, so unpack it into both labels
# instead of printing the whole tuple after "W=".
qs = QuorumSystem(5)
print("Strong consistency: W={}, R={}".format(*qs.strong_consistency()))  # W=3, R=3
print("Write heavy: W={}, R={}".format(*qs.write_heavy()))  # W=1, R=5
print("Read heavy: W={}, R={}".format(*qs.read_heavy()))  # W=5, R=1
Sloppy Quorums
Allow writes to "nearby" nodes when preferred nodes are unavailable:
class SloppyQuorum:
    """Sloppy quorum with hinted handoff.

    Nodes are expected to provide ``is_available()``, ``store(key, value)``,
    ``store_with_hint(key, value, target)``, ``get_hints(target)`` and
    ``delete_hint(key, target)``.
    """

    def __init__(self, all_nodes: list, n: int, w: int):
        self.all_nodes = all_nodes
        self.n = n
        self.w = w
        self.hints = {}  # node -> list of hinted writes (unused by the methods here)

    def write(self, key: str, value: any, preferred_nodes: list):
        """Write to the preferred nodes, falling back to other available nodes.

        Each fallback node stands in for a distinct unavailable preferred node
        and stores the value with a hint naming it. If more fallbacks are
        needed than there are unavailable preferred nodes, the extra fallbacks
        store the value without a hint.

        Returns:
            True if at least ``w`` nodes accepted the write.
        """
        acks = 0
        unavailable = []
        # Try preferred nodes first, remembering the ones that are down.
        for node in preferred_nodes:
            if node.is_available():
                node.store(key, value)
                acks += 1
            else:
                unavailable.append(node)
            if acks >= self.w:
                break
        if acks < self.w:
            pending = iter(unavailable)
            for node in self.all_nodes:
                if node in preferred_nodes or not node.is_available():
                    continue
                target = next(pending, None)
                if target is None:
                    # Nobody left to stand in for: store as a plain replica
                    # rather than indexing into an empty hint list.
                    node.store(key, value)
                else:
                    node.store_with_hint(key, value, target)
                acks += 1
                if acks >= self.w:
                    break
        return acks >= self.w

    def handoff(self, from_node, to_node):
        """Transfer hinted data from ``from_node`` once ``to_node`` recovers."""
        # Snapshot the hints first: deleting while iterating could skip entries
        # if get_hints returns a live view.
        for key, value in list(from_node.get_hints(to_node)):
            to_node.store(key, value)
            from_node.delete_hint(key, to_node)
3. Conflict Resolution
Last-Writer-Wins (LWW)
class LWWRegister:
    """Register where the write with the highest timestamp wins."""

    def __init__(self):
        self.timestamp = 0
        self.value = None

    def write(self, value: any, timestamp: int):
        """Accept ``value`` only if ``timestamp`` is strictly newer."""
        if timestamp > self.timestamp:
            self.value, self.timestamp = value, timestamp

    def merge(self, other: 'LWWRegister'):
        """Adopt ``other``'s state if it carries a strictly newer timestamp."""
        newer = other.timestamp > self.timestamp
        if newer:
            self.value, self.timestamp = other.value, other.timestamp
Multi-Value (Siblings)
class MultiValueRegister:
    """Register that keeps every concurrent value (siblings).

    ``values`` holds ``(value, vector_clock)`` pairs, where a vector clock is
    a ``dict`` of node id -> counter. A missing node id counts as 0.
    """

    def __init__(self):
        self.values = []  # [(value, vector_clock)]

    @staticmethod
    def happened_before(c1: dict, c2: dict) -> bool:
        """Return True if ``c1`` is causally earlier than ``c2``.

        That is, every counter in ``c1`` is <= its counterpart in ``c2`` and
        at least one is strictly smaller. Equal clocks return False.
        """
        nodes = c1.keys() | c2.keys()
        not_ahead = all(c1.get(node, 0) <= c2.get(node, 0) for node in nodes)
        strictly_behind = any(c1.get(node, 0) < c2.get(node, 0) for node in nodes)
        return not_ahead and strictly_behind

    def write(self, value: any, clock: dict):
        """Record ``value``, dropping stored values that ``clock`` supersedes."""
        self.values = [
            (v, c) for v, c in self.values
            if not self.happened_before(c, clock)
        ]
        self.values.append((value, clock))

    def read(self):
        """Return the single value, or a list of all siblings.

        When there is not exactly one value (none, or several concurrent
        ones) a list is returned and the application must resolve it.
        """
        if len(self.values) == 1:
            return self.values[0][0]
        return [v for v, _ in self.values]

    def merge(self, other: 'MultiValueRegister'):
        """Merge ``other`` into this register, keeping only undominated values.

        Identical ``(value, clock)`` pairs are kept once, so merging the same
        state repeatedly does not grow the sibling list.
        """
        combined = self.values + other.values
        survivors = []
        for v, c in combined:
            if any(self.happened_before(c, c2) for _, c2 in combined):
                continue  # superseded by a causally later write
            if (v, c) not in survivors:
                survivors.append((v, c))
        self.values = survivors
4. CRDTs (Conflict-free Replicated Data Types)
G-Counter (Grow-only Counter)
class GCounter:
    """Grow-only counter CRDT.

    Each node increments only its own slot in ``counts``; the counter's value
    is the sum of all slots, and replicas merge by taking the per-node maximum.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts = {}  # node_id -> count

    def increment(self, amount: int = 1):
        """Add ``amount`` to this node's slot.

        Raises:
            ValueError: If ``amount`` is negative. A shrinking slot would be
                undone by the max-based ``merge``, so replicas could disagree.
        """
        if amount < 0:
            raise ValueError("GCounter can only grow; amount must be non-negative")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        """Return the total across all nodes."""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """Fold ``other`` into this counter by taking the per-node maximum."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)
# Example: two replicas count independently, then exchange state.
counter_a = GCounter("A")
counter_a.increment(5)
counter_b = GCounter("B")
counter_b.increment(3)
# Merge in both directions so each replica sees the other's increments.
counter_a.merge(counter_b)
counter_b.merge(counter_a)
print(f"A: {counter_a.value()}")  # 8
print(f"B: {counter_b.value()}")  # 8
PN-Counter (Increment and Decrement)
class PNCounter:
    """Positive-negative counter CRDT built from two ``GCounter`` instances.

    ``p`` accumulates increments and ``n`` accumulates decrements; the value
    is their difference.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.p = GCounter(node_id)  # total of all increments
        self.n = GCounter(node_id)  # total of all decrements

    def increment(self, amount: int = 1):
        """Record an increment of ``amount``."""
        self.p.increment(amount)

    def decrement(self, amount: int = 1):
        """Record a decrement of ``amount`` by growing the ``n`` counter."""
        self.n.increment(amount)

    def value(self) -> int:
        """Return increments minus decrements."""
        added = self.p.value()
        removed = self.n.value()
        return added - removed

    def merge(self, other: 'PNCounter'):
        """Merge both underlying counters with ``other``'s."""
        self.p.merge(other.p)
        self.n.merge(other.n)
LWW-Element-Set
class LWWElementSet:
    """Last-writer-wins element set CRDT.

    Tracks the latest add and remove timestamp per element. An element is
    present only while its add timestamp is strictly greater than its remove
    timestamp; a missing timestamp counts as 0.
    """

    def __init__(self):
        self.add_set = {}  # element -> latest add timestamp
        self.remove_set = {}  # element -> latest remove timestamp

    def add(self, element, timestamp: int):
        """Record an add, keeping the largest timestamp seen for ``element``."""
        previous = self.add_set.get(element, 0)
        self.add_set[element] = max(previous, timestamp)

    def remove(self, element, timestamp: int):
        """Record a remove, keeping the largest timestamp seen for ``element``."""
        previous = self.remove_set.get(element, 0)
        self.remove_set[element] = max(previous, timestamp)

    def contains(self, element) -> bool:
        """Return True if the latest add is strictly newer than the latest remove."""
        return self.add_set.get(element, 0) > self.remove_set.get(element, 0)

    def elements(self) -> set:
        """Return the set of elements currently present."""
        return set(filter(self.contains, self.add_set))

    def merge(self, other: 'LWWElementSet'):
        """Fold ``other``'s add and remove timestamps into this set."""
        for element, stamp in other.add_set.items():
            self.add_set[element] = max(self.add_set.get(element, 0), stamp)
        for element, stamp in other.remove_set.items():
            self.remove_set[element] = max(self.remove_set.get(element, 0), stamp)
5. Anti-Entropy
Read Repair
class ReadRepair:
    """Repair stale replicas during reads."""

    def read_with_repair(self, key: str, replicas: list):
        """Read ``key`` from every replica and bring stale ones up to date.

        Each replica's ``fetch(key)`` is expected to return a
        ``(value, version)`` pair, or ``None`` when it does not hold the key.
        Replicas with an older version, or no entry at all, are sent the
        newest pair via ``store(key, value, version)``.

        Returns:
            The value with the highest version, or ``None`` if ``replicas`` is
            empty or no replica holds the key.
        """
        responses = []
        for replica in replicas:
            result = replica.fetch(key)
            # A replica that lacks the key is the one most in need of repair,
            # so record it with version None instead of failing to unpack.
            value, version = (None, None) if result is None else result
            responses.append((replica, value, version))

        known = [entry for entry in responses if entry[2] is not None]
        if not known:
            return None

        # Find the latest version among replicas that hold the key.
        _, latest_value, latest_version = max(known, key=lambda entry: entry[2])

        # Repair replicas that are missing the key or behind the latest.
        for replica, _, version in responses:
            if version is None or version < latest_version:
                replica.store(key, latest_value, latest_version)
        return latest_value
Merkle Trees
import hashlib
class MerkleTree:
    """Merkle tree for efficient sync.

    ``leaves`` holds the SHA-256 hex digest of each item. ``tree`` is a list
    of levels from the leaves (index 0) up to a single-element root level.
    """

    def __init__(self, items: list):
        self.leaves = [self.hash_item(item) for item in items]
        self.tree = self.build_tree(self.leaves)

    def hash_item(self, item) -> str:
        """Return the SHA-256 hex digest of ``str(item)``."""
        return hashlib.sha256(str(item).encode()).hexdigest()

    def build_tree(self, leaves: list) -> list:
        """Build the list of levels bottom-up from ``leaves``.

        Adjacent hashes are concatenated and re-hashed; an unpaired last hash
        is re-hashed on its own. A single leaf yields ``[[leaf]]`` so that the
        result is always a list of levels. Returns ``[]`` for no leaves.
        """
        if not leaves:
            return []
        tree = [leaves]
        current = leaves
        while len(current) > 1:
            next_level = []
            for i in range(0, len(current), 2):
                # Slice covers both the paired and the unpaired (last) case.
                combined = "".join(current[i:i + 2])
                next_level.append(self.hash_item(combined))
            tree.append(next_level)
            current = next_level
        return tree

    def root(self) -> str:
        """Return the root hash, or ``""`` for an empty tree."""
        return self.tree[-1][0] if self.tree else ""

    def diff(self, other: 'MerkleTree') -> list:
        """Return the sorted leaf indices at which the two trees differ.

        Indices present in only one of the trees are reported as differing.
        """
        if self.root() == other.root():
            return []
        mine, theirs = len(self.leaves), len(other.leaves)
        if mine != theirs:
            # The trees have different shapes, so inner hashes do not line up
            # position by position; compare the leaves directly instead.
            shared = min(mine, theirs)
            diffs = [
                i for i in range(shared)
                if self.leaves[i] != other.leaves[i]
            ]
            diffs.extend(range(shared, max(mine, theirs)))
            return diffs
        # Same shape: walk down from the root, descending only into subtrees
        # whose hashes differ.
        diffs = []
        self._diff_level(self.tree, other.tree, len(self.tree) - 1, 0, diffs)
        return diffs

    def _diff_level(self, tree1, tree2, level, index, diffs):
        """Append to ``diffs`` the differing leaf indices under ``(level, index)``."""
        if level == 0:
            if index < len(tree1[0]) and index < len(tree2[0]):
                if tree1[0][index] != tree2[0][index]:
                    diffs.append(index)
            return
        # An out-of-range index (right child of an unpaired node) reads as None.
        h1 = tree1[level][index] if index < len(tree1[level]) else None
        h2 = tree2[level][index] if index < len(tree2[level]) else None
        if h1 != h2:
            self._diff_level(tree1, tree2, level - 1, index * 2, diffs)
            self._diff_level(tree1, tree2, level - 1, index * 2 + 1, diffs)
Summary
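- Single-leader replication is simple, but the leader is a single point of failure for writes
- Multi-leader replication enables multi-datacenter deployments but must detect and resolve write conflicts
- Leaderless replication stays available when nodes fail and uses quorums (w + r > n) for consistency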
- CRDTs provide automatic conflict resolution
- Anti-entropy keeps replicas synchronized
Module Complete
This completes the Distributed Systems module! You've learned fundamental concepts for building and reasoning about distributed applications.