Distributed Systems Fundamentals

A distributed system is a collection of independent computers that appears to users as a single coherent system. These systems are essential for building scalable, reliable, and available applications.

1. What is a Distributed System?

Definition

A distributed system has four defining traits:

  • Multiple autonomous computers
  • A network connecting them
  • Coordination toward a common goal
  • The appearance of a single system to users

Why Distributed Systems?

  1. Scalability: Handle more load by adding machines
  2. Reliability: No single point of failure
  3. Performance: Process data closer to users
  4. Cost: Commodity hardware instead of supercomputers
  5. Geography: Serve global users with low latency

Examples

  • Web applications (Google, Facebook, Amazon)
  • Cloud storage (S3, Dropbox, Google Drive)
  • Distributed databases (Cassandra, MongoDB, CockroachDB)
  • Message queues (Kafka, RabbitMQ)
  • Blockchain networks

2. Challenges in Distributed Systems

The Eight Fallacies of Distributed Computing

  1. The network is reliable - Networks fail, packets get lost
  2. Latency is zero - Communication takes time
  3. Bandwidth is infinite - Network capacity is limited
  4. The network is secure - Security must be designed in
  5. Topology doesn't change - Networks are dynamic
  6. There is one administrator - Multiple parties manage infrastructure
  7. Transport cost is zero - Moving data costs time and money
  8. The network is homogeneous - Various devices and protocols exist

Partial Failures

Unlike a single machine, which usually either works or fails as a whole, a distributed system experiences partial failures: some nodes or network links fail while the rest keep running.

import requests

# In a distributed system, calls can:
# 1. Succeed
# 2. Fail (and you know it)
# 3. Time out (you don't know what happened)

def call_remote_service(service_url, request):
    try:
        # Send the request body as JSON; give up waiting after 5 seconds
        response = requests.post(service_url, json=request, timeout=5)
        return response  # Success
    except requests.exceptions.ConnectionError:
        # Typically the connection could not be established (e.g. refused),
        # so the request never reached the service - we know it failed
        return handle_failure()  # Placeholder: application-specific
    except requests.exceptions.Timeout:
        # Did the request succeed? We don't know!
        # The request might have:
        # - Never reached the server
        # - Reached server but server crashed before processing
        # - Been processed but response was lost
        # - Succeeded but network is just slow
        return handle_uncertainty()  # Placeholder: application-specific
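
A common way to cope with the timeout case is to make the operation idempotent so the caller can simply retry. The sketch below is a toy, single-process simulation under stated assumptions: Server, unreliable_call and deposit_with_retries are hypothetical names, the "network" is modeled by a list saying which responses get dropped, and the server remembers every idempotency key it has already applied.

```python
import uuid
from typing import Dict, List


class Server:
    """Toy server that deduplicates requests by idempotency key (hypothetical)."""

    def __init__(self) -> None:
        self.balance = 0
        self.responses: Dict[str, int] = {}  # idempotency key -> response sent

    def deposit(self, key: str, amount: int) -> int:
        if key in self.responses:  # A retry of a request we already applied
            return self.responses[key]
        self.balance += amount
        self.responses[key] = self.balance
        return self.balance


def unreliable_call(server: Server, key: str, amount: int, drop_response: bool) -> int:
    """The server processes the request, but the response may be lost."""
    result = server.deposit(key, amount)
    if drop_response:
        raise TimeoutError("response lost in transit")
    return result


def deposit_with_retries(server: Server, amount: int, drops: List[bool]) -> int:
    key = str(uuid.uuid4())  # Reused for every retry of this logical request
    for drop in drops:
        try:
            return unreliable_call(server, key, amount, drop)
        except TimeoutError:
            continue  # Outcome unknown, but retrying is safe thanks to the key
    raise TimeoutError("gave up after all retries")


server = Server()
result = deposit_with_retries(server, 100, drops=[True, True, False])
print(result, server.balance)  # 100 100: applied once despite three attempts
```

Without the key, the two lost responses would have turned into three deposits. A real service would also have to store the deduplication table durably and expire old keys; that is left out here.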

The Two Generals Problem

Two generals must coordinate an attack:

  • They can only communicate via messengers
  • Messengers can be captured (lost messages)
  • How do they agree on attack time?

Impossibility result: When any message may be lost, no protocol using a finite number of messages can guarantee that both generals agree.

# General A sends: "Attack at dawn"
# General B receives, sends: "Confirmed"
# But General B doesn't know if A received confirmation
# So B sends: "Did you receive my confirmation?"
# This goes on forever...

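def attempt_agreement(send, wait_for_ack):
    """This can never provide certainty.

    send(msg) and wait_for_ack(timeout) are placeholder messenger
    primitives; wait_for_ack returns None if nothing arrives in time.
    """
    last_message = "Attack at dawn"
    send(last_message)

    while True:  # No finite number of rounds is ever enough
        ack = wait_for_ack(timeout=10)
        if ack:
            last_message = "Received your ack"
            send(last_message)  # But did they get THIS message?
        else:
            # Did they not send an ack, or did it get lost?
            send(last_message)  # Retry the last message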

Byzantine Failures

Nodes can fail in arbitrary ways (not just crash):

  • Send incorrect data
  • Behave maliciously
  • Act inconsistently

Byzantine fault tolerance (BFT) requires at least 3f+1 nodes to tolerate f Byzantine failures.
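
The arithmetic behind the bound is easy to check. The sketch below assumes quorums of 2f+1 out of 3f+1 nodes, as used by PBFT-style protocols, and includes crash-fault tolerance (2f+1 nodes with majority quorums of f+1) for comparison; cluster_sizes is a hypothetical helper.

```python
def cluster_sizes(f: int) -> dict:
    """Minimum cluster and quorum sizes to tolerate f faulty nodes."""
    byz_n, byz_q = 3 * f + 1, 2 * f + 1  # Byzantine faults (PBFT-style quorums)
    crash_n, crash_q = 2 * f + 1, f + 1  # Crash faults (majority quorums)
    return {
        # Two Byzantine quorums overlap in 2(2f+1) - (3f+1) = f+1 nodes,
        # so at least one node in the overlap is honest.
        "byzantine": {"nodes": byz_n, "quorum": byz_q, "min_overlap": 2 * byz_q - byz_n},
        # Two crash-fault majorities always share at least one node.
        "crash": {"nodes": crash_n, "quorum": crash_q, "min_overlap": 2 * crash_q - crash_n},
    }


print(cluster_sizes(1)["byzantine"])  # {'nodes': 4, 'quorum': 3, 'min_overlap': 2}
```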


3. Time and Ordering

The Problem with Time

In distributed systems, there's no global clock. Each node has its own clock that can:

  • Drift from real time
  • Jump forward or backward (NTP adjustments)
  • Have different offsets from other nodes

# Even "synchronized" clocks can disagree
# Node A thinks it's 12:00:00.000
# Node B thinks it's 12:00:00.050
# 50ms difference can cause ordering issues!

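# Example problem:
# Node A: write x=1 at 12:00:00.000 (by A's clock)
# Node B: write x=2 at 12:00:00.050 (by B's clock)
# Later reader: which write is "latest"?
# If B's clock runs 50ms or more ahead, B's write may really have happened first.
# Depends on which clock you trust!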
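
The danger becomes concrete when a store resolves conflicts with last-write-wins on wall-clock timestamps. A minimal simulation, assuming hypothetical Write records that carry both the true time of the write (which no real node can observe) and the writing node's clock skew:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Write:
    node: str
    value: int
    true_time_ms: int   # When the write really happened (no node can observe this)
    clock_skew_ms: int  # How far the node's clock is ahead of real time

    @property
    def timestamp_ms(self) -> int:
        """The timestamp the node attaches, read from its own (skewed) clock."""
        return self.true_time_ms + self.clock_skew_ms


def last_write_wins(writes: List[Write]) -> Write:
    """Keep the write with the highest node-assigned timestamp."""
    return max(writes, key=lambda w: w.timestamp_ms)


# B writes first in real time, but its clock runs 80 ms fast;
# A writes 30 ms later with an accurate clock.
w_b = Write(node="B", value=2, true_time_ms=0, clock_skew_ms=80)
w_a = Write(node="A", value=1, true_time_ms=30, clock_skew_ms=0)

winner = last_write_wins([w_a, w_b])
print(winner.node, winner.value)  # B 2: the earlier write wins, A's update is lost
```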

Lamport Clocks

Logical clocks provide ordering without real time.

class LamportClock:
    """Lamport logical clock"""

    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Increment clock for local event"""
        self.time += 1
        return self.time

    def send(self) -> int:
        """Get timestamp for sending message"""
        self.time += 1
        return self.time

    def receive(self, msg_time: int):
        """Update clock on receiving message"""
        self.time = max(self.time, msg_time) + 1

# Usage
clock_a = LamportClock()
clock_b = LamportClock()

# Node A does local work
clock_a.tick()  # time = 1

# Node A sends to B
send_time = clock_a.send()  # time = 2
# ... message in transit ...
clock_b.receive(send_time)  # B's time = max(0, 2) + 1 = 3

# Node B does work
clock_b.tick()  # time = 4

Property: If event A happened before event B, then Lamport(A) < Lamport(B).

Limitation: The converse is not true. Lamport(A) < Lamport(B) does not mean that A happened before B; the two events may be concurrent.
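
The limitation shows up with two nodes that never communicate: every event on one is concurrent with every event on the other, yet their Lamport timestamps are still ordered. A minimal sketch using a stripped-down copy of the clock above (only tick is needed):

```python
class LamportClock:
    """Stripped-down Lamport clock: only local events are needed here."""

    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        self.time += 1
        return self.time


# Nodes A and B never exchange messages, so none of their events are causally related.
clock_a, clock_b = LamportClock(), LamportClock()

event_a = clock_a.tick()  # Lamport(A) = 1
clock_b.tick()
event_b = clock_b.tick()  # Lamport(B) = 2

print(event_a < event_b)  # True, yet A did not happen before B: they are concurrent
```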

Vector Clocks

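Vector clocks capture the complete causal history: A happened before B if and only if V(A) < V(B), so unlike Lamport clocks they can also tell when two events are concurrent.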

from typing import Dict

class VectorClock:
    """Vector clock for detecting causality"""

    def __init__(self, node_id: str, nodes: list):
        self.node_id = node_id
        self.clock: Dict[str, int] = {n: 0 for n in nodes}

    def tick(self):
        """Increment own component"""
        self.clock[self.node_id] += 1

    def send(self) -> Dict[str, int]:
        """Get vector for sending"""
        self.tick()
        return self.clock.copy()

    def receive(self, other_clock: Dict[str, int]):
        """Merge received vector clock"""
        for node, time in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), time)
        self.tick()

    def happened_before(self, other: 'VectorClock') -> bool:
        """Check if self happened before other"""
        # A < B iff all components A[i] <= B[i] and at least one A[i] < B[i]
        # (a node missing from either clock counts as 0)
        nodes = set(self.clock) | set(other.clock)
        all_leq = all(self.clock.get(n, 0) <= other.clock.get(n, 0) for n in nodes)
        some_lt = any(self.clock.get(n, 0) < other.clock.get(n, 0) for n in nodes)
        return all_leq and some_lt

    def concurrent_with(self, other: 'VectorClock') -> bool:
        """Check if self and other are concurrent (incomparable)"""
        return not self.happened_before(other) and not other.happened_before(self)
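
Because each event's vector is a snapshot, comparing two snapshots classifies the pair of events. A standalone sketch on plain dictionaries; compare is a hypothetical helper, equivalent to checking happened-before in both directions:

```python
from typing import Dict

VectorTimestamp = Dict[str, int]


def compare(a: VectorTimestamp, b: VectorTimestamp) -> str:
    """Return 'before', 'after', 'equal' or 'concurrent' for two vector timestamps."""
    nodes = set(a) | set(b)  # A node missing from a timestamp counts as 0
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


# A sends a message (A ticks), B receives it (merge, then tick),
# while C does unrelated local work.
a_send = {"A": 1, "B": 0, "C": 0}
b_receive = {"A": 1, "B": 1, "C": 0}
c_local = {"A": 0, "B": 0, "C": 1}

print(compare(a_send, b_receive))   # before
print(compare(b_receive, c_local))  # concurrent
```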

4. The CAP Theorem

Statement

A distributed data store cannot provide all three of the following properties at once:

  1. Consistency: Every read receives the most recent write (formally, linearizability)
  2. Availability: Every request to a non-failing node receives a non-error response
  3. Partition Tolerance: The system keeps operating even when the network drops messages between nodes

Since network partitions cannot be ruled out, the real choice is what to give up while a partition lasts:

  • CP: Consistent and Partition-tolerant (sacrifice availability)
  • AP: Available and Partition-tolerant (sacrifice consistency)

Example

# Scenario: Two nodes, A and B, with network partition
# User writes x=1 to A, then reads from B

# CP System (e.g., traditional RDBMS with 2PC):
# During partition, writes are rejected or blocked
# Read from B returns error or waits
# Consistency maintained, but availability sacrificed

# AP System (e.g., Cassandra with eventual consistency):
# Write to A succeeds
# Read from B returns stale value
# Availability maintained, but consistency sacrificed

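class PartitionError(Exception):
    """Raised when a quorum of replicas cannot be reached"""

class CPSystem:
    """CP: Reject operations during partition
    (can_reach_quorum, write_to_majority and read_from_majority
    are placeholder helpers, not shown)"""

    def write(self, key, value):
        if self.can_reach_quorum():
            # Commit once a majority acknowledges; waiting for *all*
            # replicas would block on any single unreachable node
            self.write_to_majority(key, value)
            return "OK"
        raise PartitionError("Cannot reach quorum")

    def read(self, key):
        if self.can_reach_quorum():
            return self.read_from_majority(key)
        raise PartitionError("Cannot reach quorum")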

class APSystem:
    """AP: Accept operations during partition"""

    def write(self, key, value):
        self.local_write(key, value)
        self.async_replicate(key, value)  # Best effort
        return "OK"

    def read(self, key):
        return self.local_read(key)  # Might be stale
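
The CP and AP classes above rely on helpers that are not shown. A self-contained toy version with two replicas makes the difference observable. With only two replicas a quorum means both; TwoReplicaStore, its mode flag and the partitioned switch are hypothetical.

```python
from typing import Dict, Optional


class PartitionError(Exception):
    """Raised when a CP store refuses to serve a request during a partition."""


class TwoReplicaStore:
    """Toy store with replicas "A" and "B"; mode is "CP" or "AP"."""

    def __init__(self, mode: str) -> None:
        self.mode = mode
        self.replicas: Dict[str, Dict[str, int]] = {"A": {}, "B": {}}
        self.partitioned = False

    def write(self, node: str, key: str, value: int) -> str:
        if self.partitioned and self.mode == "CP":
            # Cannot reach the other replica, so refuse rather than diverge
            raise PartitionError("cannot replicate write")
        self.replicas[node][key] = value
        if not self.partitioned:
            peer = "B" if node == "A" else "A"
            self.replicas[peer][key] = value  # Synchronous replication
        return "OK"

    def read(self, node: str, key: str) -> Optional[int]:
        if self.partitioned and self.mode == "CP":
            # Cannot confirm the local copy holds the latest value
            raise PartitionError("cannot confirm latest value")
        return self.replicas[node].get(key)


for mode in ("CP", "AP"):
    store = TwoReplicaStore(mode)
    store.write("A", "x", 0)
    store.partitioned = True  # The link between A and B goes down
    try:
        store.write("A", "x", 1)
        print(mode, "read from B:", store.read("B", "x"))  # AP: stale 0
    except PartitionError as err:
        print(mode, "unavailable:", err)
```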

CAP in Practice

Real systems make nuanced tradeoffs:

  • Cassandra: Tunable consistency (AP by default)
  • ZooKeeper: CP (sacrifices availability for consistency)
  • DynamoDB: Eventually consistent reads by default; strongly consistent reads can be requested per read
  • Spanner: CP with high availability (global consensus)

5. Consistency Models

Strong Consistency

The system behaves as if there were a single copy of the data: once a write completes, every later read returns it (or a newer value).

# Linearizability: Strongest consistency
# Each operation appears to take effect instantly at some point
# between its start and its completion

# Write x=1 completes at time T1
# Any read starting after T1 must return 1 (or later value)

class Linearizable:
    def write(self, key, value):
        # Must propagate to all nodes before returning
        self.sync_write_to_all(key, value)

    def read(self, key):
        # Must get latest value
        return self.sync_read_from_majority(key)
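
Majority reads work because of quorum intersection: any two majorities of N replicas share at least one replica, so a majority read always reaches someone who saw the latest acknowledged write. A toy in-memory sketch; the quorum_write and quorum_read helpers and the explicit version numbers are assumptions. On its own this is not fully linearizable when a read races with an in-progress write; protocols such as ABD also write the newest value back to a majority before a read returns.

```python
from itertools import combinations
from typing import Iterable, List, Optional, Tuple

Replica = Tuple[int, Optional[str]]  # (version, value)


def quorum_write(replicas: List[Replica], targets: Iterable[int],
                 version: int, value: str) -> None:
    """Store a versioned value on the replicas that acknowledged the write."""
    for i in targets:
        replicas[i] = (version, value)


def quorum_read(replicas: List[Replica], targets: Iterable[int]) -> Optional[str]:
    """Return the value with the highest version among the contacted replicas."""
    _, value = max((replicas[i] for i in targets), key=lambda r: r[0])
    return value


N = 3
majority = N // 2 + 1                     # 2 of 3
replicas: List[Replica] = [(0, None)] * N

# The write is acknowledged by replicas 0 and 1; replica 2 is still stale.
quorum_write(replicas, [0, 1], version=1, value="x=1")

# Every majority overlaps the write majority, so every majority read sees x=1 ...
majority_reads = {quorum_read(replicas, g) for g in combinations(range(N), majority)}
print(majority_reads)               # {'x=1'}
# ... but a read from a single replica can return stale data.
print(quorum_read(replicas, [2]))   # None
```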

Eventual Consistency

If no new updates are made, all replicas eventually converge to the same value.

class EventuallyConsistent:
    """Eventually consistent store"""

    def __init__(self):
        self.local_data = {}
        self.pending_updates = []

    def write(self, key, value):
        # Write locally immediately
        self.local_data[key] = value
        # Queue for async replication
        self.pending_updates.append((key, value))
        return "OK"

    def read(self, key):
        # Return local value (might be stale)
        return self.local_data.get(key)

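    def sync_background(self):
        """Background sync with other nodes"""
        while self.pending_updates:
            key, value = self.pending_updates[0]
            # replicate_to_peers is a placeholder; if it raises, the update
            # stays queued instead of being lost
            self.replicate_to_peers(key, value)
            self.pending_updates.pop(0)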
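
Convergence also needs a deterministic rule for conflicting updates. One common but lossy choice is last-writer-wins. The sketch below assumes hypothetical Replica objects that tag each value with a (timestamp, node_id) pair and exchange their full state in an anti-entropy round:

```python
from typing import Dict, Optional, Tuple

Entry = Tuple[int, str, str]  # (timestamp, node_id, value)


class Replica:
    """Toy replica using last-writer-wins per key."""

    def __init__(self, node_id: str) -> None:
        self.node_id = node_id
        self.data: Dict[str, Entry] = {}

    def write(self, key: str, value: str, timestamp: int) -> None:
        self.data[key] = (timestamp, self.node_id, value)

    def read(self, key: str) -> Optional[str]:
        entry = self.data.get(key)
        return entry[2] if entry else None

    def merge(self, other: "Replica") -> None:
        """Anti-entropy: per key, keep the entry with the larger
        (timestamp, node_id); node_id breaks timestamp ties deterministically."""
        for key, entry in other.data.items():
            mine = self.data.get(key)
            if mine is None or entry[:2] > mine[:2]:
                self.data[key] = entry


a, b = Replica("A"), Replica("B")
a.write("x", "red", timestamp=1)
b.write("x", "blue", timestamp=2)
print(a.read("x"), b.read("x"))  # red blue  (replicas have diverged)

a.merge(b)
b.merge(a)
print(a.read("x"), b.read("x"))  # blue blue (converged)
```

The replicas converge, but the concurrent "red" write is silently discarded. Systems that cannot afford lost updates merge values instead, for example with CRDTs.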

Causal Consistency

If operation A causally precedes operation B, every node applies A before B. Concurrent operations may be seen in different orders on different nodes.

class CausallyConsistent:
    """Causally consistent store using vector clocks"""

    def __init__(self, node_id, nodes):
        self.node_id = node_id
        self.clock = VectorClock(node_id, nodes)
        self.data = {}  # key -> (value, vector_clock)
        self.pending = []  # Operations waiting for dependencies

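    def write(self, key, value):
        # Only local writes advance our own entry in the vector clock
        self.clock.tick()
        self.data[key] = (value, self.clock.clock.copy())
        # replicate() is a placeholder for sending the update to all peers
        self.replicate(key, value, self.node_id, self.clock.clock.copy())

    def read(self, key):
        if key in self.data:
            value, _ = self.data[key]
            return value
        return None

    def receive_update(self, key, value, sender_id, sender_clock):
        # Buffer the update, then apply everything whose dependencies are met
        self.pending.append((key, value, sender_id, sender_clock))
        self.try_apply_pending()

    def can_apply(self, sender_id, sender_clock):
        """Check if we have all causal dependencies:
        - this is the next write we expect from the sender, and
        - we have already applied every write the sender had seen from others
        """
        for node, time in sender_clock.items():
            local = self.clock.clock.get(node, 0)
            if node == sender_id:
                if time != local + 1:
                    return False
            elif time > local:
                return False
        return True

    def try_apply_pending(self):
        """Apply buffered updates until none of the remaining ones is ready"""
        progress = True
        while progress:
            progress = False
            for update in list(self.pending):
                key, value, sender_id, sender_clock = update
                if self.can_apply(sender_id, sender_clock):
                    self.pending.remove(update)
                    self.data[key] = (value, sender_clock)
                    # Merge without ticking: receiving is not a local write
                    for node, time in sender_clock.items():
                        self.clock.clock[node] = max(self.clock.clock.get(node, 0), time)
                    progress = True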
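
The buffering logic can be seen in isolation with plain dictionaries. A sketch, assuming hypothetical deliverable and deliver_all helpers and a vector clock in which each entry counts only the writes issued by that node (the usual convention for causal broadcast):

```python
from typing import Dict, List, Tuple

VC = Dict[str, int]
Message = Tuple[str, VC, str]  # (sender, sender's vector clock, payload)


def deliverable(local: VC, sender: str, msg_clock: VC) -> bool:
    """Causal delivery rule: the next write from the sender, and nothing
    the sender had seen that we have not applied yet."""
    for node, t in msg_clock.items():
        if node == sender:
            if t != local.get(node, 0) + 1:
                return False
        elif t > local.get(node, 0):
            return False
    return True


def deliver_all(local: VC, inbox: List[Message]) -> List[str]:
    """Apply messages in causal order, buffering any that arrive too early."""
    applied: List[str] = []
    pending = list(inbox)
    progress = True
    while progress:
        progress = False
        for msg in list(pending):
            sender, clock, payload = msg
            if deliverable(local, sender, clock):
                pending.remove(msg)
                for node, t in clock.items():
                    local[node] = max(local.get(node, 0), t)
                applied.append(payload)
                progress = True
    return applied


# A posts a question; B sees it and posts an answer.
question: Message = ("A", {"A": 1, "B": 0}, "Q: lunch?")
answer: Message = ("B", {"A": 1, "B": 1}, "A: yes")

# Node C receives the answer first but still applies the question first.
c_clock: VC = {"A": 0, "B": 0}
applied = deliver_all(c_clock, [answer, question])
print(applied)  # ['Q: lunch?', 'A: yes']
```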

Read Your Writes

After a client writes a value, its later reads return that value or a newer one, even when they are served by a different replica.

Monotonic Reads

Once a client has read a value, its later reads never return an older value.
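
Both session guarantees can be enforced on the client side. A minimal sketch, assuming hypothetical Replica and SessionClient classes and per-key version numbers assigned by whichever replica takes the write (a real system would need globally unique versions):

```python
from typing import Dict, List, Optional, Tuple


class Replica:
    """Toy replica storing key -> (version, value)."""

    def __init__(self) -> None:
        self.data: Dict[str, Tuple[int, Optional[str]]] = {}

    def get(self, key: str) -> Tuple[int, Optional[str]]:
        return self.data.get(key, (0, None))


class SessionClient:
    """Tracks the highest version this client has written or read per key,
    and only accepts reads from replicas that are at least that fresh."""

    def __init__(self, replicas: List[Replica]) -> None:
        self.replicas = replicas
        self.min_version: Dict[str, int] = {}

    def write(self, key: str, value: str, replica: Replica) -> None:
        version = replica.get(key)[0] + 1
        replica.data[key] = (version, value)
        self.min_version[key] = version  # Read-your-writes: never go below this

    def read(self, key: str) -> Optional[str]:
        needed = self.min_version.get(key, 0)
        for replica in self.replicas:
            version, value = replica.get(key)
            if version >= needed:
                self.min_version[key] = version  # Monotonic reads
                return value
        raise RuntimeError("no replica is fresh enough yet; retry later")


primary, lagging = Replica(), Replica()
client = SessionClient([lagging, primary])  # The lagging replica is tried first
client.write("x", "hello", primary)         # Not yet replicated to lagging

print(client.read("x"))                             # hello (lagging replica skipped)
print(SessionClient([lagging, primary]).read("x"))  # None (a fresh session accepts the stale copy)
```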


Exercises

Basic

  1. Explain why the two generals problem is unsolvable with a finite number of messages.

  2. Implement Lamport clocks and show an example where Lamport(A) < Lamport(B) but A and B are concurrent.

  3. Given a distributed key-value store, explain the tradeoffs of choosing CP vs AP.

Intermediate

  1. Implement vector clocks and show how to detect concurrent operations.

  2. Design a simple eventually consistent shopping cart (inspired by Amazon).

  3. Explain how systems like Cassandra allow tunable consistency.

Advanced

  1. Implement a conflict-free replicated data type (CRDT) for a counter.

  2. Analyze the consistency guarantees of a real distributed database.

  3. Design a system that provides causal consistency across multiple data centers.


Summary

  • Distributed systems involve multiple computers coordinating over a network
  • Key challenges: partial failures, unreliable networks, no global clock
  • Logical clocks (Lamport, Vector) provide ordering without real time
  • CAP theorem: choose consistency or availability during partitions
  • Multiple consistency models exist, each with different tradeoffs
