Advanced Distributed Systems

Vector Clocks, CRDTs, 2PC, Saga - Expert-level patterns


1. Vector Clocks - Tracking Causality

Detect causality and conflicts in distributed systems without synchronized clocks.

Problem: How do we order events across distributed nodes when clocks aren't synchronized?
Solution: Vector clocks let any two events be classified as happened-before, happened-after, or concurrent.
sequenceDiagram
    participant A as Node A
    participant B as Node B
    participant C as Node C
    Note over A: A:1, B:0, C:0
    A->>B: Write x=1 [A:1, B:0, C:0]
    Note over B: A:1, B:1, C:0
    B->>C: Write y=2 [A:1, B:1, C:0]
    Note over C: A:1, B:1, C:1
    Note over A: A:2, B:0, C:0 (concurrent!)
    A->>C: Write x=3 [A:2, B:0, C:0]
    Note over C: CONFLICT DETECTED!<br/>C's state [A:1,B:1,C:1] vs x=3 [A:2,B:0,C:0]
from typing import Dict

class VectorClock:
    """
    Vector Clock for tracking causality in distributed systems.

    Each node maintains a vector of logical clocks (one per node).

    Rules:
    1. Internal event: increment own clock
    2. Send message: increment own clock, send entire vector
    3. Receive message: merge vectors (take max), increment own clock

    Used by: Dynamo, Riak, Voldemort for conflict detection
    (Cassandra relies on last-write-wins timestamps instead)
    """

    def __init__(self, node_id: str, nodes: list):
        """
        node_id: This node's ID
        nodes: List of all node IDs in system
        """
        self.node_id = node_id
        self.clock: Dict[str, int] = {node: 0 for node in nodes}

    def increment(self):
        """Increment own clock (internal event)"""
        self.clock[self.node_id] += 1

    def update(self, other_clock: Dict[str, int]):
        """
        Update clock on message receive.

        Takes component-wise maximum, then increments own clock.
        """
        for node in self.clock:
            self.clock[node] = max(self.clock[node], other_clock.get(node, 0))

        self.increment()

    def happens_before(self, other: 'VectorClock') -> bool:
        """
        Check if self happened-before other.

        self < other iff:
        - self[i] <= other[i] for all i
        - self[i] < other[i] for at least one i
        """
        less_or_equal = all(self.clock[node] <= other.clock[node] for node in self.clock)
        strictly_less = any(self.clock[node] < other.clock[node] for node in self.clock)

        return less_or_equal and strictly_less

    def concurrent_with(self, other: 'VectorClock') -> bool:
        """
        Check if self and other are concurrent (conflict).

        Concurrent iff neither happened-before the other.
        """
        return not self.happens_before(other) and not other.happens_before(self)

    def __repr__(self):
        return str(self.clock)

# Example: Detecting conflicts with vector clocks
print("=== Vector Clock Example ===\n")

nodes = ["A", "B", "C"]

# Node A
vc_a = VectorClock("A", nodes)
vc_a.increment()  # A:1, B:0, C:0
print(f"Node A writes x=1: {vc_a}")

# Node A sends to B
vc_b = VectorClock("B", nodes)
vc_b.update(vc_a.clock)  # A:1, B:1, C:0
print(f"Node B receives, writes y=2: {vc_b}")

# Node B sends to C
vc_c = VectorClock("C", nodes)
vc_c.update(vc_b.clock)  # A:1, B:1, C:1
print(f"Node C receives, writes z=3: {vc_c}")

# Meanwhile, Node A writes again (concurrent with B and C!)
vc_a.increment()  # A:2, B:0, C:0
print(f"\nNode A writes x=3 (concurrent!): {vc_a}")

# Node C receives A's write - CONFLICT!
print("\nConflict detection:")
print(f"  C's version {vc_c} concurrent with A's version {vc_a}? {vc_c.concurrent_with(vc_a)}")
print("  Must resolve conflict! (e.g., last-write-wins, merge, ask user)")
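
One way to handle the "must resolve conflict" step is to keep concurrent versions side by side as siblings and drop only the versions that a newer write causally dominates, in the style described for Dynamo. The sketch below is self-contained and uses plain dicts for clocks; `dominates`, `add_version`, and the `(value, clock)` tuple layout are illustrative names, not part of the listing above.

```python
from typing import Dict, List, Tuple

Clock = Dict[str, int]
Version = Tuple[str, Clock]  # (value, vector clock at write time)

def dominates(a: Clock, b: Clock) -> bool:
    """True if clock a is causally after clock b (>= everywhere, > somewhere)."""
    keys = set(a) | set(b)
    ge = all(a.get(k, 0) >= b.get(k, 0) for k in keys)
    gt = any(a.get(k, 0) > b.get(k, 0) for k in keys)
    return ge and gt

def add_version(siblings: List[Version], new: Version) -> List[Version]:
    """Insert a version, dropping any sibling it supersedes."""
    _, new_clock = new
    # An existing sibling already covers this write: nothing to do.
    if any(dominates(c, new_clock) or c == new_clock for _, c in siblings):
        return siblings
    # Keep only the siblings that the new version does not dominate.
    kept = [(v, c) for v, c in siblings if not dominates(new_clock, c)]
    return kept + [new]

siblings: List[Version] = []
siblings = add_version(siblings, ("v1", {"A": 1, "B": 0, "C": 0}))
siblings = add_version(siblings, ("v2-from-C", {"A": 1, "B": 1, "C": 1}))  # supersedes v1
siblings = add_version(siblings, ("v2-from-A", {"A": 2, "B": 0, "C": 0}))  # concurrent

print([v for v, _ in siblings])  # ['v2-from-C', 'v2-from-A']
```

Both siblings survive until a client (or an automatic merge rule) writes a resolved value whose clock dominates each of them, at which point `add_version` collapses the list back to a single entry.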

2. CRDTs - Conflict-Free Replicated Data Types

Data structures that automatically merge conflicts without coordination.

from typing import Dict, Set

class GCounter:
    """
    Grow-Only Counter (G-Counter) CRDT.

    Properties:
    - Supports increment only (no decrement)
    - Eventually consistent
    - Conflict-free merge

    Used by: Distributed counters (view counts, likes)
    """

    def __init__(self, node_id: str, nodes: list):
        self.node_id = node_id
        self.counts: Dict[str, int] = {node: 0 for node in nodes}

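    def increment(self, amount: int = 1):
        """Increment counter on this node (amount must be non-negative)"""
        if amount < 0:
            raise ValueError("G-Counter can only grow; use a PN-Counter to decrement")
        self.counts[self.node_id] += amount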

    def value(self) -> int:
        """Get total count (sum across all nodes)"""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """
        Merge with another replica.

        Takes max for each node (idempotent, commutative, associative).
        """
        for node in self.counts:
            self.counts[node] = max(self.counts[node], other.counts.get(node, 0))

class PNCounter:
    """
    Positive-Negative Counter (PN-Counter) CRDT.

    Properties:
    - Supports both increment and decrement
    - Two G-Counters: increments and decrements
    - Value = sum(increments) - sum(decrements)

    Used by: Distributed counters with decrements (inventory, quotas)
    """

    def __init__(self, node_id: str, nodes: list):
        self.node_id = node_id
        self.increments = GCounter(node_id, nodes)
        self.decrements = GCounter(node_id, nodes)

    def increment(self, amount: int = 1):
        self.increments.increment(amount)

    def decrement(self, amount: int = 1):
        self.decrements.increment(amount)

    def value(self) -> int:
        return self.increments.value() - self.decrements.value()

    def merge(self, other: 'PNCounter'):
        self.increments.merge(other.increments)
        self.decrements.merge(other.decrements)

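class LWWRegister:
    """
    Last-Write-Wins Register (LWW-Register) CRDT.

    Properties:
    - Stores single value with timestamp
    - Merge: keep value with latest timestamp; ties broken by writer node ID
    - Requires synchronized clocks (or logical timestamps)
    - Concurrent writes are silently discarded, not merged

    Used by: Riak, Cassandra for conflict resolution
    """

    def __init__(self, node_id: str):
        self.node_id = node_id  # must be unique and non-empty
        self.value = None
        self.timestamp = 0
        self.writer = ""  # node ID that produced the current value

    def _wins(self, timestamp: float, writer: str) -> bool:
        """Total order on (timestamp, writer) so every replica picks the same winner"""
        return (timestamp, writer) > (self.timestamp, self.writer)

    def set(self, value: object, timestamp: float):
        """Set value with timestamp"""
        if self._wins(timestamp, self.node_id):
            self.value, self.timestamp, self.writer = value, timestamp, self.node_id

    def merge(self, other: 'LWWRegister'):
        """Merge: keep latest write"""
        if self._wins(other.timestamp, other.writer):
            self.value, self.timestamp, self.writer = other.value, other.timestamp, other.writer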

class ORSet:
    """
    Observed-Remove Set (OR-Set) CRDT.

    Properties:
    - Add-wins semantics (if concurrent add/remove, add wins)
    - Each element tagged with unique ID
    - Remove tombstones only the element+ID pairs this replica has observed

    Used by: Riak, Redis Enterprise for distributed sets
    """

    def __init__(self):
        # element -> set of unique tags, one per add operation
        self.elements: Dict[object, Set[str]] = {}
        # tags whose add has been observed and then removed (tombstones);
        # without these, a merge would resurrect removed elements
        self.removed: Set[str] = set()

    def add(self, element: object, unique_tag: str):
        """Add element with unique tag"""
        self.elements.setdefault(element, set()).add(unique_tag)

    def remove(self, element: object):
        """Tombstone every tag observed so far for element"""
        self.removed.update(self.elements.get(element, set()))

    def contains(self, element: object) -> bool:
        # Present iff at least one add tag has not been tombstoned
        return bool(self.elements.get(element, set()) - self.removed)

    def merge(self, other: 'ORSet'):
        """Merge: union of add tags and union of tombstones"""
        for element, tags in other.elements.items():
            self.elements.setdefault(element, set()).update(tags)
        self.removed.update(other.removed)

# Example: CRDTs in action
print("\n=== CRDT Example ===\n")

# PN-Counter: Distributed like counter
nodes = ["replica1", "replica2", "replica3"]

counter1 = PNCounter("replica1", nodes)
counter2 = PNCounter("replica2", nodes)

# Concurrent updates
counter1.increment(5)
counter2.increment(3)
counter2.decrement(1)

print(f"Counter 1: {counter1.value()}")
print(f"Counter 2: {counter2.value()}")

# Merge (conflict-free!)
counter1.merge(counter2)
counter2.merge(counter1)

print(f"After merge: {counter1.value()} (both replicas agree)")

# OR-Set: Shopping cart across devices
print("\n--- OR-Set Shopping Cart ---")
cart1 = ORSet()  # Mobile app
cart2 = ORSet()  # Web browser

# Add items
cart1.add("laptop", "mobile-123")
cart2.add("mouse", "web-456")

# Remove on mobile (but web hasn't synced yet)
cart1.remove("laptop")

# Merge
cart1.merge(cart2)
cart2.merge(cart1)

print(f"Cart contains 'laptop': {cart1.contains('laptop')}")
print(f"Cart contains 'mouse': {cart1.contains('mouse')}")
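
The counter merge above converges because taking a per-node maximum is commutative, associative, and idempotent, so replicas reach the same state regardless of delivery order or duplication. A quick way to check this is to test the three laws on sample states. The sketch below is standalone; `merge_counts` is a hypothetical helper that mirrors the per-node max used by `GCounter.merge`, operating on plain dicts.

```python
from typing import Dict

def merge_counts(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    """Per-node maximum of two G-Counter states (returns a new dict)."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in set(a) | set(b)}

x = {"replica1": 5, "replica2": 0}
y = {"replica1": 2, "replica2": 3}
z = {"replica1": 0, "replica2": 1, "replica3": 4}

# Commutative: the order of the two arguments does not matter.
assert merge_counts(x, y) == merge_counts(y, x)
# Associative: the grouping of merges does not matter.
assert merge_counts(merge_counts(x, y), z) == merge_counts(x, merge_counts(y, z))
# Idempotent: re-delivering the same state changes nothing.
assert merge_counts(x, x) == x

merged = merge_counts(merge_counts(x, y), z)
print(sum(merged.values()))  # 12
```

Summing per-node counts instead of taking the max would fail the idempotence check, which is why replaying a G-Counter state is safe while replaying a raw "+1" message is not.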

3. Two-Phase Commit (2PC)

Atomic commit protocol for distributed transactions.

sequenceDiagram
    participant C as Coordinator
    participant P1 as Participant 1
    participant P2 as Participant 2
    Note over C,P2: Phase 1: PREPARE
    C->>P1: PREPARE
    C->>P2: PREPARE
    P1->>P1: Lock resources
    P1->>C: VOTE YES
    P2->>P2: Lock resources
    P2->>C: VOTE YES
    Note over C,P2: Phase 2: COMMIT
    C->>P1: COMMIT
    C->>P2: COMMIT
    P1->>P1: Apply transaction
    P1->>C: ACK
    P2->>P2: Apply transaction
    P2->>C: ACK
from enum import Enum
from typing import List, Dict

class TxnState(Enum):
    PREPARING = "preparing"
    COMMITTED = "committed"
    ABORTED = "aborted"

class Participant:
    """2PC Participant (database node)"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.prepared_txns: Dict[str, bool] = {}

    def prepare(self, txn_id: str) -> bool:
        """
        Phase 1: Prepare to commit.

        Returns: True if can commit, False otherwise
        """
        # Simulate checking if transaction can be committed
        can_commit = True  # In real system: check locks, constraints, etc.

        if can_commit:
            self.prepared_txns[txn_id] = True
            print(f"  [{self.node_id}] PREPARE {txn_id}: VOTE YES")
            return True
        else:
            print(f"  [{self.node_id}] PREPARE {txn_id}: VOTE NO")
            return False

    def commit(self, txn_id: str):
        """Phase 2: Commit transaction"""
        if txn_id in self.prepared_txns:
            print(f"  [{self.node_id}] COMMIT {txn_id}")
            del self.prepared_txns[txn_id]

    def abort(self, txn_id: str):
        """Phase 2: Abort transaction"""
        if txn_id in self.prepared_txns:
            print(f"  [{self.node_id}] ABORT {txn_id}")
            del self.prepared_txns[txn_id]

class TwoPhaseCommitCoordinator:
    """2PC Coordinator"""

    def __init__(self, participants: List[Participant]):
        self.participants = participants

    def execute_transaction(self, txn_id: str) -> bool:
        """
        Execute distributed transaction with 2PC.

        Returns: True if committed, False if aborted
        """
        print(f"\n=== 2PC Transaction {txn_id} ===")

        # Phase 1: PREPARE
        print("Phase 1: PREPARE")
        votes = []
        for p in self.participants:
            vote = p.prepare(txn_id)
            votes.append(vote)

        # Decision: commit only if ALL voted YES
        all_yes = all(votes)

        # Phase 2: COMMIT or ABORT
        if all_yes:
            print("\nPhase 2: COMMIT (all voted YES)")
            for p in self.participants:
                p.commit(txn_id)
            return True
        else:
            print("\nPhase 2: ABORT (at least one voted NO)")
            for p in self.participants:
                p.abort(txn_id)
            return False

# Example: 2PC
print("=== Two-Phase Commit Example ===")

participants = [
    Participant("db1"),
    Participant("db2"),
    Participant("db3")
]

coordinator = TwoPhaseCommitCoordinator(participants)

# Successful transaction
success = coordinator.execute_transaction("txn-001")
print(f"\nResult: {'COMMITTED' if success else 'ABORTED'}")
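
The run above only exercises the commit path. The abort path has the same shape: a single NO vote in phase 1 forces every participant to abort. The compact sketch below is standalone and uses hypothetical names (`VotingParticipant`, `run_2pc`, `will_vote_yes`) rather than the classes above, so that a NO vote can be injected.

```python
from typing import List, Optional

class VotingParticipant:
    def __init__(self, node_id: str, will_vote_yes: bool = True):
        self.node_id = node_id
        self.will_vote_yes = will_vote_yes
        self.outcome: Optional[str] = None  # "committed" or "aborted" once decided

    def prepare(self) -> bool:
        # Phase 1: report whether this node could commit.
        return self.will_vote_yes

    def finish(self, commit: bool) -> None:
        # Phase 2: apply the coordinator's global decision.
        self.outcome = "committed" if commit else "aborted"

def run_2pc(participants: List[VotingParticipant]) -> bool:
    # Phase 1: collect every vote (a list, so all participants are asked).
    votes = [p.prepare() for p in participants]
    decision = all(votes)
    # Phase 2: broadcast the single global decision to everyone.
    for p in participants:
        p.finish(decision)
    return decision

dbs = [
    VotingParticipant("db1"),
    VotingParticipant("db2", will_vote_yes=False),  # e.g. constraint violation
    VotingParticipant("db3"),
]
committed = run_2pc(dbs)
print(committed, [p.outcome for p in dbs])  # False ['aborted', 'aborted', 'aborted']
```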
2PC Problems:
Blocking: If the coordinator crashes after PREPARE, participants that voted YES are stuck holding locks until it recovers
Single point of failure: A coordinator failure stalls every in-flight transaction
Performance: 2 network round-trips, with locks held across both phases
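
The blocking problem can be made concrete with a small simulation. The sketch below is self-contained and hypothetical (`InDoubtParticipant` and `decide_on_timeout` are illustrative names): a participant that has not voted yet may abort on its own after a timeout, but one that has voted YES has promised to commit if asked, so it has to keep its locks until it learns the coordinator's decision.

```python
from enum import Enum

class State(Enum):
    INIT = "init"
    PREPARED = "prepared"    # voted YES, locks held, outcome unknown
    COMMITTED = "committed"
    ABORTED = "aborted"

class InDoubtParticipant:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.state = State.INIT

    def prepare(self) -> bool:
        # Voting YES moves the participant into the in-doubt window.
        self.state = State.PREPARED
        return True

    def decide_on_timeout(self) -> State:
        """What the participant may do if the coordinator goes silent."""
        if self.state is State.INIT:
            # No vote cast yet: aborting unilaterally is safe.
            self.state = State.ABORTED
        # PREPARED: neither commit nor abort is safe, because the coordinator
        # may already have sent the opposite decision elsewhere. Stay blocked.
        return self.state

p1, p2 = InDoubtParticipant("db1"), InDoubtParticipant("db2")
p1.prepare()                    # coordinator reached db1 ...
# ... and crashed before sending PREPARE to db2.
print(p1.decide_on_timeout())   # State.PREPARED (still blocked)
print(p2.decide_on_timeout())   # State.ABORTED
```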

Common Alternative: Saga pattern (compensating transactions), which gives up atomicity and isolation in exchange for availability

4. Saga Pattern - Long-Running Transactions

Coordinate distributed transactions with compensating actions.

from typing import Callable, List
from dataclasses import dataclass

@dataclass
class SagaStep:
    """Single step in saga"""
    name: str
    action: Callable  # Forward action
    compensate: Callable  # Rollback action

class Saga:
    """
    Saga Pattern for distributed transactions.

    vs 2PC:
    - No locks held across services (better availability)
    - Eventual consistency (not immediate)
    - No isolation: other transactions can observe intermediate states
    - Uses compensating transactions for rollback

    Example: E-commerce order
    1. Reserve inventory
    2. Charge payment
    3. Create shipment

    If step 3 fails:
    - Compensate step 2 (refund payment)
    - Compensate step 1 (release inventory)
    """

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, name: str, action: Callable, compensate: Callable):
        """Add step to saga"""
        self.steps.append(SagaStep(name, action, compensate))

    def execute(self) -> bool:
        """
        Execute saga.

        Returns: True if all steps succeeded, False if rolled back
        """
        print("=== Executing Saga ===")
        self.completed_steps = []  # reset so a re-run never compensates stale steps

        for step in self.steps:
            try:
                print(f"\n[STEP] {step.name}")
                step.action()
                self.completed_steps.append(step)

            except Exception as e:
                print(f"\n[FAILED] {step.name}: {e}")
                print("\n=== Rolling Back ===")
                self._compensate()
                return False

        print("\n=== Saga Completed Successfully ===")
        return True

    def _compensate(self):
        """Rollback completed steps in reverse order"""
        for step in reversed(self.completed_steps):
            print(f"[COMPENSATE] {step.name}")
            try:
                step.compensate()
            except Exception as e:
                print(f"  ERROR during compensation: {e}")

# Example: E-commerce order saga
print("\n=== Saga Pattern Example ===\n")

# State
inventory_reserved = False
payment_charged = False
shipment_created = False

# Step 1: Reserve inventory
def reserve_inventory():
    global inventory_reserved
    print("  Reserving inventory...")
    inventory_reserved = True

def release_inventory():
    global inventory_reserved
    print("  Releasing inventory...")
    inventory_reserved = False

# Step 2: Charge payment
def charge_payment():
    global payment_charged
    print("  Charging payment...")
    payment_charged = True

def refund_payment():
    global payment_charged
    print("  Refunding payment...")
    payment_charged = False

# Step 3: Create shipment (simulate failure)
def create_shipment():
    global shipment_created
    print("  Creating shipment...")
    raise Exception("Shipment service unavailable")  # Simulated failure

def cancel_shipment():
    global shipment_created
    print("  Canceling shipment...")
    shipment_created = False

# Execute saga
saga = Saga()
saga.add_step("Reserve Inventory", reserve_inventory, release_inventory)
saga.add_step("Charge Payment", charge_payment, refund_payment)
saga.add_step("Create Shipment", create_shipment, cancel_shipment)

success = saga.execute()

print(f"\n--- Final State ---")
print(f"Inventory reserved: {inventory_reserved}")
print(f"Payment charged: {payment_charged}")
print(f"Shipment created: {shipment_created}")
print(f"\nAll compensated: {not any([inventory_reserved, payment_charged, shipment_created])}")
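
The `_compensate` method above logs a failed compensation and moves on, which can leave the system partially rolled back. A common mitigation is to make compensations idempotent and retry them. The sketch below shows one way to do that under stated assumptions: `retry_compensation`, `max_attempts`, and the flaky `refund` function are hypothetical, and a production system would typically add backoff and persist its progress.

```python
from typing import Callable, Dict, Set

def retry_compensation(compensate: Callable[[], None], max_attempts: int = 3) -> bool:
    """Call compensate until it succeeds or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            compensate()
            return True
        except Exception as e:
            print(f"  attempt {attempt} failed: {e}")
    return False  # caller should escalate (alert, dead-letter queue, manual fix)

# Hypothetical flaky refund: fails twice, then succeeds.
refunded_orders: Set[str] = set()
calls: Dict[str, int] = {"n": 0}

def refund(order_id: str = "order-42") -> None:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("payment service timeout")
    # Idempotent: adding an already refunded order to the set is a no-op.
    refunded_orders.add(order_id)

ok = retry_compensation(refund)
print(ok, refunded_orders, calls["n"])  # True {'order-42'} 3
```

Idempotence matters because a retry cannot tell whether the previous attempt failed before or after the refund took effect; a compensation that is safe to repeat removes that ambiguity.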
When to Use Each Pattern:

2PC: Strong consistency required, short transactions, low latency network
Saga: Long-running workflows, microservices, eventual consistency OK
CRDTs: High availability required, conflict resolution automated
Vector Clocks: Need to detect conflicts, manual resolution

Key Takeaways

Pattern        Consistency       Availability    Use Case
Vector Clocks  Detect conflicts  High            Dynamo, Riak
CRDTs          Eventual          Highest         Collaborative editing, counters
2PC            Strong            Low (blocking)  Traditional databases
Saga           Eventual          High            Microservices, long workflows