Consensus and Coordination

Introduction

Consensus is a fundamental problem in distributed systems: getting multiple nodes to agree on a single value despite failures. This reading covers consensus protocols (Paxos and Raft), leader election, and coordination services.

Learning Objectives

By the end of this reading, you will be able to:

  • Understand the consensus problem and FLP impossibility
  • Explain the Paxos protocol
  • Implement and understand Raft
  • Use coordination services effectively
  • Apply consensus to practical problems

1. The Consensus Problem

Definition

Consensus requires nodes to agree on a single value:

  1. Agreement: All correct nodes decide the same value
  2. Validity: If all nodes propose v, then decision is v
  3. Termination: All correct nodes eventually decide

FLP Impossibility

Fischer, Lynch, and Paterson (1985) proved that no deterministic protocol can guarantee consensus in an asynchronous system if even one process may crash.

Implications:

  • No deterministic algorithm can guarantee termination in a fully asynchronous system
  • Real systems assume partial synchrony and use timeouts
  • Or use randomization to achieve probabilistic termination
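The timeout workaround can be sketched as a heartbeat-based failure detector (the class name and timeout value below are illustrative, not from any particular system): a peer is suspected of having crashed once it has been silent longer than the timeout. A slow but correct peer can be wrongly suspected, which reflects the accuracy-for-termination trade-off that FLP implies.

```python
import time

class TimeoutFailureDetector:
    """Suspect a peer if no heartbeat arrives within `timeout` seconds."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self.last_heartbeat: dict = {}

    def heartbeat(self, peer: str, now: float = None):
        """Record a heartbeat from peer (now defaults to the current clock)."""
        self.last_heartbeat[peer] = time.monotonic() if now is None else now

    def is_suspected(self, peer: str, now: float = None) -> bool:
        """A peer is suspected if never heard from, or silent past the timeout."""
        now = time.monotonic() if now is None else now
        last = self.last_heartbeat.get(peer)
        return last is None or (now - last) > self.timeout

# Example with explicit timestamps (avoids real sleeping)
fd = TimeoutFailureDetector(timeout=1.0)
fd.heartbeat("node-B", now=0.0)
print(fd.is_suspected("node-B", now=0.5))   # False: heard recently
print(fd.is_suspected("node-B", now=2.0))   # True: silent past the timeout
```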

2. Paxos

Overview

Paxos is the foundational consensus protocol (Lamport, 1998).

Roles:

  • Proposers: Propose values
  • Acceptors: Vote on proposals
  • Learners: Learn decided value

Phases:

  1. Prepare: Proposer asks acceptors to promise
  2. Accept: Proposer sends value, acceptors accept
  3. Learn: Acceptors inform learners

Single-Decree Paxos

from dataclasses import dataclass
from typing import Optional, Tuple, Dict

@dataclass
class Proposal:
    number: int  # Proposal number (must be unique)
    value: Optional[str] = None

class Acceptor:
    """Paxos acceptor"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.promised: Optional[int] = None  # Highest promised proposal
        self.accepted: Optional[Proposal] = None  # Highest accepted proposal

    def prepare(self, n: int) -> Tuple[bool, Optional[Proposal]]:
        """
        Phase 1b: Respond to prepare request

        Returns (promise, previously_accepted)
        """
        if self.promised is None or n > self.promised:
            self.promised = n
            return True, self.accepted
        return False, None

    def accept(self, proposal: Proposal) -> bool:
        """
        Phase 2b: Accept proposal if allowed

        Returns whether accepted
        """
        if self.promised is None or proposal.number >= self.promised:
            self.promised = proposal.number
            self.accepted = proposal
            return True
        return False

class Proposer:
    """Paxos proposer"""

    def __init__(self, node_id: str, acceptors: list):
        self.node_id = node_id
        self.acceptors = acceptors
        self.proposal_number = 0

    def next_proposal_number(self) -> int:
        """Generate a (mostly) unique proposal number"""
        # In practice, combine a per-node counter with a unique numeric
        # node id; hash(node_id) % 100 stands in for that id here and
        # can collide, so this is a sketch only
        self.proposal_number += 1
        return self.proposal_number * 100 + hash(self.node_id) % 100

    def propose(self, value: str) -> Optional[str]:
        """Run Paxos to propose a value"""
        n = self.next_proposal_number()

        # Phase 1: Prepare
        promises = []
        for acceptor in self.acceptors:
            success, prev_accepted = acceptor.prepare(n)
            if success:
                promises.append((acceptor, prev_accepted))

        # Need majority
        if len(promises) <= len(self.acceptors) // 2:
            return None  # Failed to get majority

        # Choose value: use highest previously accepted, or our value
        max_accepted = None
        for _, prev in promises:
            if prev is not None:
                if max_accepted is None or prev.number > max_accepted.number:
                    max_accepted = prev

        if max_accepted is not None:
            value = max_accepted.value  # Must use previously accepted value!

        proposal = Proposal(n, value)

        # Phase 2: Accept
        accepts = 0
        for acceptor, _ in promises:
            if acceptor.accept(proposal):
                accepts += 1

        # Need majority
        if accepts > len(self.acceptors) // 2:
            return value  # Consensus reached!

        return None

# Example
acceptors = [Acceptor(f"A{i}") for i in range(5)]
proposer1 = Proposer("P1", acceptors)
proposer2 = Proposer("P2", acceptors)

# Concurrent proposals
result1 = proposer1.propose("value_A")
result2 = proposer2.propose("value_B")

print(f"Proposer 1 result: {result1}")
print(f"Proposer 2 result: {result2}")
# If both succeed they agree: the later proposer must adopt the earlier
# accepted value. One may instead fail to get a majority and would retry
# with a higher proposal number.

Multi-Paxos

For a sequence of decisions (log replication):

  1. Elect a stable leader
  2. Leader proposes values for consecutive slots
  3. Skip Phase 1 for most operations
  4. Handle leader failures with new election
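The steps above can be sketched as follows (class names are mine, and the design is heavily simplified: one prepare-like round elects the leader for all slots, after which each command needs only an accept round):

```python
from typing import Dict, List, Optional

class SlotAcceptor:
    """Acceptor with one promise number and per-slot accepted values."""
    def __init__(self):
        self.promised = 0
        self.accepted: Dict[int, str] = {}  # slot -> value

    def prepare(self, n: int) -> bool:
        if n > self.promised:
            self.promised = n
            return True
        return False

    def accept(self, n: int, slot: int, value: str) -> bool:
        if n >= self.promised:
            self.promised = n
            self.accepted[slot] = value
            return True
        return False

class MultiPaxosLeader:
    """Leader: one Phase 1 for all slots, then Phase 2 per command."""
    def __init__(self, n: int, acceptors: List[SlotAcceptor]):
        self.n = n
        self.acceptors = acceptors
        self.next_slot = 0
        self.elected = False

    def elect(self) -> bool:
        """Run Phase 1 once; a majority of promises makes us leader."""
        promises = sum(a.prepare(self.n) for a in self.acceptors)
        self.elected = promises > len(self.acceptors) // 2
        return self.elected

    def propose(self, command: str) -> Optional[int]:
        """Assign command to the next log slot; Phase 1 is skipped."""
        if not self.elected:
            return None
        slot = self.next_slot
        accepts = sum(a.accept(self.n, slot, command) for a in self.acceptors)
        if accepts > len(self.acceptors) // 2:
            self.next_slot += 1
            return slot
        return None

acceptors = [SlotAcceptor() for _ in range(3)]
leader = MultiPaxosLeader(n=1, acceptors=acceptors)
leader.elect()
slot_a = leader.propose("set x=1")
slot_b = leader.propose("set y=2")
print(slot_a, slot_b)  # 0 1
```

A real Multi-Paxos also needs value recovery for slots the new leader finds partially accepted; that step is omitted here.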

3. Raft

Overview

Raft (Ongaro and Ousterhout, 2014) is designed for understandability. It separates consensus into:

  • Leader Election: Choose a leader
  • Log Replication: Leader replicates commands
  • Safety: Ensure committed entries aren't lost

Raft Implementation

from enum import Enum
from dataclasses import dataclass, field
from typing import List, Optional, Dict

class NodeState(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"

@dataclass
class LogEntry:
    term: int
    command: str
    index: int

@dataclass
class RaftNode:
    """Simplified Raft node"""

    node_id: str
    peers: List[str]

    # Persistent state
    current_term: int = 0
    voted_for: Optional[str] = None
    log: List[LogEntry] = field(default_factory=list)

    # Volatile state
    commit_index: int = 0
    last_applied: int = 0

    # Leader state
    next_index: Dict[str, int] = field(default_factory=dict)
    match_index: Dict[str, int] = field(default_factory=dict)

    state: NodeState = NodeState.FOLLOWER
    leader_id: Optional[str] = None

    def start_election(self):
        """Become candidate and request votes"""
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        votes = 1  # Vote for self

        last_log_index = len(self.log)
        last_log_term = self.log[-1].term if self.log else 0

        for peer in self.peers:
            # In practice, send a RequestVote RPC to `peer` and use its
            # reply; here the receiver logic runs locally as a stand-in
            vote_granted = self.request_vote(
                peer,
                self.current_term,
                self.node_id,
                last_log_index,
                last_log_term
            )
            if vote_granted:
                votes += 1

        if votes > (len(self.peers) + 1) // 2:
            self.become_leader()
        else:
            self.state = NodeState.FOLLOWER

    def become_leader(self):
        """Initialize leader state"""
        self.state = NodeState.LEADER
        self.leader_id = self.node_id

        # Initialize next_index and match_index
        next_idx = len(self.log) + 1
        for peer in self.peers:
            self.next_index[peer] = next_idx
            self.match_index[peer] = 0

        # Send initial heartbeats
        self.send_heartbeats()

    def request_vote(self, peer, term, candidate_id, last_log_index, last_log_term) -> bool:
        """Handle an incoming RequestVote RPC (receiver side, simplified)"""
        # In a real system this logic runs on the receiving peer; calling
        # it on the candidate itself (as start_election does) is a
        # simulation shortcut, so in this sketch every vote is granted

        # Reject if term is stale
        if term < self.current_term:
            return False

        # Update term if needed
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
            self.state = NodeState.FOLLOWER

        # Check if we can vote
        if self.voted_for is None or self.voted_for == candidate_id:
            # Check if candidate's log is at least as up-to-date
            my_last_term = self.log[-1].term if self.log else 0
            my_last_index = len(self.log)

            if (last_log_term > my_last_term or
                (last_log_term == my_last_term and last_log_index >= my_last_index)):
                self.voted_for = candidate_id
                return True

        return False

    def append_entries(self, term, leader_id, prev_log_index, prev_log_term,
                       entries: List[LogEntry], leader_commit) -> bool:
        """Handle AppendEntries RPC"""
        # Reject if term is stale
        if term < self.current_term:
            return False

        # Update term if needed
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None

        self.state = NodeState.FOLLOWER
        self.leader_id = leader_id

        # Check log consistency
        if prev_log_index > 0:
            if len(self.log) < prev_log_index:
                return False
            if self.log[prev_log_index - 1].term != prev_log_term:
                return False

        # Append new entries
        idx = prev_log_index
        for entry in entries:
            idx += 1
            if len(self.log) >= idx:
                if self.log[idx - 1].term != entry.term:
                    # Conflict - truncate
                    self.log = self.log[:idx - 1]
                    self.log.append(entry)
            else:
                self.log.append(entry)

        # Update commit index
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, len(self.log))

        return True

    def send_heartbeats(self):
        """Leader sends heartbeats to all followers"""
        for peer in self.peers:
            prev_log_index = self.next_index[peer] - 1
            prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0

            # Get entries to send
            entries = self.log[prev_log_index:]

            # In practice, send an AppendEntries RPC to `peer`;
            # append_entries_to_peer is a transport stub, not defined here
            success = self.append_entries_to_peer(
                peer,
                self.current_term,
                self.node_id,
                prev_log_index,
                prev_log_term,
                entries,
                self.commit_index
            )

            if success:
                self.next_index[peer] = len(self.log) + 1
                self.match_index[peer] = len(self.log)
            else:
                # Decrement next_index and retry
                self.next_index[peer] = max(1, self.next_index[peer] - 1)

    def client_request(self, command: str) -> bool:
        """Handle client request (leader only)"""
        if self.state != NodeState.LEADER:
            return False

        # Append to local log
        entry = LogEntry(
            term=self.current_term,
            command=command,
            index=len(self.log) + 1
        )
        self.log.append(entry)

        # Replicate to followers
        self.send_heartbeats()

        # Check if committed
        return self.try_commit()

    def try_commit(self) -> bool:
        """Check if we can advance commit index"""
        # Find highest index replicated to majority
        for n in range(len(self.log), self.commit_index, -1):
            if self.log[n-1].term != self.current_term:
                continue

            replication_count = 1  # Leader has it
            for peer in self.peers:
                if self.match_index.get(peer, 0) >= n:
                    replication_count += 1

            if replication_count > (len(self.peers) + 1) // 2:
                self.commit_index = n
                return True

        return False

Raft Guarantees

  1. Election Safety: At most one leader per term
  2. Leader Append-Only: Leader never overwrites log entries
  3. Log Matching: If two logs have entry with same index/term, all previous entries are identical
  4. Leader Completeness: Committed entries appear in future leaders' logs
  5. State Machine Safety: Same commands applied in same order
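Election safety depends in practice on randomized election timeouts: each node waits a different random interval before starting an election, so one candidate usually wins before others compete. A minimal sketch (the helper name is mine; the 150-300 ms range follows the Raft paper's suggestion):

```python
import random

def election_timeout(base_ms: int = 150, spread_ms: int = 150) -> float:
    """Pick a random election timeout in [base, base + spread) milliseconds."""
    return base_ms + random.random() * spread_ms

# Whichever node draws the smallest timeout starts its election first,
# asks for votes, and typically wins before the others time out
timeouts = {node: election_timeout() for node in ["A", "B", "C"]}
first = min(timeouts, key=timeouts.get)
print(f"{first} times out first and becomes candidate")
```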

4. Leader Election

Bully Algorithm

Simple leader election for synchronous systems:

class BullyNode:
    """Node using Bully algorithm"""

    def __init__(self, node_id: int, all_nodes: list):
        self.node_id = node_id
        self.all_nodes = all_nodes
        self.leader = None
        self.alive = True

    def start_election(self):
        """Start an election"""
        higher_nodes = [n for n in self.all_nodes if n > self.node_id]

        if not higher_nodes:
            # I am the highest - become leader
            self.become_leader()
            return

        # Send ELECTION messages to all higher-id nodes
        # (send_election / send_coordinator are messaging stubs, undefined here)
        responses = []
        for node in higher_nodes:
            response = self.send_election(node)
            if response:
                responses.append(node)

        if not responses:
            # No higher node responded - become leader
            self.become_leader()
        # else: wait for coordinator message

    def become_leader(self):
        """Announce leadership"""
        self.leader = self.node_id
        for node in self.all_nodes:
            if node != self.node_id:
                self.send_coordinator(node)
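Since the message-sending stubs above are undefined, a self-contained simulation (the `BullySim` class is mine; message delivery is modeled as direct calls, and crashed nodes simply never answer) shows the expected outcome: the highest-id live node wins.

```python
class BullySim:
    """In-memory bully election: crashed nodes never answer."""

    def __init__(self, node_ids, crashed=()):
        self.node_ids = sorted(node_ids)
        self.crashed = set(crashed)
        self.leader = None

    def answers(self, node_id) -> bool:
        """A node answers an ELECTION message iff it is alive."""
        return node_id not in self.crashed

    def run_election(self, starter: int) -> int:
        """Run the election started by `starter`; returns the leader id."""
        node = starter
        while True:
            higher_alive = [n for n in self.node_ids
                            if n > node and self.answers(n)]
            if not higher_alive:
                self.leader = node  # no higher node answered: node wins
                return node
            # A higher node takes over the election; in the worst case
            # each handoff goes to the next-higher live node
            node = min(higher_alive)

# Nodes 1..5; node 5 has crashed, so node 4 should win
sim = BullySim([1, 2, 3, 4, 5], crashed={5})
print(sim.run_election(starter=1))  # 4
```

The worst case shown (each node handing off to the next) is why the bully algorithm can cost O(n²) messages when the lowest-id node starts the election.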

Ring-Based Election

Nodes arranged in a logical ring:

class RingNode:
    """Node in ring-based election"""

    def __init__(self, node_id: int, successor: 'RingNode' = None):
        self.node_id = node_id
        self.successor = successor  # set after construction to close the ring
        self.leader = None

    def start_election(self):
        """Start election by forwarding a candidate list around the ring"""
        self.successor.receive_election([self.node_id])

    def receive_election(self, candidates: list):
        """Handle election message"""
        if self.node_id in candidates:
            # Message came all the way around - highest id wins
            self.leader = max(candidates)
            self.successor.receive_coordinator(self.leader)
        else:
            # Add self and forward
            candidates.append(self.node_id)
            self.successor.receive_election(candidates)

    def receive_coordinator(self, leader: int):
        """Learn the election result and pass it around the ring"""
        if self.leader != leader:
            self.leader = leader
            self.successor.receive_coordinator(leader)

5. Coordination Services

ZooKeeper

A coordination service providing:

  • Distributed configuration
  • Naming registry
  • Group membership
  • Leader election
  • Distributed locks

# ZooKeeper-style primitives

class ZKClient:
    """Simplified ZooKeeper client"""

    def __init__(self, servers):
        self.servers = servers

    def create(self, path: str, data: bytes, ephemeral=False, sequential=False):
        """Create a znode; returns the created path (stub)"""
        pass

    def get(self, path: str) -> bytes:
        """Get znode data"""
        pass

    def set(self, path: str, data: bytes, version: int):
        """Set znode data (with optimistic locking)"""
        pass

    def delete(self, path: str, version: int):
        """Delete znode"""
        pass

    def exists(self, path: str, watch=None) -> bool:
        """Check if znode exists"""
        pass

    def get_children(self, path: str, watch=None) -> list:
        """Get child znodes"""
        pass

# Leader election using ZooKeeper
def leader_election(zk: ZKClient, path: str, node_id: str):
    """Simple leader election"""
    # Create ephemeral sequential node
    my_path = zk.create(
        f"{path}/candidate_",
        node_id.encode(),
        ephemeral=True,
        sequential=True
    )

    while True:
        children = sorted(zk.get_children(path))
        my_name = my_path.split("/")[-1]

        if children[0] == my_name:
            # I am leader!
            return True

        # Watch the node before me
        my_index = children.index(my_name)
        watch_path = f"{path}/{children[my_index - 1]}"

        if not zk.exists(watch_path, watch=lambda: None):
            # Predecessor already gone - re-check the children list
            continue

        # Block until the watch fires (wait_for_watch is a placeholder
        # for the client's notification mechanism, not defined here)
        wait_for_watch()

# Distributed lock using ZooKeeper
class ZKLock:
    def __init__(self, zk: ZKClient, path: str):
        self.zk = zk
        self.path = path
        self.lock_node = None

    def acquire(self):
        """Acquire the lock"""
        self.lock_node = self.zk.create(
            f"{self.path}/lock_",
            b"",
            ephemeral=True,
            sequential=True
        )

        while True:
            children = sorted(self.zk.get_children(self.path))
            my_name = self.lock_node.split("/")[-1]

            if children[0] == my_name:
                return  # Lock acquired

            # Watch predecessor, then block until the watch fires
            # (wait_for_watch is a placeholder for the client's
            # notification mechanism, not defined here)
            my_index = children.index(my_name)
            watch_path = f"{self.path}/{children[my_index - 1]}"
            self.zk.exists(watch_path, watch=lambda: None)
            wait_for_watch()

    def release(self):
        """Release the lock"""
        if self.lock_node:
            self.zk.delete(self.lock_node, -1)
            self.lock_node = None
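Since ZKClient above is only an interface, a small in-memory stand-in (the `FakeZK` class is mine, not a real ZooKeeper API) makes the lock recipe's key property concrete: sequential znode names impose a total order, so clients are granted the lock strictly in arrival order.

```python
import itertools

class FakeZK:
    """In-memory stand-in for sequential znode creation and deletion."""

    def __init__(self):
        self.nodes: dict = {}           # path -> data
        self.seq = itertools.count()

    def create_sequential(self, prefix: str, data: bytes) -> str:
        """Create a znode whose name ends in a monotonically increasing counter."""
        path = f"{prefix}{next(self.seq):010d}"
        self.nodes[path] = data
        return path

    def get_children(self, parent: str) -> list:
        """Return sorted child names of `parent` (name only, no path)."""
        plen = len(parent) + 1
        return sorted(p[plen:] for p in self.nodes if p.startswith(parent + "/"))

    def delete(self, path: str):
        del self.nodes[path]

def lock_holder(zk: FakeZK, parent: str) -> str:
    """The lock holder is the client whose znode sorts first."""
    children = zk.get_children(parent)
    return zk.nodes[f"{parent}/{children[0]}"].decode()

zk = FakeZK()
a = zk.create_sequential("/locks/lock_", b"client-A")
b = zk.create_sequential("/locks/lock_", b"client-B")
print(lock_holder(zk, "/locks"))  # client-A holds the lock
zk.delete(a)                      # client-A releases
print(lock_holder(zk, "/locks"))  # client-B is next in line
```

Watching only the immediate predecessor (as ZKLock does) rather than the whole directory avoids a "thundering herd": each release wakes exactly one waiter.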

Exercises

Basic

  1. Explain why Paxos needs a majority quorum.

  2. Draw the message flow for a Raft leader election.

  3. Implement a simple distributed lock using a coordination service.

Intermediate

  1. Implement Multi-Paxos with log replication.

  2. Add membership changes to a Raft implementation.

  3. Implement a distributed configuration service.

Advanced

  1. Implement a Byzantine fault-tolerant consensus protocol (PBFT).

  2. Analyze the performance of Raft vs Paxos under different failure scenarios.

  3. Design a geo-distributed consensus system.


Summary

  • Consensus is about getting distributed nodes to agree
  • Paxos is the theoretical foundation; Raft is more practical
  • Leader election simplifies many distributed protocols
  • Coordination services (ZooKeeper, etcd) provide primitives for building distributed systems

Next Reading

Replication and Consistency →