Distributed Systems Fundamentals
Introduction
A distributed system is a collection of independent computers that appears to users as a single coherent system. These systems are essential for building scalable, reliable, and available applications. This reading covers core concepts, challenges, and models.
Learning Objectives
By the end of this reading, you will be able to:
- Define distributed systems and their characteristics
- Understand the challenges unique to distributed computing
- Reason about timing and ordering in distributed systems
- Apply the CAP theorem to system design decisions
- Understand different consistency models
1. What is a Distributed System?
Definition
A distributed system is a system in which multiple autonomous computers:
- Are connected by a network
- Coordinate to achieve a common goal
- Appear as a single system to users
Why Distributed Systems?
- Scalability: Handle more load by adding machines
- Reliability: No single point of failure
- Performance: Process data closer to users
- Cost: Commodity hardware instead of supercomputers
- Geography: Serve global users with low latency
Examples
- Web applications (Google, Facebook, Amazon)
- Cloud storage (S3, Dropbox, Google Drive)
- Distributed databases (Cassandra, MongoDB, CockroachDB)
- Message queues (Kafka, RabbitMQ)
- Blockchain networks
2. Challenges in Distributed Systems
The Eight Fallacies of Distributed Computing
- The network is reliable - Networks fail, packets get lost
- Latency is zero - Communication takes time
- Bandwidth is infinite - Network capacity is limited
- The network is secure - Security must be designed in
- Topology doesn't change - Networks are dynamic
- There is one administrator - Multiple parties manage infrastructure
- Transport cost is zero - Moving data costs time and money
- The network is homogeneous - Various devices and protocols exist
Partial Failures
Unlike local systems that either work or fail completely, distributed systems experience partial failures:
# In a distributed system, calls can:
# 1. Succeed
# 2. Fail (and you know it)
# 3. Timeout (you don't know what happened)
def call_remote_service(request):
    try:
        response = http_client.post(service_url, request, timeout=5)
        return response  # Success
    except ConnectionError:
        # Service is down - we know it failed
        return handle_failure()
    except Timeout:
        # Did the request succeed? We don't know!
        # The request might have:
        # - Never reached the server
        # - Reached server but server crashed before processing
        # - Been processed but response was lost
        # - Succeeded but network is just slow
        return handle_uncertainty()
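Because a timeout leaves the outcome unknown, a common mitigation is to make requests idempotent so they can be retried safely. A minimal sketch, assuming a hypothetical server that deduplicates requests by a client-supplied key (the names here are illustrative, not from a real library):

```python
import uuid

class IdempotentClient:
    """Retry safely by tagging each logical request with a unique key.
    The (hypothetical) server deduplicates on this key, so retrying a
    request that actually succeeded is harmless."""
    def __init__(self, transport, max_retries=3):
        self.transport = transport  # assumed to raise TimeoutError on timeout
        self.max_retries = max_retries

    def call(self, payload):
        key = str(uuid.uuid4())  # same key reused for every retry
        for attempt in range(self.max_retries):
            try:
                return self.transport(key, payload)
            except TimeoutError:
                continue  # safe: server ignores duplicate keys
        raise TimeoutError("all retries timed out")
```

With this pattern, the "been processed but response was lost" case becomes harmless: the retry hits the server's duplicate check and simply returns the original result.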
The Two Generals Problem
Two generals must coordinate an attack:
- They can only communicate via messengers
- Messengers can be captured (lost messages)
- How do they agree on attack time?
Impossibility result: No protocol can guarantee agreement over unreliable communication.
# General A sends: "Attack at dawn"
# General B receives, sends: "Confirmed"
# But General B doesn't know if A received confirmation
# So B sends: "Did you receive my confirmation?"
# This goes on forever...
def attempt_agreement():
    """This can never provide certainty"""
    send("Attack at dawn")
    while True:
        ack = wait_for_ack(timeout=10)
        if ack:
            send("Received your ack")  # But did they get THIS message?
        else:
            # Did they not send ack, or did it get lost?
            send("Attack at dawn")  # Retry
Byzantine Failures
Nodes can fail in arbitrary ways (not just crash):
- Send incorrect data
- Behave maliciously
- Act inconsistently
Byzantine fault tolerance (BFT) requires 3f+1 nodes to tolerate f Byzantine failures.
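The 3f+1 bound is simple arithmetic, and it is worth internalizing both directions: how many nodes you need for a target f, and how many Byzantine nodes a given cluster can survive. A small sketch:

```python
def min_nodes_for_bft(f: int) -> int:
    """Minimum cluster size to tolerate f Byzantine nodes"""
    return 3 * f + 1

def max_byzantine_tolerated(n: int) -> int:
    """Largest f such that n >= 3f + 1"""
    return (n - 1) // 3
```

For example, a 4-node cluster tolerates one Byzantine node, but growing to 6 nodes buys nothing extra: you need 7 nodes to tolerate two.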
3. Time and Ordering
The Problem with Time
In distributed systems, there's no global clock. Each node has its own clock that can:
- Drift from real time
- Jump forward or backward (NTP adjustments)
- Have different offsets from other nodes
# Even "synchronized" clocks can disagree
# Node A thinks it's 12:00:00.000
# Node B thinks it's 12:00:00.050
# 50ms difference can cause ordering issues!
# Example problem:
# Node A: write x=1 at 12:00:00.000
# Node B: write x=2 at 12:00:00.050
# Later reader: which write is "latest"?
# Depends on which clock you trust!
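The danger becomes concrete with a last-write-wins (LWW) resolver: if replicas trust wall-clock timestamps, a fast clock lets an older write overwrite a newer one. A toy illustration (the timestamps and 100ms skew are made up):

```python
def lww_merge(a, b):
    """Last-write-wins: keep the write with the larger wall-clock timestamp.
    Each write is a (value, timestamp_ms) pair."""
    return a if a[1] >= b[1] else b

# Real order: x=1 happened first, then x=2.
# But node A's clock runs 100 ms fast:
write_1 = ("x=1", 12_000_100)  # node A (fast clock), actually earlier
write_2 = ("x=2", 12_000_050)  # node B, actually later

# LWW keeps the stale write x=1, silently dropping x=2
assert lww_merge(write_1, write_2)[0] == "x=1"
```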
Lamport Clocks
Logical clocks provide ordering without real time.
class LamportClock:
    """Lamport logical clock"""
    def __init__(self):
        self.time = 0
    def tick(self) -> int:
        """Increment clock for local event"""
        self.time += 1
        return self.time
    def send(self) -> int:
        """Get timestamp for sending message"""
        self.time += 1
        return self.time
    def receive(self, msg_time: int):
        """Update clock on receiving message"""
        self.time = max(self.time, msg_time) + 1
# Usage
clock_a = LamportClock()
clock_b = LamportClock()
# Node A does local work
clock_a.tick() # time = 1
# Node A sends to B
send_time = clock_a.send() # time = 2
# ... message in transit ...
clock_b.receive(send_time) # B's time = max(0, 2) + 1 = 3
# Node B does work
clock_b.tick() # time = 4
Property: If event A happened before event B, then Lamport(A) < Lamport(B). Limitation: the converse does not hold - Lamport(A) < Lamport(B) does not mean A happened before B; the two events may be concurrent.
Vector Clocks
Vector clocks capture the complete causal history.
from typing import Dict

class VectorClock:
    """Vector clock for detecting causality"""
    def __init__(self, node_id: str, nodes: list):
        self.node_id = node_id
        self.clock: Dict[str, int] = {n: 0 for n in nodes}
    def tick(self):
        """Increment own component"""
        self.clock[self.node_id] += 1
    def send(self) -> Dict[str, int]:
        """Get vector for sending"""
        self.tick()
        return self.clock.copy()
    def receive(self, other_clock: Dict[str, int]):
        """Merge received vector clock"""
        for node, time in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), time)
        self.tick()
    def happened_before(self, other: 'VectorClock') -> bool:
        """Check if self happened before other"""
        # A < B iff all components A[i] <= B[i] and at least one A[i] < B[i]
        all_leq = all(self.clock[n] <= other.clock[n] for n in self.clock)
        some_lt = any(self.clock[n] < other.clock[n] for n in self.clock)
        return all_leq and some_lt
    def concurrent_with(self, other: 'VectorClock') -> bool:
        """Check if self and other are concurrent (incomparable)"""
        return not self.happened_before(other) and not other.happened_before(self)
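The comparisons above reduce to component-wise checks on the vectors. This standalone helper (separate from the class so the rules stand on their own) shows the same logic applied directly to raw dicts:

```python
def happened_before(v1: dict, v2: dict) -> bool:
    """v1 < v2 iff every component of v1 is <= v2's and at least one is <"""
    keys = set(v1) | set(v2)
    leq = all(v1.get(k, 0) <= v2.get(k, 0) for k in keys)
    lt = any(v1.get(k, 0) < v2.get(k, 0) for k in keys)
    return leq and lt

def concurrent(v1: dict, v2: dict) -> bool:
    """Concurrent iff neither vector dominates the other"""
    return not happened_before(v1, v2) and not happened_before(v2, v1)

# {"A": 2, "B": 0} and {"A": 1, "B": 1} are concurrent:
# A's component is larger in the first, B's in the second
```

This is exactly what a Lamport clock cannot tell you: with vectors, "neither dominates" is a detectable state rather than an artificial total order.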
4. The CAP Theorem
Statement
In a distributed system, you can guarantee at most 2 of these 3 properties:
- Consistency: Every read receives the most recent write (linearizability)
- Availability: Every request receives a non-error response
- Partition Tolerance: The system keeps working despite network partitions
Since network partitions are inevitable, you must choose between:
- CP: Consistent and Partition-tolerant (sacrifice availability)
- AP: Available and Partition-tolerant (sacrifice consistency)
Example
# Scenario: Two nodes, A and B, with network partition
# User writes x=1 to A, then reads from B
# CP System (e.g., traditional RDBMS with 2PC):
# During partition, writes are rejected or blocked
# Read from B returns error or waits
# Consistency maintained, but availability sacrificed
# AP System (e.g., Cassandra with eventual consistency):
# Write to A succeeds
# Read from B returns stale value
# Availability maintained, but consistency sacrificed
class CPSystem:
    """CP: Reject operations during partition"""
    def write(self, key, value):
        if self.can_reach_quorum():
            self.commit_to_all(key, value)
            return "OK"
        else:
            raise PartitionError("Cannot reach quorum")
    def read(self, key):
        if self.can_reach_quorum():
            return self.read_from_majority(key)
        else:
            raise PartitionError("Cannot reach quorum")

class APSystem:
    """AP: Accept operations during partition"""
    def write(self, key, value):
        self.local_write(key, value)
        self.async_replicate(key, value)  # Best effort
        return "OK"
    def read(self, key):
        return self.local_read(key)  # Might be stale
CAP in Practice
Real systems make nuanced tradeoffs:
- Cassandra: Tunable consistency (AP by default)
- ZooKeeper: CP (sacrifices availability for consistency)
- DynamoDB: Per-request choice between eventually consistent and strongly consistent reads
- Spanner: CP, engineered for very high availability (TrueTime clocks plus global consensus)
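Tunable consistency is usually expressed as a quorum condition: with N replicas, a read quorum of R, and a write quorum of W, choosing R + W > N guarantees that every read quorum overlaps every write quorum, so a read always contacts at least one replica holding the latest acknowledged write. A sketch of the check (the N=3 configurations in the comments mirror Cassandra-style level names):

```python
def read_is_strong(n: int, r: int, w: int) -> bool:
    """R + W > N guarantees every read quorum overlaps every write quorum"""
    return r + w > n

# Common configurations for N = 3 replicas:
#   QUORUM reads + QUORUM writes (R=2, W=2): strong, since 2 + 2 > 3
#   ONE read + ONE write (R=1, W=1): eventual, since 1 + 1 <= 3
#   ONE read + ALL writes (R=1, W=3): strong reads, but writes lose availability
```

The tradeoff is visible in the last line: pushing W up buys cheaper strong reads at the cost of write availability, and vice versa.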
5. Consistency Models
Strong Consistency
All nodes see the same data at the same time.
# Linearizability: Strongest consistency
# Operations appear to happen instantly at some point
# Write x=1 completes at time T1
# Any read starting after T1 must return 1 (or later value)
class Linearizable:
    def write(self, key, value):
        # Must propagate to all nodes before returning
        self.sync_write_to_all(key, value)
    def read(self, key):
        # Must get latest value
        return self.sync_read_from_majority(key)
Eventual Consistency
If no new updates are made, all replicas eventually converge to the same value.
class EventuallyConsistent:
    """Eventually consistent store"""
    def __init__(self):
        self.local_data = {}
        self.pending_updates = []
    def write(self, key, value):
        # Write locally immediately
        self.local_data[key] = value
        # Queue for async replication
        self.pending_updates.append((key, value))
        return "OK"
    def read(self, key):
        # Return local value (might be stale)
        return self.local_data.get(key)
    def sync_background(self):
        """Background sync with other nodes"""
        while self.pending_updates:
            key, value = self.pending_updates.pop(0)
            self.replicate_to_peers(key, value)
Causal Consistency
If operation A causally precedes B, everyone sees A before B.
class CausallyConsistent:
    """Causally consistent store using vector clocks"""
    def __init__(self, node_id, nodes):
        self.node_id = node_id
        self.clock = VectorClock(node_id, nodes)
        self.data = {}     # key -> (value, vector_clock)
        self.pending = []  # Updates waiting for their causal dependencies
    def write(self, key, value):
        self.clock.tick()
        self.data[key] = (value, self.clock.clock.copy())
        self.replicate(key, value, self.clock.clock.copy())
    def read(self, key):
        if key in self.data:
            value, _ = self.data[key]
            return value
        return None
    def receive_update(self, key, value, sender_clock):
        # Apply only if we've already seen all causal dependencies
        if self.can_apply(sender_clock):
            self.data[key] = (value, sender_clock)
            self.clock.receive(sender_clock)
            self.try_apply_pending()
        else:
            # Buffer until the missing dependencies arrive
            self.pending.append((key, value, sender_clock))
    def can_apply(self, sender_clock):
        """Check if we have all causal dependencies"""
        for node, time in sender_clock.items():
            if node != self.node_id:
                if self.clock.clock.get(node, 0) < time - 1:
                    return False
        return True
    def try_apply_pending(self):
        """Retry buffered updates whose dependencies may now be satisfied"""
        for update in list(self.pending):
            key, value, sender_clock = update
            if self.can_apply(sender_clock):
                self.pending.remove(update)
                self.receive_update(key, value, sender_clock)
Read Your Writes
A client always sees their own writes.
Monotonic Reads
Once a client sees a value, they never see an older value.
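Both session guarantees can be enforced on the client side by remembering the highest version the session has observed and rejecting replicas that lag behind it. A minimal sketch (the class names and the integer version scheme are illustrative, not from a real system):

```python
class StaleReplicaError(Exception):
    """Raised when a replica lags behind the client's session"""
    pass

class Replica:
    """Toy replica: a value plus a monotonically increasing version"""
    def __init__(self):
        self.value = None
        self.version = 0

class SessionClient:
    """Enforces read-your-writes and monotonic reads by tracking
    the highest version this session has written or read"""
    def __init__(self):
        self.min_version = 0

    def write(self, replica, value):
        replica.version += 1
        replica.value = value
        self.min_version = max(self.min_version, replica.version)

    def read(self, replica):
        if replica.version < self.min_version:
            # Replica hasn't caught up; a real client would retry elsewhere
            raise StaleReplicaError("replica is behind this session")
        self.min_version = replica.version  # never accept older data again
        return replica.value
```

Rejecting a stale replica after a write gives read-your-writes; ratcheting `min_version` up on every successful read gives monotonic reads.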
Exercises
Basic
- Explain why the two generals problem is unsolvable with a finite number of messages.
- Implement Lamport clocks and show an example where Lamport(A) < Lamport(B) but A and B are concurrent.
- Given a distributed key-value store, explain the tradeoffs of choosing CP vs AP.
Intermediate
- Implement vector clocks and show how to detect concurrent operations.
- Design a simple eventually consistent shopping cart (inspired by Amazon).
- Explain how systems like Cassandra allow tunable consistency.
Advanced
- Implement a conflict-free replicated data type (CRDT) for a counter.
- Analyze the consistency guarantees of a real distributed database.
- Design a system that provides causal consistency across multiple data centers.
Summary
- Distributed systems involve multiple computers coordinating over a network
- Key challenges: partial failures, unreliable networks, no global clock
- Logical clocks (Lamport, Vector) provide ordering without real time
- CAP theorem: choose consistency or availability during partitions
- Multiple consistency models exist, each with different tradeoffs