Replication and Consistency

Replication keeps copies of data on multiple nodes for fault tolerance, lower latency, and higher throughput.

1. Replication Strategies

Single-Leader (Primary-Backup)

One node handles all writes; followers replicate asynchronously or synchronously.

class SingleLeaderReplication:
    """Primary-backup (single-leader) replication node.

    The node created with ``is_leader=True`` applies every write to its own
    ``data`` and then calls ``replicate`` on each node in ``followers``.
    A follower never applies a client write itself: it hands the write to its
    ``leader``. Reads are answered from the local copy on any node.
    """

    def __init__(self, node_id: str, is_leader: bool):
        self.node_id = node_id
        self.is_leader = is_leader
        self.data = {}
        self.followers = []  # filled in by the caller on the leader node
        self.leader = None   # set by the caller on follower nodes

    def write(self, key: str, value: any) -> bool:
        """Apply a write through the leader and return the leader's result."""
        if self.is_leader:
            self.data[key] = value
            # Push the new value to every follower, in list order.
            for replica in self.followers:
                replica.replicate(key, value)
            return True
        # Followers delegate; this assumes self.leader has been set.
        return self.leader.write(key, value)

    def read(self, key: str) -> any:
        """Return the locally stored value for key, or None if absent.

        There is no coordination with the leader here, so with asynchronous
        replication a follower could serve a stale value.
        """
        return self.data.get(key)

    def replicate(self, key: str, value: any):
        """Apply a write pushed by the leader to the local copy."""
        self.data[key] = value

Multi-Leader

Multiple nodes can accept writes; used for multi-datacenter deployments.

class MultiLeaderReplication:
    """Multi-leader replication with vector-clock conflict detection.

    Every node accepts writes, stamps them with its vector clock and pushes
    them to ``peers``. An incoming update is compared with the clock stored
    for the key locally:

    - a causally newer update replaces the local value;
    - an older or identical one is ignored;
    - a concurrent one goes through ``resolve_conflict``.

    ``VectorClock`` is defined elsewhere. This class only uses its ``tick()``
    and ``receive(clock)`` methods and its ``clock`` dict (node_id -> counter).
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.data = {}  # key -> (value, vector_clock)
        self.peers = []
        self.clock = VectorClock(node_id, [])

    def write(self, key: str, value: object):
        """Apply a local write and replicate it to every peer."""
        self.clock.tick()
        self.data[key] = (value, self.clock.clock.copy())

        # Replicate to peers (called synchronously in this model).
        for peer in self.peers:
            peer.receive_update(key, value, self.clock.clock.copy(), self.node_id)

    def receive_update(self, key: str, value: object, sender_clock: dict, sender_id: str):
        """Apply an update replicated from peer ``sender_id``.

        ``sender_id`` is currently unused; it is kept for interface compatibility.
        """
        if key not in self.data:
            self.data[key] = (value, sender_clock)
        else:
            existing_value, existing_clock = self.data[key]
            if self.concurrent(existing_clock, sender_clock):
                # Neither write saw the other: resolve deterministically.
                self.data[key] = self.resolve_conflict(
                    existing_value, value, existing_clock, sender_clock
                )
            elif self.happened_after(sender_clock, existing_clock):
                # Causally newer update replaces the local one.
                self.data[key] = (value, sender_clock)
            # Otherwise the incoming update is older or identical: keep ours.
        # Fold the sender's clock into ours in every case.
        self.clock.receive(sender_clock)

    @staticmethod
    def _ahead(c1: dict, c2: dict) -> bool:
        """Return True if c1 has a larger counter than c2 for some node.

        Missing entries count as 0.
        """
        return any(count > c2.get(node, 0) for node, count in c1.items())

    @staticmethod
    def happened_after(c1: dict, c2: dict) -> bool:
        """Return True if clock c1 strictly dominates c2.

        c1 is ahead on some node and behind on none.
        """
        return (MultiLeaderReplication._ahead(c1, c2)
                and not MultiLeaderReplication._ahead(c2, c1))

    @staticmethod
    def concurrent(c1: dict, c2: dict) -> bool:
        """Return True if neither clock dominates the other.

        Equal clocks are not concurrent.
        """
        return (MultiLeaderReplication._ahead(c1, c2)
                and MultiLeaderReplication._ahead(c2, c1))

    def resolve_conflict(self, v1, v2, c1, c2):
        """Pick a winner between two concurrent versions; return (value, clock).

        The clock with the higher counter sum wins. Ties are broken by
        comparing the clocks' sorted (node, counter) entries. This makes the
        winner independent of argument order, so every replica converges on
        the same value no matter which version it stored first.
        """
        def rank(clock):
            return (sum(clock.values()), sorted(clock.items()))

        if rank(c1) > rank(c2):
            return (v1, c1)
        return (v2, c2)

Leaderless (Dynamo-Style)

Any node can accept writes; uses quorums for consistency.

class LeaderlessReplication:
    """Dynamo-style leaderless replication.

    Any node can coordinate a read or a write.

    - Writes go to every node in ``replicas``; the write succeeds if at least
      ``w`` of them accept it.
    - Reads collect up to ``r`` non-empty responses and return the value with
      the highest version.

    Versions are wall-clock milliseconds, bumped so that they strictly
    increase for each coordinator.
    """

    def __init__(self, node_id: str, replicas: list, n: int, w: int, r: int):
        """
        n: number of replicas
        w: write quorum (acks needed for a successful write)
        r: read quorum (responses collected before answering a read)
        For consistency: w + r > n
        """
        self.node_id = node_id
        self.replicas = replicas
        self.n = n
        self.w = w
        self.r = r
        self.data = {}  # key -> (value, version)
        self._last_version = 0  # last version issued by this coordinator

    def _next_version(self) -> int:
        """Return a new write version: current time in ms, above the last one issued.

        Without the bump, two writes within the same millisecond would share a
        version, and every replica would reject the second one because
        ``store`` requires a strictly newer version.
        """
        import time

        now_ms = int(time.time() * 1000)
        self._last_version = max(now_ms, self._last_version + 1)
        return self._last_version

    def write(self, key: str, value: object) -> bool:
        """Send the write to all replicas; return True if at least w accepted it."""
        version = self._next_version()

        # Deliver to every replica rather than stopping at w acks. In this
        # synchronous model, stopping early would leave the remaining replicas
        # without the write until some repair mechanism copies it over.
        acks = 0
        for replica in self.replicas:
            if replica.store(key, value, version):
                acks += 1

        return acks >= self.w

    def read(self, key: str) -> object:
        """Return the highest-versioned value among up to r replica responses.

        Replicas that do not hold the key are skipped and do not count towards
        r. Returns None if no replica holds the key.
        """
        responses = []
        for replica in self.replicas:
            result = replica.fetch(key)
            if result is None:
                continue
            responses.append(result)
            if len(responses) >= self.r:
                break

        if not responses:
            return None

        value, _version = max(responses, key=lambda entry: entry[1])
        return value

    def store(self, key: str, value: object, version: int) -> bool:
        """Store value if version is newer than the local one; return True if stored."""
        if key not in self.data or self.data[key][1] < version:
            self.data[key] = (value, version)
            return True
        return False

    def fetch(self, key: str):
        """Return the local (value, version) tuple for key, or None if absent."""
        return self.data.get(key)

2. Quorums

Read and Write Quorums

class QuorumSystem:
    """Configurable quorum system for ``n`` replicas.

    Every policy method returns a ``(w, r)`` tuple: the write quorum and the
    read quorum.
    """

    def __init__(self, n: int):
        self.n = n

    def _majority(self) -> int:
        """Smallest replica count that is more than half of n."""
        return self.n // 2 + 1

    def strong_consistency(self):
        """Majority writes and reads: w + r > n, so the quorums always overlap."""
        return self._majority(), self._majority()

    def write_heavy(self):
        """Fast writes (one ack), slow reads (all replicas)."""
        return 1, self.n

    def read_heavy(self):
        """Fast reads (one replica), slow writes (all replicas)."""
        return self.n, 1

    def balanced(self):
        """Majority quorum for both reads and writes."""
        quorum = self._majority()
        return quorum, quorum

# Example: (W, R) for each policy with N=5.
# Each policy returns a (w, r) tuple; unpack it so the output matches the labels.
qs = QuorumSystem(5)

w, r = qs.strong_consistency()
print(f"Strong consistency: W={w}, R={r}")  # W=3, R=3

w, r = qs.write_heavy()
print(f"Write heavy: W={w}, R={r}")  # W=1, R=5

w, r = qs.read_heavy()
print(f"Read heavy: W={w}, R={r}")  # W=5, R=1

Sloppy Quorums

Allow writes to "nearby" nodes when preferred nodes are unavailable:

class SloppyQuorum:
    """Sloppy quorum with hinted handoff.

    A write first goes to the key's preferred (home) nodes. If some of them
    are unavailable and the write quorum ``w`` is not yet met, other
    available nodes take the write on their behalf. Each such stand-in stores
    the write with a *hint* naming the node it was meant for. ``handoff``
    later replays hinted writes to the recovered node.

    Node objects are used through ``is_available()``, ``store(key, value)``,
    ``store_with_hint(key, value, intended_node)``, ``get_hints(node)`` and
    ``delete_hint(key, node)``.
    """

    def __init__(self, all_nodes: list, n: int, w: int):
        self.all_nodes = all_nodes
        self.n = n
        self.w = w
        self.hints = {}  # node -> list of hinted writes (not used by the methods below)

    def write(self, key: str, value: object, preferred_nodes: list) -> bool:
        """Write with a sloppy quorum; return True if at least w nodes accepted it.

        Each stand-in node covers a *different* unavailable preferred node.
        If every unavailable preferred node already has a stand-in (or none
        was unavailable, e.g. fewer than w preferred nodes were given), no
        more stand-ins are used and the write reports failure instead of
        raising.
        """
        acks = 0
        unavailable = []  # preferred nodes that missed this write

        # Try preferred nodes first.
        for node in preferred_nodes:
            if node.is_available():
                node.store(key, value)
                acks += 1
            else:
                unavailable.append(node)

            if acks >= self.w:
                break

        # Not enough: let other available nodes stand in for the missing ones.
        if acks < self.w:
            pending = iter(unavailable)
            for node in self.all_nodes:
                if acks >= self.w:
                    break
                if node in preferred_nodes or not node.is_available():
                    continue
                intended = next(pending, None)
                if intended is None:
                    break  # nothing left to stand in for
                node.store_with_hint(key, value, intended)
                acks += 1

        return acks >= self.w

    def handoff(self, from_node, to_node):
        """Transfer hinted data held by from_node to the recovered to_node."""
        for key, value in from_node.get_hints(to_node):
            to_node.store(key, value)
            from_node.delete_hint(key, to_node)

3. Conflict Resolution

Last-Writer-Wins (LWW)

class LWWRegister:
    """Last-writer-wins register.

    Holds one value together with the timestamp of the write that set it.
    A write or merge replaces the value only if it is newer.

    When timestamps are equal, the tie is broken by comparing ``repr()`` of
    the two values. Otherwise each replica would keep its own value and merges
    would never converge. This requires ``repr`` to be stable for the value
    type; if it is not, callers must ensure timestamps are unique.
    """

    def __init__(self):
        self.value = None
        self.timestamp = 0

    def _supersedes(self, value, timestamp) -> bool:
        """Return True if (value, timestamp) should replace the current state."""
        if timestamp != self.timestamp:
            return timestamp > self.timestamp
        # Same timestamp: deterministic tie-break, independent of merge order.
        return repr(value) > repr(self.value)

    def write(self, value: object, timestamp: int):
        """Set value if this write wins over the current state."""
        if self._supersedes(value, timestamp):
            self.value = value
            self.timestamp = timestamp

    def merge(self, other: 'LWWRegister'):
        """Adopt other's state if it wins over ours (commutative, idempotent)."""
        if self._supersedes(other.value, other.timestamp):
            self.value = other.value
            self.timestamp = other.timestamp

Multi-Value (Siblings)

class MultiValueRegister:
    """Multi-value register: keeps all concurrent values (siblings).

    Each value is stored with the vector clock (node_id -> counter dict) of
    the write that produced it. A value is dropped only when another value's
    clock strictly dominates its own. Concurrent values are all kept, and
    the application must resolve them.
    """

    def __init__(self):
        self.values = []  # [(value, vector_clock)]

    @staticmethod
    def happened_before(c1: dict, c2: dict) -> bool:
        """Return True if clock c1 is strictly dominated by c2.

        Missing entries count as 0. Equal clocks return False.
        """
        nodes = c1.keys() | c2.keys()
        return (all(c1.get(n, 0) <= c2.get(n, 0) for n in nodes)
                and any(c1.get(n, 0) < c2.get(n, 0) for n in nodes))

    def write(self, value: object, clock: dict):
        """Store value, discarding siblings that this write supersedes."""
        self.values = [
            (v, c) for v, c in self.values
            if not self.happened_before(c, clock)
        ]
        self.values.append((value, clock))

    def read(self):
        """Return the value if there is exactly one, else a list of all siblings."""
        if len(self.values) == 1:
            return self.values[0][0]
        # Return all siblings - application must resolve
        return [v for v, _ in self.values]

    def merge(self, other: 'MultiValueRegister'):
        """Union both sibling sets, then drop dominated and duplicate entries."""
        combined = []
        for pair in self.values + other.values:
            # A sibling known to both replicas must appear only once.
            if pair not in combined:
                combined.append(pair)

        self.values = [
            (v, c) for v, c in combined
            if not any(self.happened_before(c, c2) for _, c2 in combined)
        ]

4. CRDTs (Conflict-free Replicated Data Types)

G-Counter (Grow-only Counter)

class GCounter:
    """Grow-only counter CRDT.

    Each node increments only its own entry in ``counts``. ``merge`` takes
    the per-node maximum, and ``value`` is the sum over all nodes.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts = {}  # node_id -> count

    def increment(self, amount: int = 1):
        """Add a non-negative amount to this node's entry.

        Raises:
            ValueError: if amount is negative. A decrease would be silently
                lost on merge, because merge keeps the per-node maximum. Use
                a PN-counter for decrements.
        """
        if amount < 0:
            raise ValueError("GCounter can only grow; amount must be non-negative")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        """Return the total across all nodes."""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """Fold other's state into this one (per-node max)."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)

# Example: two replicas increment independently, then exchange state
counter_a = GCounter("A")
counter_b = GCounter("B")

counter_a.increment(5)
counter_b.increment(3)

# Merge in both directions so each replica sees the other's increments
counter_a.merge(counter_b)
counter_b.merge(counter_a)

for label, counter in (("A", counter_a), ("B", counter_b)):
    print(f"{label}: {counter.value()}")  # 8

PN-Counter (Increment and Decrement)

class PNCounter:
    """Positive-Negative counter CRDT.

    Built from two grow-only counters: ``p`` accumulates increments and ``n``
    accumulates decrements. The counter's value is their difference.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.p = GCounter(node_id)  # running total of increments
        self.n = GCounter(node_id)  # running total of decrements

    def increment(self, amount: int = 1):
        """Record an increase of amount."""
        self.p.increment(amount)

    def decrement(self, amount: int = 1):
        """Record a decrease of amount."""
        self.n.increment(amount)

    def value(self) -> int:
        """Return increments minus decrements."""
        positive, negative = self.p.value(), self.n.value()
        return positive - negative

    def merge(self, other: 'PNCounter'):
        """Merge both underlying counters with other's (increments first)."""
        for mine, theirs in ((self.p, other.p), (self.n, other.n)):
            mine.merge(theirs)

LWW-Element-Set

class LWWElementSet:
    """Last-Writer-Wins Element Set CRDT.

    Keeps, per element, the latest add timestamp and the latest remove
    timestamp. An element is present only if its latest add is strictly
    newer than its latest remove, so ties go to removal.
    """

    def __init__(self):
        self.add_set = {}     # element -> latest add timestamp
        self.remove_set = {}  # element -> latest remove timestamp

    @staticmethod
    def _bump(stamps: dict, element, timestamp: int):
        """Record timestamp for element, keeping the larger of old and new."""
        stamps[element] = max(stamps.get(element, 0), timestamp)

    def add(self, element, timestamp: int):
        self._bump(self.add_set, element, timestamp)

    def remove(self, element, timestamp: int):
        self._bump(self.remove_set, element, timestamp)

    def contains(self, element) -> bool:
        """Return True if element's latest add is newer than its latest remove."""
        return self.add_set.get(element, 0) > self.remove_set.get(element, 0)

    def elements(self) -> set:
        """Return the set of currently present elements."""
        return set(filter(self.contains, self.add_set))

    def merge(self, other: 'LWWElementSet'):
        """Fold other's timestamps into ours, keeping the max per element."""
        for element, stamp in other.add_set.items():
            self._bump(self.add_set, element, stamp)
        for element, stamp in other.remove_set.items():
            self._bump(self.remove_set, element, stamp)

5. Anti-Entropy

Read Repair

class ReadRepair:
    """Repair stale replicas during reads.

    Replicas are used through ``fetch(key)`` and ``store(key, value, version)``.
    ``fetch`` returns a ``(value, version)`` tuple, or ``None`` when the
    replica does not hold the key (as ``LeaderlessReplication.fetch`` does).
    """

    def read_with_repair(self, key: str, replicas: list):
        """Read key from every replica and return the newest value.

        Every replica holding an older version, or missing the key entirely,
        is written the newest (value, version). Returns None if no replica
        holds the key, including when ``replicas`` is empty.
        """
        responses = [(replica, replica.fetch(key)) for replica in replicas]
        found = [(replica, result) for replica, result in responses if result is not None]
        if not found:
            return None

        # Newest version wins; on ties the first replica listed wins.
        _, (latest_value, latest_version) = max(found, key=lambda item: item[1][1])

        # Repair replicas that are behind or never received the key.
        for replica, result in responses:
            if result is None or result[1] < latest_version:
                replica.store(key, latest_value, latest_version)

        return latest_value

Merkle Trees

import hashlib

class MerkleTree:
    """Merkle tree for efficient replica synchronization.

    Leaves are SHA-256 hex digests of ``str(item)``. Each parent is the
    digest of its two children's digests concatenated; an odd node out is
    re-hashed on its own. ``tree[0]`` is the leaf level and ``tree[-1]``
    holds the root.
    """

    def __init__(self, items: list):
        self.leaves = [self.hash_item(item) for item in items]
        self.tree = self.build_tree(self.leaves)

    def hash_item(self, item) -> str:
        return hashlib.sha256(str(item).encode()).hexdigest()

    def build_tree(self, leaves: list) -> list:
        """Return the list of levels, leaves first, or [] when there are no leaves.

        A single leaf yields ``[leaves]``, a one-level tree. The result is
        never a flat list of hashes, so ``root()`` and ``diff()`` see the same
        structure for every size.
        """
        if not leaves:
            return []

        tree = [leaves]
        current = leaves
        while len(current) > 1:
            # Hash each adjacent pair; a trailing single node is hashed alone.
            next_level = [
                self.hash_item("".join(current[i:i + 2]))
                for i in range(0, len(current), 2)
            ]
            tree.append(next_level)
            current = next_level

        return tree

    def root(self) -> str:
        return self.tree[-1][0] if self.tree else ""

    def diff(self, other: 'MerkleTree') -> list:
        """Return, in ascending order, the leaf indices whose items differ.

        An index present in only one of the trees counts as a difference.
        """
        if self.root() == other.root():
            return []

        # Trees of different heights do not line up level by level, and an
        # empty tree has no levels to walk; compare their leaves directly.
        if not self.tree or len(self.tree) != len(other.tree):
            return self._diff_leaves(other)

        # Walk tree to find differences
        diffs = []
        self._diff_level(self.tree, other.tree, len(self.tree) - 1, 0, diffs)
        return diffs

    def _diff_leaves(self, other: 'MerkleTree') -> list:
        """Compare leaf hashes position by position (missing leaf = None)."""
        from itertools import zip_longest

        return [
            index
            for index, (mine, theirs) in enumerate(zip_longest(self.leaves, other.leaves))
            if mine != theirs
        ]

    def _diff_level(self, tree1, tree2, level, index, diffs):
        """Descend only into subtrees whose hashes differ, collecting leaf indices."""
        h1 = tree1[level][index] if index < len(tree1[level]) else None
        h2 = tree2[level][index] if index < len(tree2[level]) else None

        if h1 == h2:
            return  # identical subtree, or a position past the end of both

        if level == 0:
            diffs.append(index)
            return

        self._diff_level(tree1, tree2, level - 1, index * 2, diffs)
        self._diff_level(tree1, tree2, level - 1, index * 2 + 1, diffs)

Summary

  • Single-leader replication is simple, but the leader is a single point of failure
  • Multi-leader replication enables multi-datacenter deployments but must resolve write conflicts
  • Leaderless replication uses quorums for availability
  • CRDTs provide automatic conflict resolution
  • Anti-entropy mechanisms (read repair, Merkle trees) keep replicas synchronized

Module Complete

This completes the Distributed Systems module! You've learned fundamental concepts for building and reasoning about distributed applications.

← Previous: Consensus | Back to Course Index