Consensus and Coordination
Introduction
Consensus, the problem of getting multiple nodes to agree on a single value, is a fundamental problem in distributed systems. This reading covers the consensus protocols Paxos and Raft, leader election, and coordination services.
Learning Objectives
By the end of this reading, you will be able to:
- Understand the consensus problem and FLP impossibility
- Explain the Paxos protocol
- Implement and understand Raft
- Use coordination services effectively
- Apply consensus to practical problems
1. The Consensus Problem
Definition
Consensus requires nodes to agree on a single value:
- Agreement: All correct nodes decide the same value
- Validity: If all nodes propose v, then decision is v
- Termination: All correct nodes eventually decide
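These properties translate directly into executable checks. A minimal sketch (the function names are illustrative, not from any library):

```python
def check_agreement(decisions: dict) -> bool:
    """Agreement: all nodes that decided chose the same value."""
    decided = [v for v in decisions.values() if v is not None]
    return len(set(decided)) <= 1

def check_validity(proposals: dict, decisions: dict) -> bool:
    """Validity: if every node proposed v, any decision must be v."""
    proposed = set(proposals.values())
    if len(proposed) == 1:
        (v,) = proposed
        return all(d in (None, v) for d in decisions.values())
    return True  # mixed proposals: any proposed value is acceptable

# Three nodes propose "x"; two have decided, one has not yet
proposals = {"n1": "x", "n2": "x", "n3": "x"}
decisions = {"n1": "x", "n2": "x", "n3": None}
print(check_agreement(decisions))            # True
print(check_validity(proposals, decisions))  # True
```

Note that Termination cannot be checked from a snapshot like this; it is a liveness property about every execution eventually deciding.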
FLP Impossibility
Fischer, Lynch, and Paterson (1985) proved that no deterministic protocol can guarantee consensus in an asynchronous system if even one process may crash.
Implications:
- No deterministic algorithm can guarantee termination in a fully asynchronous system
- Real systems assume partial synchrony and use timeouts to detect suspected failures
- Alternatively, randomized protocols achieve termination with probability 1
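The timeout escape hatch can be made concrete with a toy heartbeat-based failure detector. This is a hedged sketch (class and method names are illustrative): note that a slow-but-alive peer gets suspected anyway, which is exactly the uncertainty FLP formalizes.

```python
import time
from typing import Dict, Optional

class HeartbeatDetector:
    """Suspect a peer if no heartbeat arrives within `timeout` seconds.

    Under asynchrony this can be wrong: a slow-but-alive peer is
    suspected, so the detector is only eventually accurate at best.
    """
    def __init__(self, timeout: float = 0.5):
        self.timeout = timeout
        self.last_seen: Dict[str, float] = {}

    def heartbeat(self, peer: str, now: Optional[float] = None):
        """Record a heartbeat from `peer` (injectable clock for testing)."""
        self.last_seen[peer] = time.monotonic() if now is None else now

    def suspected(self, peer: str, now: Optional[float] = None) -> bool:
        """True if `peer` has been silent longer than the timeout."""
        now = time.monotonic() if now is None else now
        last = self.last_seen.get(peer)
        return last is None or now - last > self.timeout

detector = HeartbeatDetector(timeout=0.5)
detector.heartbeat("node-2", now=100.0)
print(detector.suspected("node-2", now=100.3))  # False: within timeout
print(detector.suspected("node-2", now=100.9))  # True: suspected (possibly wrongly)
```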
2. Paxos
Overview
Paxos is the foundational consensus protocol (Lamport, 1998).
Roles:
- Proposers: Propose values
- Acceptors: Vote on proposals
- Learners: Learn decided value
Phases:
- Prepare (Phase 1): Proposer asks acceptors to promise to ignore lower-numbered proposals
- Accept (Phase 2): Proposer sends a value; acceptors accept unless they have promised a higher number
- Learn: Acceptors inform learners of the decided value
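Safety rests on a simple fact: any two majority quorums intersect, so at least one acceptor that promises in Phase 1 also knows about any previously accepted value. A quick self-contained check of that fact:

```python
from itertools import combinations

def majorities(nodes):
    """All majority-sized subsets of a node set."""
    q = len(nodes) // 2 + 1
    return [set(c) for c in combinations(nodes, q)]

nodes = ["A0", "A1", "A2", "A3", "A4"]
quorums = majorities(nodes)

# Every pair of majorities shares at least one acceptor.
all_intersect = all(a & b for a, b in combinations(quorums, 2))
print(len(quorums), all_intersect)  # 10 majorities of size 3; all pairs intersect
```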
Single-Decree Paxos
```python
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class Proposal:
    number: int                   # Proposal number (must be unique)
    value: Optional[str] = None

class Acceptor:
    """Paxos acceptor"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.promised: Optional[int] = None       # Highest promised proposal
        self.accepted: Optional[Proposal] = None  # Highest accepted proposal

    def prepare(self, n: int) -> Tuple[bool, Optional[Proposal]]:
        """
        Phase 1b: Respond to prepare request
        Returns (promise, previously_accepted)
        """
        if self.promised is None or n > self.promised:
            self.promised = n
            return True, self.accepted
        return False, None

    def accept(self, proposal: Proposal) -> bool:
        """
        Phase 2b: Accept proposal if allowed
        Returns whether accepted
        """
        if self.promised is None or proposal.number >= self.promised:
            self.promised = proposal.number
            self.accepted = proposal
            return True
        return False

class Proposer:
    """Paxos proposer"""

    def __init__(self, node_id: str, acceptors: list):
        self.node_id = node_id
        self.acceptors = acceptors
        self.proposal_number = 0

    def next_proposal_number(self) -> int:
        """Generate unique proposal number"""
        # In practice, combine node_id with a counter
        self.proposal_number += 1
        return self.proposal_number * 100 + hash(self.node_id) % 100

    def propose(self, value: str) -> Optional[str]:
        """Run Paxos to propose a value"""
        n = self.next_proposal_number()

        # Phase 1: Prepare
        promises = []
        for acceptor in self.acceptors:
            success, prev_accepted = acceptor.prepare(n)
            if success:
                promises.append((acceptor, prev_accepted))

        # Need a majority of promises
        if len(promises) <= len(self.acceptors) // 2:
            return None  # Failed to get majority

        # Choose value: use highest previously accepted, or our own
        max_accepted = None
        for _, prev in promises:
            if prev is not None:
                if max_accepted is None or prev.number > max_accepted.number:
                    max_accepted = prev
        if max_accepted is not None:
            value = max_accepted.value  # Must use previously accepted value!

        proposal = Proposal(n, value)

        # Phase 2: Accept
        accepts = 0
        for acceptor, _ in promises:
            if acceptor.accept(proposal):
                accepts += 1

        # Need a majority of accepts
        if accepts > len(self.acceptors) // 2:
            return value  # Consensus reached!
        return None

# Example
acceptors = [Acceptor(f"A{i}") for i in range(5)]
proposer1 = Proposer("P1", acceptors)
proposer2 = Proposer("P2", acceptors)

# Two proposals in sequence (a real run would interleave over the network)
result1 = proposer1.propose("value_A")
result2 = proposer2.propose("value_B")
print(f"Proposer 1 result: {result1}")
print(f"Proposer 2 result: {result2}")
# They should agree (though one might fail and retry)
```
Multi-Paxos
For a sequence of decisions (log replication):
- Elect a stable leader
- Leader proposes values for consecutive slots
- Skip Phase 1 for most operations
- Handle leader failures with new election
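The fast path above can be sketched by reusing the single-decree roles: the leader runs Phase 1 once for its leadership term, then issues Phase 2 directly for consecutive log slots. This is a hedged sketch (`SlotAcceptor` and `MultiPaxosLeader` are illustrative names); failure handling and leader changes are omitted.

```python
class SlotAcceptor:
    """Acceptor tracking one promised number and accepted values per slot."""
    def __init__(self):
        self.promised = 0
        self.accepted = {}  # slot -> (proposal number, value)

    def prepare(self, n):
        if n > self.promised:
            self.promised = n
            return True
        return False

    def accept(self, slot, n, value):
        if n >= self.promised:
            self.promised = n
            self.accepted[slot] = (n, value)
            return True
        return False

class MultiPaxosLeader:
    def __init__(self, n, acceptors):
        self.n = n                 # leadership proposal number
        self.acceptors = acceptors
        self.next_slot = 1
        self.chosen = {}           # slot -> chosen value

    def elect(self):
        """Phase 1, run once per leadership term: majority must promise."""
        promises = sum(a.prepare(self.n) for a in self.acceptors)
        return promises > len(self.acceptors) // 2

    def propose(self, value):
        """Phase 2 only: assign the value to the next free slot."""
        slot = self.next_slot
        accepts = sum(a.accept(slot, self.n, value) for a in self.acceptors)
        if accepts > len(self.acceptors) // 2:
            self.chosen[slot] = value
            self.next_slot += 1
            return slot
        return None

acceptors = [SlotAcceptor() for _ in range(5)]
leader = MultiPaxosLeader(n=1, acceptors=acceptors)
print(leader.elect())           # True: majority promised
print(leader.propose("set x"))  # slot 1, chosen without another Phase 1
print(leader.propose("set y"))  # slot 2
```

The payoff is one round trip per decision instead of two, which is why production systems (and Raft, which bakes this in) run with a stable leader.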
3. Raft
Overview
Raft (Ongaro and Ousterhout, 2014) is designed to be understandable. It separates consensus into:
- Leader Election: Choose a leader
- Log Replication: Leader replicates commands
- Safety: Ensure committed entries aren't lost
Raft Implementation
```python
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Optional, Dict
import random

class NodeState(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"

@dataclass
class LogEntry:
    term: int
    command: str
    index: int

@dataclass
class RaftNode:
    """Simplified Raft node"""
    node_id: str
    peers: List[str]
    # Persistent state
    current_term: int = 0
    voted_for: Optional[str] = None
    log: List[LogEntry] = field(default_factory=list)
    # Volatile state
    commit_index: int = 0
    last_applied: int = 0
    # Leader state
    next_index: Dict[str, int] = field(default_factory=dict)
    match_index: Dict[str, int] = field(default_factory=dict)
    state: NodeState = NodeState.FOLLOWER
    leader_id: Optional[str] = None

    def election_timeout(self) -> float:
        """Randomized timeout (seconds) so candidates rarely collide."""
        return random.uniform(0.15, 0.30)

    def start_election(self):
        """Become candidate and request votes"""
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        votes = 1  # Vote for self

        last_log_index = len(self.log)
        last_log_term = self.log[-1].term if self.log else 0

        for peer in self.peers:
            if self.send_request_vote(peer, self.current_term, self.node_id,
                                      last_log_index, last_log_term):
                votes += 1

        if votes > (len(self.peers) + 1) // 2:
            self.become_leader()
        else:
            # Real Raft waits out the election timeout before retrying
            self.state = NodeState.FOLLOWER

    def send_request_vote(self, peer, term, candidate_id,
                          last_log_index, last_log_term) -> bool:
        """Stub: a real implementation sends a RequestVote RPC to `peer`
        and returns whether the vote was granted."""
        return False

    def become_leader(self):
        """Initialize leader state"""
        self.state = NodeState.LEADER
        self.leader_id = self.node_id
        # Initialize next_index and match_index
        next_idx = len(self.log) + 1
        for peer in self.peers:
            self.next_index[peer] = next_idx
            self.match_index[peer] = 0
        # Send initial heartbeats
        self.send_heartbeats()

    def request_vote(self, term, candidate_id, last_log_index, last_log_term) -> bool:
        """Receiver side of the RequestVote RPC"""
        # Reject if term is stale
        if term < self.current_term:
            return False
        # Update term if needed
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
            self.state = NodeState.FOLLOWER
        # Check if we can vote
        if self.voted_for is None or self.voted_for == candidate_id:
            # Check if candidate's log is at least as up-to-date as ours
            my_last_term = self.log[-1].term if self.log else 0
            my_last_index = len(self.log)
            if (last_log_term > my_last_term or
                    (last_log_term == my_last_term and last_log_index >= my_last_index)):
                self.voted_for = candidate_id
                return True
        return False

    def append_entries(self, term, leader_id, prev_log_index, prev_log_term,
                       entries: List[LogEntry], leader_commit) -> bool:
        """Receiver side of the AppendEntries RPC"""
        # Reject if term is stale
        if term < self.current_term:
            return False
        # A valid AppendEntries means the sender is the current leader
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
        self.state = NodeState.FOLLOWER
        self.leader_id = leader_id
        # Check log consistency at prev_log_index
        if prev_log_index > 0:
            if len(self.log) < prev_log_index:
                return False
            if self.log[prev_log_index - 1].term != prev_log_term:
                return False
        # Append new entries
        idx = prev_log_index
        for entry in entries:
            idx += 1
            if len(self.log) >= idx:
                if self.log[idx - 1].term != entry.term:
                    # Conflict - truncate and append
                    self.log = self.log[:idx - 1]
                    self.log.append(entry)
            else:
                self.log.append(entry)
        # Update commit index
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, len(self.log))
        return True

    def send_heartbeats(self):
        """Leader sends AppendEntries (heartbeats) to all followers"""
        for peer in self.peers:
            prev_log_index = self.next_index[peer] - 1
            prev_log_term = (self.log[prev_log_index - 1].term
                             if prev_log_index > 0 else 0)
            # Entries the follower is missing
            entries = self.log[prev_log_index:]
            success = self.append_entries_to_peer(
                peer, self.current_term, self.node_id,
                prev_log_index, prev_log_term, entries, self.commit_index
            )
            if success:
                self.next_index[peer] = len(self.log) + 1
                self.match_index[peer] = len(self.log)
            else:
                # Decrement next_index and retry later
                self.next_index[peer] = max(1, self.next_index[peer] - 1)

    def append_entries_to_peer(self, peer, term, leader_id, prev_log_index,
                               prev_log_term, entries, leader_commit) -> bool:
        """Stub: a real implementation sends an AppendEntries RPC to `peer`."""
        return False

    def client_request(self, command: str) -> bool:
        """Handle client request (leader only)"""
        if self.state != NodeState.LEADER:
            return False
        # Append to local log
        entry = LogEntry(term=self.current_term, command=command,
                         index=len(self.log) + 1)
        self.log.append(entry)
        # Replicate to followers
        self.send_heartbeats()
        # Check if committed
        return self.try_commit()

    def try_commit(self) -> bool:
        """Advance commit_index to the highest majority-replicated entry"""
        for n in range(len(self.log), self.commit_index, -1):
            # Only entries from the current term are committed by counting
            if self.log[n - 1].term != self.current_term:
                continue
            replication_count = 1  # Leader has it
            for peer in self.peers:
                if self.match_index.get(peer, 0) >= n:
                    replication_count += 1
            if replication_count > (len(self.peers) + 1) // 2:
                self.commit_index = n
                return True
        return False
```
Raft Guarantees
- Election Safety: At most one leader per term
- Leader Append-Only: Leader never overwrites log entries
- Log Matching: If two logs have entry with same index/term, all previous entries are identical
- Leader Completeness: Committed entries appear in future leaders' logs
- State Machine Safety: Same commands applied in same order
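The Log Matching property above is mechanically checkable: if two logs hold an entry with the same index and term, they must be identical up through that index. A small self-contained sketch (logs represented as lists of terms per index; names are illustrative):

```python
def logs_consistent(log_a, log_b):
    """Check the Log Matching property on two logs.

    Each log is a list of terms: position i holds the term of the entry
    at index i + 1. If the logs agree on (index, term) at any position,
    they must agree on every earlier position too.
    """
    for i in range(min(len(log_a), len(log_b)) - 1, -1, -1):
        if log_a[i] == log_b[i]:
            # Same (index, term): all preceding entries must match
            return log_a[:i] == log_b[:i]
    return True  # no shared (index, term) pair constrains the prefixes

leader   = [1, 1, 2, 3]  # terms of entries at indices 1..4
follower = [1, 1, 2]     # a prefix of the leader's log
corrupt  = [1, 2, 3, 3]  # same entry at index 4, different history

print(logs_consistent(leader, follower))  # True
print(logs_consistent(leader, corrupt))   # False: Raft's invariant rules this out
```

Raft maintains this invariant via the `prev_log_index`/`prev_log_term` consistency check in AppendEntries, which rejects appends whose history diverges.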
4. Leader Election
Bully Algorithm
Simple leader election for synchronous systems:
```python
class BullyNode:
    """Node using the Bully algorithm"""

    def __init__(self, node_id: int, all_nodes: list):
        self.node_id = node_id
        self.all_nodes = all_nodes
        self.leader = None
        self.alive = True

    def start_election(self):
        """Start an election"""
        higher_nodes = [n for n in self.all_nodes if n > self.node_id]
        if not higher_nodes:
            # I am the highest - become leader
            self.become_leader()
            return

        # Send election messages to higher nodes
        responses = []
        for node in higher_nodes:
            if self.send_election(node):
                responses.append(node)

        if not responses:
            # No higher node responded - become leader
            self.become_leader()
        # else: wait for a coordinator message from a higher node

    def become_leader(self):
        """Announce leadership"""
        self.leader = self.node_id
        for node in self.all_nodes:
            if node != self.node_id:
                self.send_coordinator(node)

    def send_election(self, node: int) -> bool:
        """Stub: a real implementation sends an ELECTION message and
        reports whether `node` answered before a timeout."""
        return False

    def send_coordinator(self, node: int):
        """Stub: a real implementation sends a COORDINATOR message."""
```
Ring-Based Election
Nodes arranged in a logical ring:
```python
class RingNode:
    """Node in a ring-based election"""

    def __init__(self, node_id: int, successor: 'RingNode' = None):
        self.node_id = node_id
        self.successor = successor  # set after all nodes exist, closing the ring
        self.leader = None

    def start_election(self):
        """Start an election by sending a message around the ring"""
        self.successor.receive_election([self.node_id])

    def receive_election(self, candidates: list):
        """Handle an election message"""
        if self.node_id in candidates:
            # Message came full circle - highest ID wins
            self.leader = max(candidates)
            self.successor.receive_coordinator(self.leader, self.node_id)
        else:
            # Add self and forward
            candidates.append(self.node_id)
            self.successor.receive_election(candidates)

    def receive_coordinator(self, leader: int, origin: int):
        """Record the leader and forward the announcement around the ring"""
        self.leader = leader
        if self.node_id != origin:
            self.successor.receive_coordinator(leader, origin)
```
5. Coordination Services
ZooKeeper
A coordination service providing:
- Distributed configuration
- Naming registry
- Group membership
- Leader election
- Distributed locks
```python
# ZooKeeper-style primitives (method bodies are placeholders)
class ZKClient:
    """Simplified ZooKeeper client"""

    def __init__(self, servers):
        self.servers = servers

    def create(self, path: str, data: bytes, ephemeral=False, sequential=False):
        """Create a znode; returns the created path
        (with a sequence suffix if sequential=True)"""

    def get(self, path: str) -> bytes:
        """Get znode data"""

    def set(self, path: str, data: bytes, version: int):
        """Set znode data (with optimistic locking)"""

    def delete(self, path: str, version: int):
        """Delete znode"""

    def exists(self, path: str, watch=None) -> bool:
        """Check if znode exists, optionally registering a watch"""

    def get_children(self, path: str, watch=None) -> list:
        """Get child znodes"""

def wait_for_watch():
    """Stub: block until a previously registered watch fires."""

# Leader election using ZooKeeper
def leader_election(zk: ZKClient, path: str, node_id: str):
    """Simple leader election"""
    # Create an ephemeral sequential node
    my_path = zk.create(
        f"{path}/candidate_",
        node_id.encode(),
        ephemeral=True,
        sequential=True
    )
    while True:
        children = sorted(zk.get_children(path))
        my_name = my_path.split("/")[-1]
        if children[0] == my_name:
            # I am leader!
            return True
        # Watch the node just before mine
        my_index = children.index(my_name)
        watch_path = f"{path}/{children[my_index - 1]}"
        if not zk.exists(watch_path, watch=lambda: None):
            # Predecessor already gone - re-check the children
            continue
        # Wait for the watch to fire
        wait_for_watch()

# Distributed lock using ZooKeeper
class ZKLock:
    def __init__(self, zk: ZKClient, path: str):
        self.zk = zk
        self.path = path
        self.lock_node = None

    def acquire(self):
        """Acquire the lock"""
        self.lock_node = self.zk.create(
            f"{self.path}/lock_",
            b"",
            ephemeral=True,
            sequential=True
        )
        while True:
            children = sorted(self.zk.get_children(self.path))
            my_name = self.lock_node.split("/")[-1]
            if children[0] == my_name:
                return  # Lock acquired
            # Watch predecessor
            my_index = children.index(my_name)
            watch_path = f"{self.path}/{children[my_index - 1]}"
            self.zk.exists(watch_path, watch=lambda: None)
            wait_for_watch()

    def release(self):
        """Release the lock"""
        if self.lock_node:
            self.zk.delete(self.lock_node, -1)  # -1 means any version
            self.lock_node = None
```
Exercises
Basic
1. Explain why Paxos needs a majority quorum.
2. Draw the message flow for a Raft leader election.
3. Implement a simple distributed lock using a coordination service.
Intermediate
1. Implement Multi-Paxos with log replication.
2. Add membership changes to a Raft implementation.
3. Implement a distributed configuration service.
Advanced
1. Implement a Byzantine fault-tolerant consensus protocol (PBFT).
2. Analyze the performance of Raft vs. Paxos under different failure scenarios.
3. Design a geo-distributed consensus system.
Summary
- Consensus is about getting distributed nodes to agree
- Paxos is the theoretical foundation; Raft is more practical
- Leader election simplifies many distributed protocols
- Coordination services (ZooKeeper, etcd) provide primitives for building distributed systems