Replication and Consistency

Introduction

Replication keeps copies of the same data on multiple nodes to provide fault tolerance, lower read latency, and higher throughput. This reading covers replication strategies, quorum-based consistency, conflict resolution, CRDTs, and anti-entropy.

Learning Objectives

By the end of this reading, you will be able to:

  • Understand different replication strategies
  • Implement quorum-based consistency
  • Handle conflicts in replicated data
  • Design anti-entropy mechanisms
  • Apply CRDTs for conflict-free replication

1. Replication Strategies

Single-Leader (Primary-Backup)

One node handles all writes; followers replicate asynchronously or synchronously.

class SingleLeaderReplication:
    """Primary-backup replication

    Exactly one node (``is_leader=True``) applies writes. Any other node hands
    the write to ``self.leader``. After applying a write locally, the leader
    calls ``replicate`` on each node in ``self.followers``, in order, before
    returning. Reads are served from the local copy on any node.
    """

    def __init__(self, node_id: str, is_leader: bool):
        self.node_id = node_id
        self.is_leader = is_leader
        self.data = {}        # key -> value held by this node
        self.followers = []   # filled in externally; only the leader uses it
        self.leader = None    # filled in externally on non-leader nodes

    def write(self, key: str, value: any) -> bool:
        """Apply ``key = value``, forwarding to the leader when not the leader.

        Returns whatever the leader's ``write`` returns (``True`` here).
        """
        if self.is_leader:
            self.data[key] = value
            # Push the write to every follower before acknowledging.
            for node in self.followers:
                node.replicate(key, value)
            return True
        return self.leader.write(key, value)

    def read(self, key: str) -> any:
        """Return the local value for ``key`` (None if absent; may be stale)."""
        return self.data.get(key, None)

    def replicate(self, key: str, value: any):
        """Receive replication from leader"""
        self.data[key] = value

Multi-Leader

Multiple nodes can accept writes; used for multi-datacenter deployments.

class MultiLeaderReplication:
    """Multi-leader replication with conflict detection

    Every node accepts writes locally and pushes them to each node in
    ``self.peers``. Each stored entry carries the vector clock it was written
    with. That clock distinguishes a causally newer update (which replaces
    the entry) from a concurrent one (a conflict handed to
    ``resolve_conflict``).
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.data = {}  # key -> (value, vector_clock dict)
        self.peers = []
        # VectorClock is not defined in this section. This class relies on it
        # exposing tick(), receive(clock_dict) and a ``clock`` dict attribute.
        self.clock = VectorClock(node_id, [])

    def write(self, key: str, value: any):
        """Apply a local write and send it to every peer with a clock snapshot."""
        self.clock.tick()
        self.data[key] = (value, self.clock.clock.copy())

        # Async replicate to peers
        for peer in self.peers:
            peer.receive_update(key, value, self.clock.clock.copy(), self.node_id)

    def receive_update(self, key: str, value: any, sender_clock: dict, sender_id: str):
        """Apply an update replicated from peer ``sender_id``.

        Cases:

        - Unknown key: store the update.
        - Concurrent with the stored entry: store the winner chosen by
          ``resolve_conflict``.
        - Causally newer than the stored entry: replace the entry.
        - Older than or equal to the stored entry: ignore it.

        In every case the local clock absorbs ``sender_clock``.
        """
        if key not in self.data:
            self.data[key] = (value, sender_clock)
        else:
            existing_value, existing_clock = self.data[key]
            if self.concurrent(existing_clock, sender_clock):
                self.data[key] = self.resolve_conflict(
                    existing_value, value, existing_clock, sender_clock
                )
            elif self.happened_after(sender_clock, existing_clock):
                self.data[key] = (value, sender_clock)
        self.clock.receive(sender_clock)

    @staticmethod
    def _dominates(c1: dict, c2: dict) -> bool:
        """True if every entry of ``c1`` is >= the matching entry of ``c2``.

        A node missing from a clock counts as 0.
        """
        return all(c1.get(node, 0) >= c2.get(node, 0) for node in set(c1) | set(c2))

    @classmethod
    def happened_after(cls, c1: dict, c2: dict) -> bool:
        """True if clock ``c1`` is strictly causally after ``c2``."""
        return cls._dominates(c1, c2) and not cls._dominates(c2, c1)

    @classmethod
    def concurrent(cls, c1: dict, c2: dict) -> bool:
        """True if neither clock dominates the other (a write conflict)."""
        return not cls._dominates(c1, c2) and not cls._dominates(c2, c1)

    def resolve_conflict(self, v1, v2, c1, c2):
        """Deterministically pick a winner between two concurrent versions.

        The version whose clock has the higher entry sum wins. Ties are broken
        by comparing the sorted clock entries. The result therefore does not
        depend on which version was local and which was incoming, so every
        replica converges on the same winner.

        Returns the winning ``(value, clock)`` pair.
        """
        rank1 = (sum(c1.values()), sorted(c1.items()))
        rank2 = (sum(c2.values()), sorted(c2.items()))
        if rank1 > rank2:
            return (v1, c1)
        return (v2, c2)

Leaderless (Dynamo-Style)

Any node can accept writes; uses quorums for consistency.

class LeaderlessReplication:
    """Dynamo-style leaderless replication

    Any node can coordinate a request.

    - Writes are sent to every replica in ``replicas`` and succeed once ``w``
      of them accept.
    - Reads query replicas until ``r`` of them return a copy, then pick the
      copy with the highest version.
    """

    def __init__(self, node_id: str, replicas: list, n: int, w: int, r: int):
        """
        n: number of replicas
        w: write quorum
        r: read quorum
        For consistency: w + r > n
        """
        self.node_id = node_id
        self.replicas = replicas
        self.n = n
        self.w = w
        self.r = r
        self.data = {}  # key -> (value, version)
        self._last_version = 0  # last version issued by this node as coordinator

    def write(self, key: str, value: any) -> bool:
        """Send ``key = value`` to every replica; succeed if at least ``w`` accept.

        Versions are wall-clock milliseconds, bumped so that consecutive writes
        coordinated by this node always get strictly increasing versions.
        Replicas reject versions that are not newer, so two writes in the same
        millisecond would otherwise lose the second one.
        NOTE(review): writes coordinated by *different* nodes in the same
        millisecond can still receive equal versions.
        """
        import time  # local import: this section has no module-level `import time`

        version = max(int(time.time() * 1000), self._last_version + 1)
        self._last_version = version

        acks = 0
        for replica in self.replicas:
            # Deliver to every replica (do not stop at the w-th ack) so the
            # remaining replicas are not left stale.
            if replica.store(key, value, version):
                acks += 1
        return acks >= self.w

    def read(self, key: str) -> any:
        """Return the highest-versioned value among the first ``r`` copies found.

        Replicas without the key are skipped. Returns None if no replica has it.
        """
        responses = []
        for replica in self.replicas:
            result = replica.fetch(key)
            if result is not None:
                responses.append(result)
                if len(responses) >= self.r:
                    break

        if not responses:
            return None

        # Return highest version
        best = max(responses, key=lambda x: x[1])
        return best[0]

    def store(self, key: str, value: any, version: int) -> bool:
        """Store value if version is newer"""
        if key not in self.data or self.data[key][1] < version:
            self.data[key] = (value, version)
            return True
        return False

    def fetch(self, key: str):
        """Return the local ``(value, version)`` for ``key``, or None."""
        return self.data.get(key)

2. Quorums

Read and Write Quorums

class QuorumSystem:
    """Configurable quorum system

    Every method returns a ``(w, r)`` tuple of write and read quorum sizes
    for ``n`` replicas. When ``w + r > n``, every read quorum overlaps every
    write quorum.
    """

    def __init__(self, n: int):
        self.n = n

    def _majority(self) -> int:
        """Smallest number of replicas that forms a strict majority."""
        return self.n // 2 + 1

    def strong_consistency(self):
        """w + r > n ensures overlap"""
        majority = self._majority()
        return majority, majority

    def write_heavy(self):
        """Fast writes, slow reads"""
        return 1, self.n

    def read_heavy(self):
        """Fast reads, slow writes"""
        return self.n, 1

    def balanced(self):
        """Majority quorum (same sizes as strong_consistency)."""
        return self.strong_consistency()

# Example
qs = QuorumSystem(5)
w, r = qs.strong_consistency()
print(f"Strong consistency: W={w}, R={r}")  # W=3, R=3
w, r = qs.write_heavy()
print(f"Write heavy: W={w}, R={r}")  # W=1, R=5
w, r = qs.read_heavy()
print(f"Read heavy: W={w}, R={r}")  # W=5, R=1

Sloppy Quorums

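When some preferred replicas are unavailable, a sloppy quorum accepts the write on other healthy nodes instead. Each stand-in keeps a hint naming the node it is covering for, and hands the data back (hinted handoff) once that node recovers: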

class SloppyQuorum:
    """Sloppy quorum with hinted handoff"""

    def __init__(self, all_nodes: list, n: int, w: int):
        self.all_nodes = all_nodes
        self.n = n
        self.w = w
        self.hints = {}  # node -> list of hinted writes

    def write(self, key: str, value: any, preferred_nodes: list):
        """Write with sloppy quorum.

        First, the value is stored on available preferred nodes until ``w``
        acks are reached. If that is not enough, nodes outside
        ``preferred_nodes`` act as stand-ins. Each stand-in stores the value
        with a hint naming a distinct unavailable preferred node, so every down
        node gets its own hint to hand off later.

        Returns True if at least ``w`` nodes accepted the write. Returns False
        if there are not enough available nodes, or not enough unavailable
        preferred nodes to stand in for.
        """
        acks = 0
        unavailable = []

        # Try preferred nodes first
        for node in preferred_nodes:
            if node.is_available():
                node.store(key, value)
                acks += 1
            else:
                unavailable.append(node)

            if acks >= self.w:
                break

        # If not enough, use other nodes as stand-ins for the down ones.
        if acks < self.w:
            owners = iter(unavailable)
            for node in self.all_nodes:
                if node in preferred_nodes or not node.is_available():
                    continue
                owner = next(owners, None)
                if owner is None:
                    break  # no unavailable preferred node left to cover for
                node.store_with_hint(key, value, owner)
                acks += 1
                if acks >= self.w:
                    break

        return acks >= self.w

    def handoff(self, from_node, to_node):
        """Transfer hinted data when node recovers"""
        # Snapshot the hints first, in case get_hints returns a live view that
        # delete_hint modifies during the loop.
        for key, value in list(from_node.get_hints(to_node)):
            to_node.store(key, value)
            from_node.delete_hint(key, to_node)

3. Conflict Resolution

Last-Writer-Wins (LWW)

class LWWRegister:
    """Last-writer-wins register

    Holds one value together with the timestamp it was written at. A write or
    merge takes effect only if its timestamp is strictly greater than the
    stored one. On a tie the current value is kept, so two replicas holding
    different values at the same timestamp stay different after merging.
    """

    def __init__(self):
        self.value = None
        self.timestamp = 0

    def write(self, value: any, timestamp: int):
        """Store ``value`` if ``timestamp`` is newer than the current one."""
        if timestamp <= self.timestamp:
            return
        self.value, self.timestamp = value, timestamp

    def merge(self, other: 'LWWRegister'):
        """Adopt ``other``'s value if it was written strictly later."""
        if not other.timestamp > self.timestamp:
            return
        self.value, self.timestamp = other.value, other.timestamp

Multi-Value (Siblings)

class MultiValueRegister:
    """Keep all concurrent values (siblings)

    Each sibling is a ``(value, vector_clock)`` pair. A sibling is discarded
    only when another sibling's clock is strictly causally after it, so
    concurrent writes survive until the application resolves them.
    """

    def __init__(self):
        self.values = []  # [(value, vector_clock)]

    @staticmethod
    def happened_before(c1: dict, c2: dict) -> bool:
        """True if clock ``c1`` is strictly causally before ``c2``.

        Every entry must be <= and at least one must be <. A node missing
        from a clock counts as 0.
        """
        nodes = set(c1) | set(c2)
        not_after = all(c1.get(node, 0) <= c2.get(node, 0) for node in nodes)
        strictly_less = any(c1.get(node, 0) < c2.get(node, 0) for node in nodes)
        return not_after and strictly_less

    def write(self, value: any, clock: dict):
        """Add ``value`` and drop every sibling whose clock precedes ``clock``."""
        # Remove values that this write supersedes
        self.values = [
            (v, c) for v, c in self.values
            if not self.happened_before(c, clock)
        ]
        self.values.append((value, clock))

    def read(self):
        """Return the single value, or a list of all siblings if there are several.

        An empty register returns an empty list.
        """
        if len(self.values) == 1:
            return self.values[0][0]
        # Return all siblings - application must resolve
        return [v for v, _ in self.values]

    def merge(self, other: 'MultiValueRegister'):
        """Union both sibling sets, dropping dominated and duplicate siblings."""
        combined = self.values + other.values
        result = []
        for v, c in combined:
            # Remove dominated values
            if any(self.happened_before(c, c2) for _, c2 in combined):
                continue
            # The same sibling may be present on both sides; keep one copy.
            if (v, c) not in result:
                result.append((v, c))
        self.values = result

4. CRDTs (Conflict-free Replicated Data Types)

G-Counter (Grow-only Counter)

class GCounter:
    """Grow-only counter CRDT

    Each node increments only its own slot in ``counts``, and the counter's
    value is the sum of all slots. ``merge`` keeps the per-node maximum, which
    is only correct if every slot only ever grows. ``increment`` therefore
    rejects negative amounts.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts = {}  # node_id -> count

    def increment(self, amount: int = 1):
        """Add a non-negative ``amount`` to this node's slot.

        Raises:
            ValueError: if ``amount`` is negative. A later max-merge would
                silently undo a decrement; use PNCounter instead.
        """
        if amount < 0:
            raise ValueError("GCounter can only grow; use PNCounter to decrement")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        """Return the total across all nodes."""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """Take the per-node maximum with ``other`` (commutative, idempotent)."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)

# Example
counter_a = GCounter("A")
counter_b = GCounter("B")

counter_a.increment(5)
counter_b.increment(3)

# Merge
counter_a.merge(counter_b)
counter_b.merge(counter_a)

print(f"A: {counter_a.value()}")  # 8
print(f"B: {counter_b.value()}")  # 8

PN-Counter (Increment and Decrement)

class PNCounter:
    """Positive-Negative counter CRDT

    Built from two grow-only counters: ``p`` accumulates increments and ``n``
    accumulates decrements. The counter's value is ``p - n``, and merging
    merges each half independently.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.p = GCounter(node_id)  # running total of increments
        self.n = GCounter(node_id)  # running total of decrements

    def increment(self, amount: int = 1):
        """Record an increase of ``amount``."""
        self.p.increment(amount)

    def decrement(self, amount: int = 1):
        """Record a decrease of ``amount`` as growth of the negative counter."""
        self.n.increment(amount)

    def value(self) -> int:
        """Return increments minus decrements."""
        positive, negative = self.p.value(), self.n.value()
        return positive - negative

    def merge(self, other: 'PNCounter'):
        """Merge the increment half, then the decrement half."""
        for mine, theirs in ((self.p, other.p), (self.n, other.n)):
            mine.merge(theirs)

LWW-Element-Set

class LWWElementSet:
    """Last-Writer-Wins Element Set CRDT

    For each element, the set keeps the latest add timestamp and the latest
    remove timestamp. An element is a member only if its latest add is
    strictly newer than its latest remove, so a tie counts as removed. A
    missing timestamp counts as 0.
    """

    def __init__(self):
        self.add_set = {}     # element -> latest add timestamp
        self.remove_set = {}  # element -> latest remove timestamp

    @staticmethod
    def _bump(stamps: dict, element, timestamp: int):
        """Record ``timestamp`` for ``element`` unless a newer one is already stored."""
        previous = stamps.get(element, 0)
        stamps[element] = timestamp if timestamp > previous else previous

    def add(self, element, timestamp: int):
        """Record an add of ``element`` at ``timestamp``."""
        self._bump(self.add_set, element, timestamp)

    def remove(self, element, timestamp: int):
        """Record a removal of ``element`` at ``timestamp``."""
        self._bump(self.remove_set, element, timestamp)

    def contains(self, element) -> bool:
        """True if the latest add of ``element`` is newer than its latest removal."""
        return self.add_set.get(element, 0) > self.remove_set.get(element, 0)

    def elements(self) -> set:
        """Return every element currently considered a member."""
        return set(filter(self.contains, self.add_set))

    def merge(self, other: 'LWWElementSet'):
        """Fold in ``other`` by keeping the newest add and remove timestamps."""
        for element, stamp in other.add_set.items():
            self._bump(self.add_set, element, stamp)
        for element, stamp in other.remove_set.items():
            self._bump(self.remove_set, element, stamp)

5. Anti-Entropy

Read Repair

class ReadRepair:
    """Repair stale replicas during reads"""

    def read_with_repair(self, key: str, replicas: list):
        """Read ``key`` from every replica, return the newest value, and repair the rest.

        Each ``replica.fetch(key)`` is expected to return ``(value, version)``,
        or None when that replica holds no copy of the key. Any replica that is
        missing the key, or holds an older version, is sent the newest copy
        via ``replica.store(key, value, version)``.

        Returns the newest value, or None if no replica has the key (including
        when ``replicas`` is empty).
        """
        responses = [(replica, replica.fetch(key)) for replica in replicas]
        present = [result for _, result in responses if result is not None]
        if not present:
            return None

        # Find latest version (first one wins on a version tie).
        latest_value, latest_version = max(present, key=lambda result: result[1])

        # Repair stale replicas, including those that had no copy at all.
        for replica, result in responses:
            if result is None or result[1] < latest_version:
                replica.store(key, latest_value, latest_version)

        return latest_value

Merkle Trees

import hashlib

class MerkleTree:
    """Merkle tree for efficient sync

    ``self.tree`` is a list of levels built bottom-up:

    - ``tree[0]`` holds the leaf hashes, one per item.
    - ``tree[-1]`` holds the single root hash.

    An empty item list gives an empty tree whose root is ``""``.
    """

    def __init__(self, items: list):
        self.leaves = [self.hash_item(item) for item in items]
        self.tree = self.build_tree(self.leaves)

    def hash_item(self, item) -> str:
        """Return the hex SHA-256 digest of ``str(item)``."""
        # NOTE: items are hashed via str(), so values with the same string form
        # (e.g. 1 and "1") produce the same leaf hash.
        return hashlib.sha256(str(item).encode()).hexdigest()

    def build_tree(self, leaves: list) -> list:
        """Build every level from ``leaves`` up to the root.

        Adjacent hashes are concatenated and re-hashed. An odd node at the end
        of a level is re-hashed on its own. A single leaf yields ``[[leaf]]``,
        so ``tree[-1][0]`` is always the root hash.
        """
        if not leaves:
            return []

        # Build levels bottom-up
        tree = [leaves]
        current = leaves
        while len(current) > 1:
            next_level = [
                self.hash_item("".join(current[i:i + 2]))
                for i in range(0, len(current), 2)
            ]
            tree.append(next_level)
            current = next_level
        return tree

    def root(self) -> str:
        """Return the root hash, or ``""`` for an empty tree."""
        return self.tree[-1][0] if self.tree else ""

    def diff(self, other: 'MerkleTree') -> list:
        """Return the ascending leaf indices whose hashes differ between the trees.

        An index present in only one tree counts as a difference. Trees of
        equal height are walked top-down, skipping subtrees whose hashes
        match. Trees of different height have no aligned internal nodes, so
        their leaves are compared directly.
        """
        if self.root() == other.root():
            return []
        if len(self.tree) != len(other.tree):
            return self._diff_leaves(other)

        # Walk tree to find differences
        diffs = []
        self._diff_level(self.tree, other.tree, len(self.tree) - 1, 0, diffs)
        return diffs

    @staticmethod
    def _node(level: list, index: int):
        """Return the hash at ``index`` in ``level``, or None if out of range."""
        return level[index] if index < len(level) else None

    def _diff_leaves(self, other: 'MerkleTree') -> list:
        """Compare leaves one by one (used when tree heights differ)."""
        count = max(len(self.leaves), len(other.leaves))
        return [
            i for i in range(count)
            if self._node(self.leaves, i) != self._node(other.leaves, i)
        ]

    def _diff_level(self, tree1, tree2, level, index, diffs):
        """Descend into node ``index`` at ``level`` and record differing leaves."""
        h1 = self._node(tree1[level], index)
        h2 = self._node(tree2[level], index)
        if h1 == h2:
            return
        if level == 0:
            diffs.append(index)
            return
        self._diff_level(tree1, tree2, level - 1, index * 2, diffs)
        self._diff_level(tree1, tree2, level - 1, index * 2 + 1, diffs)

Summary

  • Single-leader replication is simple, but the leader is a single point of failure for writes
  • Multi-leader replication enables multi-datacenter writes, but concurrent writes can conflict and must be resolved
  • Leaderless uses quorums for availability
  • CRDTs provide automatic conflict resolution
  • Anti-entropy keeps replicas synchronized

Module Complete

This completes the Distributed Systems module! You've learned fundamental concepts for building and reasoning about distributed applications.

← Previous: Consensus | Back to Course Index