Distributed Systems Fundamentals
A distributed system is a collection of independent computers that appears to users as a single coherent system. These systems are essential for building scalable, reliable, and available applications.
## 1. What is a Distributed System?
Definition
A distributed system has four defining properties:
- It consists of multiple autonomous computers
- The computers are connected by a network
- They coordinate to achieve a common goal
- They appear to users as a single system
Why Distributed Systems?
- Scalability: Handle more load by adding machines
- Reliability: No single point of failure
- Performance: Process data closer to users
- Cost: Commodity hardware instead of supercomputers
- Geography: Serve global users with low latency
Examples
- Web applications (Google, Facebook, Amazon)
- Cloud storage (S3, Dropbox, Google Drive)
- Distributed databases (Cassandra, MongoDB, CockroachDB)
- Message queues (Kafka, RabbitMQ)
- Blockchain networks
## 2. Challenges in Distributed Systems
The Eight Fallacies of Distributed Computing
- The network is reliable - Networks fail, packets get lost
- Latency is zero - Communication takes time
- Bandwidth is infinite - Network capacity is limited
- The network is secure - Security must be designed in
- Topology doesn't change - Networks are dynamic
- There is one administrator - Multiple parties manage infrastructure
- Transport cost is zero - Moving data costs time and money
- The network is homogeneous - Various devices and protocols exist
Partial Failures
Unlike local systems that either work or fail completely, distributed systems experience partial failures:
```python
# In a distributed system, calls can:
# 1. Succeed
# 2. Fail (and you know it)
# 3. Timeout (you don't know what happened)
# http_client, service_url, handle_failure, and handle_uncertainty
# are placeholders for your own client and error handling.
def call_remote_service(request):
    try:
        response = http_client.post(service_url, request, timeout=5)
        return response  # Success
    except ConnectionError:
        # Service is down - we know it failed
        return handle_failure()
    except TimeoutError:
        # Did the request succeed? We don't know!
        # The request might have:
        # - Never reached the server
        # - Reached server but server crashed before processing
        # - Been processed but response was lost
        # - Succeeded but network is just slow
        return handle_uncertainty()
```
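One common way to cope with the timeout case is to make the request safe to repeat. The sketch below is only an illustration of that idea, not a method prescribed by this text: `FlakyPaymentServer`, `charge_with_retry`, and the idempotency key are all hypothetical names, and the server is simulated in memory. The client reuses one key for every attempt, so a request that was processed but whose response was lost is not applied twice.

```python
import uuid


class FlakyPaymentServer:
    """Hypothetical in-memory server that deduplicates by idempotency key."""

    def __init__(self, lose_first_n_responses: int = 1):
        self.processed = {}  # idempotency key -> stored result
        self.charges = 0     # how many times the side effect actually ran
        self._responses_to_lose = lose_first_n_responses

    def charge(self, key: str, amount: int) -> str:
        if key not in self.processed:
            # First time this key is seen: perform the side effect once
            self.charges += 1
            self.processed[key] = f"charged {amount}"
        if self._responses_to_lose > 0:
            # Simulate "processed, but the response was lost"
            self._responses_to_lose -= 1
            raise TimeoutError("response lost")
        return self.processed[key]


def charge_with_retry(server: FlakyPaymentServer, amount: int, attempts: int = 3) -> str:
    """Retry on timeout, reusing the same idempotency key for every attempt."""
    key = str(uuid.uuid4())
    for _ in range(attempts):
        try:
            return server.charge(key, amount)
        except TimeoutError:
            continue  # Outcome unknown; retrying is safe because of the key
    raise TimeoutError("gave up after retries")


server = FlakyPaymentServer(lose_first_n_responses=1)
result = charge_with_retry(server, 100)
print(result, server.charges)  # charged 100 1
```

The first attempt is processed but times out; the retry returns the stored result, so the charge happens exactly once.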
The Two Generals Problem
Two generals must coordinate an attack:
- They can only communicate via messengers
- Messengers can be captured (lost messages)
- How do they agree on attack time?
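Impossibility result: No protocol that exchanges a finite number of messages can guarantee agreement when any message may be lost, because the sender of the last message can never know whether it arrived.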
```python
# General A sends: "Attack at dawn"
# General B receives, sends: "Confirmed"
# But General B doesn't know if A received confirmation
# So B sends: "Did you receive my confirmation?"
# This goes on forever...
def attempt_agreement():
    """This can never provide certainty"""
    send("Attack at dawn")
    while True:
        ack = wait_for_ack(timeout=10)
        if ack:
            send("Received your ack")  # But did they get THIS message?
        else:
            # Did they not send ack, or did it get lost?
            send("Attack at dawn")  # Retry
```
Byzantine Failures
Nodes can fail in arbitrary ways (not just crash):
- Send incorrect data
- Behave maliciously
- Act inconsistently
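In the classical setting, Byzantine fault tolerance (BFT) requires at least 3f+1 nodes in total to tolerate f Byzantine nodes, so that correct nodes always outnumber faulty ones by more than two to one.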
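The 3f+1 bound is easy to turn into arithmetic. The helper names below are made up for this illustration; the sketch only restates the bound in both directions.

```python
def min_nodes_for(f: int) -> int:
    """Smallest cluster size that tolerates f Byzantine nodes (3f + 1)."""
    return 3 * f + 1


def max_faults_tolerated(n: int) -> int:
    """Largest f such that 3f + 1 <= n."""
    return (n - 1) // 3


for n in (3, 4, 7, 10):
    print(f"{n} nodes tolerate {max_faults_tolerated(n)} Byzantine node(s)")
```

Note that three nodes tolerate no Byzantine failure at all, and that going from four to six nodes does not raise the number of tolerated faults.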
## 3. Time and Ordering
The Problem with Time
In distributed systems, there's no global clock. Each node has its own clock that can:
- Drift from real time
- Jump forward or backward (NTP adjustments)
- Have different offsets from other nodes
```python
# Even "synchronized" clocks can disagree
# Node A thinks it's 12:00:00.000
# Node B thinks it's 12:00:00.050
# 50ms difference can cause ordering issues!

# Example problem:
# Node A: write x=1 at 12:00:00.000
# Node B: write x=2 at 12:00:00.050
# Later reader: which write is "latest"?
# Depends on which clock you trust!
```
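To make the ordering problem concrete, here is a small simulation. It assumes a "last write wins" rule based on wall-clock timestamps, which is one possible resolution strategy rather than anything this text prescribes. The `Write` class and its fields are hypothetical, and `true_time_ms` stands for a real time that no node can actually observe.

```python
from dataclasses import dataclass


@dataclass
class Write:
    node: str
    value: int
    true_time_ms: int     # real time of the write (unknowable in practice)
    clock_offset_ms: int  # how far this node's clock is ahead of real time

    @property
    def timestamp_ms(self) -> int:
        """Timestamp the node records, using its own skewed clock."""
        return self.true_time_ms + self.clock_offset_ms


# B writes first in real time, but its clock runs 50 ms fast.
write_b = Write(node="B", value=2, true_time_ms=0, clock_offset_ms=50)
write_a = Write(node="A", value=1, true_time_ms=20, clock_offset_ms=0)


def last_write_wins(*writes: Write) -> Write:
    """Pick the write with the largest recorded timestamp."""
    return max(writes, key=lambda w: w.timestamp_ms)


winner = last_write_wins(write_a, write_b)
truly_latest = max((write_a, write_b), key=lambda w: w.true_time_ms)
print(winner.node, truly_latest.node)  # B A
```

The rule keeps B's write even though A's write happened later in real time, so the later update is silently lost.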
Lamport Clocks
Logical clocks provide ordering without real time.
```python
class LamportClock:
    """Lamport logical clock"""

    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Increment clock for local event"""
        self.time += 1
        return self.time

    def send(self) -> int:
        """Get timestamp for sending message"""
        self.time += 1
        return self.time

    def receive(self, msg_time: int):
        """Update clock on receiving message"""
        self.time = max(self.time, msg_time) + 1


# Usage
clock_a = LamportClock()
clock_b = LamportClock()

# Node A does local work
clock_a.tick()  # time = 1

# Node A sends to B
send_time = clock_a.send()  # time = 2

# ... message in transit ...
clock_b.receive(send_time)  # B's time = max(0, 2) + 1 = 3

# Node B does work
clock_b.tick()  # time = 4
```
Property: If event A happened before event B, then Lamport(A) < Lamport(B).
Limitation: The converse does not hold. Lamport(A) < Lamport(B) does not imply that A happened before B; the two events may be concurrent.
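The limitation shows up with just two nodes that never talk to each other. The sketch below repeats a minimal version of the clock so that it runs on its own; the variable names are illustrative.

```python
class LamportClock:
    """Minimal Lamport clock: local events only."""

    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        self.time += 1
        return self.time


clock_a = LamportClock()
clock_b = LamportClock()

event_a = clock_a.tick()  # A's first local event gets timestamp 1
clock_b.tick()            # B's first local event gets timestamp 1
event_b = clock_b.tick()  # B's second local event gets timestamp 2

# No message was exchanged, so event_a and event_b are concurrent,
# yet the timestamps alone suggest an order.
print(event_a < event_b)  # True
```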
Vector Clocks
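Vector clocks keep one counter per node, which lets them capture causality exactly: comparing two vectors tells you whether one event happened before the other or whether the two are concurrent.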
```python
from typing import Dict, List


class VectorClock:
    """Vector clock for detecting causality"""

    def __init__(self, node_id: str, nodes: List[str]):
        self.node_id = node_id
        self.clock: Dict[str, int] = {n: 0 for n in nodes}

    def tick(self) -> None:
        """Increment own component"""
        self.clock[self.node_id] += 1

    def send(self) -> Dict[str, int]:
        """Get vector for sending"""
        self.tick()
        return self.clock.copy()

    def receive(self, other_clock: Dict[str, int]) -> None:
        """Merge received vector clock"""
        for node, time in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), time)
        self.tick()

    def happened_before(self, other: "VectorClock") -> bool:
        """Check if self happened before other"""
        # A < B iff all components A[i] <= B[i] and at least one A[i] < B[i]
        # Missing entries count as 0, so clocks that know different
        # node sets can still be compared without a KeyError
        nodes = set(self.clock) | set(other.clock)
        all_leq = all(self.clock.get(n, 0) <= other.clock.get(n, 0) for n in nodes)
        some_lt = any(self.clock.get(n, 0) < other.clock.get(n, 0) for n in nodes)
        return all_leq and some_lt

    def concurrent_with(self, other: "VectorClock") -> bool:
        """Check if self and other are concurrent (incomparable)"""
        return not self.happened_before(other) and not other.happened_before(self)
```
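The comparison rule can be exercised on plain dictionaries. The following is a compact, self-contained sketch of the same rule; the function names and the three example vectors are chosen for illustration.

```python
from typing import Dict

Vector = Dict[str, int]


def happened_before(a: Vector, b: Vector) -> bool:
    """a < b iff every component of a is <= b and at least one is strictly less."""
    nodes = set(a) | set(b)
    all_leq = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    some_lt = any(a.get(n, 0) < b.get(n, 0) for n in nodes)
    return all_leq and some_lt


def concurrent(a: Vector, b: Vector) -> bool:
    """Neither vector happened before the other."""
    return not happened_before(a, b) and not happened_before(b, a)


a1 = {"A": 1, "B": 0}  # A performs a local write
b1 = {"A": 0, "B": 1}  # B performs an independent local write
b2 = {"A": 1, "B": 2}  # B after receiving a1: merge, then tick its own entry

print(happened_before(a1, b2))  # True: b2 has seen a1
print(concurrent(a1, b1))       # True: neither write knew of the other
```

Unlike Lamport timestamps, the vectors for `a1` and `b1` are incomparable, so the two writes are reported as concurrent instead of being given an arbitrary order.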
## 4. The CAP Theorem
Statement
In a distributed system, you can have at most 2 of 3 properties:
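- Consistency: Every read receives the most recent write or an error
- Availability: Every request to a non-failing node receives a non-error response, though not necessarily the most recent write
- Partition Tolerance: The system keeps operating when the network drops or delays messages between nodes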
Since network partitions are inevitable, you must choose between:
- CP: Consistent and Partition-tolerant (sacrifice availability)
- AP: Available and Partition-tolerant (sacrifice consistency)
Example
```python
# Scenario: Two nodes, A and B, with network partition
# User writes x=1 to A, then reads from B

# CP System (e.g., traditional RDBMS with 2PC):
# During partition, writes are rejected or blocked
# Read from B returns error or waits
# Consistency maintained, but availability sacrificed

# AP System (e.g., Cassandra with eventual consistency):
# Write to A succeeds
# Read from B returns stale value
# Availability maintained, but consistency sacrificed

# The replication helpers called below (can_reach_quorum, commit_to_all,
# read_from_majority, local_write, async_replicate, local_read) are
# placeholders for the storage and network layers.


class PartitionError(Exception):
    """Raised when a quorum of replicas cannot be reached"""


class CPSystem:
    """CP: Reject operations during partition"""

    def write(self, key, value):
        if self.can_reach_quorum():
            self.commit_to_all(key, value)
            return "OK"
        else:
            raise PartitionError("Cannot reach quorum")

    def read(self, key):
        if self.can_reach_quorum():
            return self.read_from_majority(key)
        else:
            raise PartitionError("Cannot reach quorum")


class APSystem:
    """AP: Accept operations during partition"""

    def write(self, key, value):
        self.local_write(key, value)
        self.async_replicate(key, value)  # Best effort
        return "OK"

    def read(self, key):
        return self.local_read(key)  # Might be stale
```
CAP in Practice
Real systems make nuanced tradeoffs:
- Cassandra: Tunable consistency (AP by default)
- ZooKeeper: CP (sacrifices availability for consistency)
- DynamoDB: Eventually consistent reads by default, with strongly consistent reads available per request
- Spanner: CP with high availability (global consensus)
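Tunable consistency in quorum-replicated stores is often explained with a simple overlap rule: with N replicas, a read that contacts R of them and a write acknowledged by W of them are guaranteed to share at least one replica when R + W > N. The sketch below only checks that inequality. The function name is hypothetical, and the labels borrow Cassandra's consistency level names (ONE, QUORUM, ALL) purely as an illustration.

```python
def quorums_overlap(n: int, r: int, w: int) -> bool:
    """True when every read quorum shares at least one replica with every write quorum."""
    return r + w > n


n = 3  # replication factor
settings = [
    ("read ONE, write ONE", 1, 1),
    ("read QUORUM, write QUORUM", 2, 2),
    ("read ONE, write ALL", 1, 3),
]
for label, r, w in settings:
    print(f"{label}: overlap={quorums_overlap(n, r, w)}")
```

With three replicas, reading and writing at ONE favors availability and can return stale data, while the other two settings force each read to contact at least one replica that holds the latest acknowledged write.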
## 5. Consistency Models
Strong Consistency
All nodes see the same data at the same time.
```python
# Linearizability: Strongest consistency
# Operations appear to happen instantly at some point
# Write x=1 completes at time T1
# Any read starting after T1 must return 1 (or later value)
class Linearizable:
    def write(self, key, value):
        # Must propagate to all nodes before returning
        self.sync_write_to_all(key, value)

    def read(self, key):
        # Must get latest value
        return self.sync_read_from_majority(key)
```
Eventual Consistency
If no new updates are made, all replicas eventually converge to the same value.
```python
from collections import deque


class EventuallyConsistent:
    """Eventually consistent store"""

    def __init__(self):
        self.local_data = {}
        self.pending_updates = deque()  # FIFO queue of (key, value)

    def write(self, key, value):
        # Write locally immediately
        self.local_data[key] = value
        # Queue for async replication
        self.pending_updates.append((key, value))
        return "OK"

    def read(self, key):
        # Return local value (might be stale)
        return self.local_data.get(key)

    def sync_background(self):
        """Background sync with other nodes"""
        while self.pending_updates:
            key, value = self.pending_updates.popleft()
            # replicate_to_peers is a placeholder for the transport layer
            self.replicate_to_peers(key, value)
```
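Convergence also requires that replicas resolve conflicting updates the same way, whatever order the updates arrive in. The sketch below assumes a last-writer-wins rule keyed on a (logical timestamp, node id) pair. That rule is one possible choice rather than something this text specifies, and `LWWReplica` is a hypothetical name.

```python
from typing import Dict, Tuple

Stamped = Tuple[int, str, str]  # (logical timestamp, node id, value)


class LWWReplica:
    """Replica that keeps, per key, the update with the highest (timestamp, node id)."""

    def __init__(self) -> None:
        self.data: Dict[str, Stamped] = {}

    def apply(self, key: str, update: Stamped) -> None:
        current = self.data.get(key)
        # Compare (timestamp, node id); the node id breaks timestamp ties
        if current is None or update[:2] > current[:2]:
            self.data[key] = update


updates = [(1, "A", "red"), (2, "B", "blue"), (2, "A", "green")]

r1, r2 = LWWReplica(), LWWReplica()
for update in updates:
    r1.apply("color", update)
for update in reversed(updates):  # same updates, opposite delivery order
    r2.apply("color", update)

print(r1.data == r2.data, r1.data["color"][2])  # True blue
```

Both replicas end with the same value although they received the updates in opposite orders. The cost of this rule is that the concurrent write "green" is silently discarded.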
Causal Consistency
If operation A causally precedes B, every node sees A before B. Concurrent operations may be seen in different orders on different nodes.
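```python
class CausallyConsistent:
    """Causally consistent store using vector clocks"""

    def __init__(self, node_id, nodes):
        self.node_id = node_id
        self.clock = VectorClock(node_id, nodes)
        self.data = {}     # key -> (value, vector_clock)
        self.pending = []  # Updates waiting for their causal dependencies

    def write(self, key, value):
        self.clock.tick()
        stamp = self.clock.clock.copy()
        self.data[key] = (value, stamp)
        # replicate() is a placeholder for the transport layer; the sender id
        # travels with the update so receivers can check the delivery condition
        self.replicate(self.node_id, key, value, stamp)

    def read(self, key):
        value, _ = self.data.get(key, (None, None))
        return value

    def receive_update(self, sender_id, key, value, sender_clock):
        # Check if we've seen all causal dependencies
        if self.can_apply(sender_id, sender_clock):
            self.apply(key, value, sender_clock)
            self.try_apply_pending()
        else:
            # Wait for dependencies
            self.pending.append((sender_id, key, value, sender_clock))

    def apply(self, key, value, sender_clock):
        self.data[key] = (value, sender_clock)
        # Merge without ticking: each component counts that node's writes only
        for node, time in sender_clock.items():
            self.clock.clock[node] = max(self.clock.clock.get(node, 0), time)

    def can_apply(self, sender_id, sender_clock):
        """Check if we have all causal dependencies"""
        for node, time in sender_clock.items():
            seen = self.clock.clock.get(node, 0)
            if node == sender_id:
                if time != seen + 1:  # Must be the sender's next write
                    return False
            elif time > seen:  # Depends on a write we have not applied yet
                return False
        return True

    def try_apply_pending(self):
        """Apply queued updates whose dependencies are now satisfied"""
        progress = True
        while progress:
            progress = False
            for update in list(self.pending):
                sender_id, key, value, sender_clock = update
                if self.can_apply(sender_id, sender_clock):
                    self.pending.remove(update)
                    self.apply(key, value, sender_clock)
                    progress = True
```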
Read Your Writes
A client always sees their own writes.
Monotonic Reads
Once a client sees a value, they never see an older value.
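Both session guarantees can be sketched with a per-client record of the newest version seen for each key. Everything below is a hypothetical illustration: `Replica`, `Session`, and `StaleReplicaError` are made-up names, and versions are simple per-key integers that assume a single writer per key.

```python
from typing import Dict, Optional, Tuple


class StaleReplicaError(Exception):
    """Raised when a replica is behind what this session has already seen."""


class Replica:
    """Hypothetical replica holding (version, value) per key."""

    def __init__(self) -> None:
        self.store: Dict[str, Tuple[int, str]] = {}


class Session:
    """Tracks the newest version this client has written or read, per key."""

    def __init__(self) -> None:
        self.min_version: Dict[str, int] = {}

    def write(self, replica: Replica, key: str, value: str) -> None:
        version = replica.store.get(key, (0, ""))[0] + 1
        replica.store[key] = (version, value)
        # Read your writes: remember the version we just produced
        self.min_version[key] = max(self.min_version.get(key, 0), version)

    def read(self, replica: Replica, key: str) -> Optional[str]:
        version, value = replica.store.get(key, (0, None))
        if version < self.min_version.get(key, 0):
            raise StaleReplicaError(f"replica is behind for key {key!r}")
        # Monotonic reads: never accept anything older than this later on
        self.min_version[key] = version
        return value


primary, lagging = Replica(), Replica()
session = Session()
session.write(primary, "name", "Ada")

try:
    session.read(lagging, "name")  # The lagging replica has not seen the write
    stale_detected = False
except StaleReplicaError:
    stale_detected = True

lagging.store = dict(primary.store)  # Replication catches up
value = session.read(lagging, "name")
print(stale_detected, value)  # True Ada
```

A real client would typically retry against another replica or wait instead of raising, but the check itself is the core of both guarantees.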
Exercises
Basic
Explain why the two generals problem is unsolvable with a finite number of messages.
Implement Lamport clocks and show an example where Lamport(A) < Lamport(B) but A and B are concurrent.
Given a distributed key-value store, explain the tradeoffs of choosing CP vs AP.
Intermediate
Implement vector clocks and show how to detect concurrent operations.
Design a simple eventually consistent shopping cart (inspired by Amazon).
Explain how systems like Cassandra allow tunable consistency.
Advanced
Implement a conflict-free replicated data type (CRDT) for a counter.
Analyze the consistency guarantees of a real distributed database.
Design a system that provides causal consistency across multiple data centers.
Summary
- Distributed systems involve multiple computers coordinating over a network
- Key challenges: partial failures, unreliable networks, no global clock
- Logical clocks (Lamport, Vector) provide ordering without real time
- CAP theorem: choose consistency or availability during partitions
- Multiple consistency models exist, each with different tradeoffs