1. Vector Clocks - Tracking Causality
Detect causality and conflicts in distributed systems without synchronized clocks.
Problem: How do we order events across distributed nodes when clocks aren't synchronized?
Solution: Vector clocks track causal relationships between events: for any two events, they tell whether the first happened before the second, after it, or concurrently with it.
sequenceDiagram
participant A as Node A
participant B as Node B
participant C as Node C
Note over A: A:1, B:0, C:0
A->>B: Write x=1 [A:1, B:0, C:0]
Note over B: A:1, B:1, C:0
B->>C: Write y=2 [A:1, B:1, C:0]
Note over C: A:1, B:1, C:1
Note over A: A:2, B:0, C:0 (concurrent!)
A->>C: Write x=3 [A:2, B:0, C:0]
Note over C: CONFLICT DETECTED!<br/>x=1 [A:1,B:1,C:1] vs x=3 [A:2,B:0,C:0]
from typing import Dict, List


class VectorClock:
    """
    Vector Clock for tracking causality in distributed systems.
    Each node maintains a vector of logical clocks (one entry per node).
    Rules:
    1. Internal event: increment own clock
    2. Send message: increment own clock, send entire vector
    3. Receive message: merge vectors (take max), increment own clock
    Used by: Amazon's Dynamo and Riak for conflict detection
    (Cassandra instead resolves conflicts with per-cell timestamps, last-write-wins)
    """

    def __init__(self, node_id: str, nodes: List[str]):
        """
        node_id: This node's ID
        nodes: List of all node IDs in the system
        """
        self.node_id = node_id
        self.clock: Dict[str, int] = {node: 0 for node in nodes}

    def increment(self):
        """Increment own clock (internal or send event)"""
        self.clock[self.node_id] += 1

    def update(self, other_clock: Dict[str, int]):
        """
        Update clock on message receive.
        Takes the component-wise maximum, then increments own clock.
        """
        for node in self.clock:
            self.clock[node] = max(self.clock[node], other_clock.get(node, 0))
        self.increment()

    def happens_before(self, other: 'VectorClock') -> bool:
        """
        Check if self happened-before other.
        self < other iff:
        - self[i] <= other[i] for all i
        - self[i] < other[i] for at least one i
        """
        less_or_equal = all(self.clock[node] <= other.clock.get(node, 0) for node in self.clock)
        strictly_less = any(self.clock[node] < other.clock.get(node, 0) for node in self.clock)
        return less_or_equal and strictly_less

    def concurrent_with(self, other: 'VectorClock') -> bool:
        """
        Check if self and other are concurrent (a conflict).
        Concurrent iff the clocks differ and neither happened-before the other.
        Identical clocks describe the same state, not a conflict.
        """
        if self.clock == other.clock:
            return False
        return not self.happens_before(other) and not other.happens_before(self)

    def __repr__(self):
        return str(self.clock)


# Example: Detecting conflicts with vector clocks
print("=== Vector Clock Example ===\n")
nodes = ["A", "B", "C"]

# Node A writes x=1
vc_a = VectorClock("A", nodes)
vc_a.increment()  # A:1, B:0, C:0
print(f"Node A writes x=1: {vc_a}")

# Node A sends to B; B receives and writes y=2
vc_b = VectorClock("B", nodes)
vc_b.update(vc_a.clock)  # A:1, B:1, C:0
print(f"Node B receives, writes y=2: {vc_b}")

# Node B sends to C
vc_c = VectorClock("C", nodes)
vc_c.update(vc_b.clock)  # A:1, B:1, C:1
print(f"Node C receives y=2: {vc_c}")

# Meanwhile, Node A writes again without having seen B's or C's events
vc_a.increment()  # A:2, B:0, C:0
print(f"\nNode A writes x=3 (concurrent!): {vc_a}")

# Node C receives A's write: neither version happened-before the other -> CONFLICT
print("\nConflict detection:")
print(f"  C's version {vc_c} concurrent with A's version {vc_a}? {vc_c.concurrent_with(vc_a)}")
print("  Must resolve the conflict (e.g., last-write-wins, merge, ask the user)")
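The class answers "happened-before" and "concurrent" as separate questions. A minimal sketch of a single comparison that returns all four possible outcomes, using plain dicts and a hypothetical `compare_clocks` helper; a node missing from a clock is treated as 0, which assumes that node has not produced any events yet:

```python
from typing import Dict


def compare_clocks(a: Dict[str, int], b: Dict[str, int]) -> str:
    """Classify clock a relative to clock b: 'before', 'after', 'equal', or 'concurrent'."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)  # every entry of a <= b
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)  # every entry of b <= a
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a happened-before b
    if b_le_a:
        return "after"       # b happened-before a
    return "concurrent"      # neither dominates: a conflict


# The two versions of x from the diagram
c_version = {"A": 1, "B": 1, "C": 1}
a_version = {"A": 2, "B": 0, "C": 0}
print(compare_clocks(c_version, a_version))                   # concurrent
print(compare_clocks({"A": 1, "B": 0, "C": 0}, c_version))    # before
```

Applied to the diagram's two versions, it reports `concurrent`, which is exactly the conflict Node C detects.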
2. CRDTs - Conflict-Free Replicated Data Types
Data structures that replicas can update independently and merge automatically, with no coordination and no conflicts left to resolve by hand.
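Replicas can merge without coordination because the merge function is commutative, associative, and idempotent: replicas that receive the same updates converge regardless of delivery order or duplicate deliveries. A minimal sketch checking those laws for the G-Counter's entry-wise max, with replica states as plain dicts (the `merge` and `value` functions are illustrative helpers, not methods of the classes in this section). The last lines show, under the same assumptions, why a counter cannot simply decrement its own entry in place:

```python
from typing import Dict

Counts = Dict[str, int]  # node ID -> that node's increment total


def merge(a: Counts, b: Counts) -> Counts:
    """G-Counter merge: entry-wise max over the union of node IDs."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}


def value(c: Counts) -> int:
    return sum(c.values())


r1 = {"replica1": 5, "replica2": 0, "replica3": 0}
r2 = {"replica1": 2, "replica2": 3, "replica3": 0}  # holds an older view of replica1
r3 = {"replica1": 0, "replica2": 1, "replica3": 4}

assert merge(r1, r2) == merge(r2, r1)                        # commutative
assert merge(r1, merge(r2, r3)) == merge(merge(r1, r2), r3)  # associative
assert merge(r1, r1) == r1                                   # idempotent
print(value(merge(merge(r1, r2), r3)))  # 12

# Decrementing an entry in place breaks max-merge: the stale, higher value wins
stale = {"replica1": 5}
after_decrement = {"replica1": 4}
print(value(merge(stale, after_decrement)))  # 5, the decrement is lost
```

Keeping decrements in a second grow-only counter, as the PN-Counter does, avoids this: both counters only ever grow, so max-merge never discards an update.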
from typing import Any, Dict, List, Set


class GCounter:
    """
    Grow-Only Counter (G-Counter) CRDT.
    Properties:
    - Supports increment only (no decrement)
    - Eventually consistent
    - Conflict-free merge
    Used by: Distributed counters (view counts, likes)
    """

    def __init__(self, node_id: str, nodes: List[str]):
        self.node_id = node_id
        self.counts: Dict[str, int] = {node: 0 for node in nodes}

    def increment(self, amount: int = 1):
        """Increment counter on this node (negative amounts would break max-merge)"""
        if amount < 0:
            raise ValueError("G-Counter only supports non-negative increments")
        self.counts[self.node_id] += amount

    def value(self) -> int:
        """Get total count (sum across all nodes)"""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """
        Merge with another replica.
        Takes the max for each node (idempotent, commutative, associative).
        """
        for node in self.counts:
            self.counts[node] = max(self.counts[node], other.counts.get(node, 0))


class PNCounter:
    """
    Positive-Negative Counter (PN-Counter) CRDT.
    Properties:
    - Supports both increment and decrement
    - Two G-Counters: increments and decrements
    - Value = sum(increments) - sum(decrements)
    Used by: Distributed counters with decrements (inventory, quotas)
    """

    def __init__(self, node_id: str, nodes: List[str]):
        self.node_id = node_id
        self.increments = GCounter(node_id, nodes)
        self.decrements = GCounter(node_id, nodes)

    def increment(self, amount: int = 1):
        self.increments.increment(amount)

    def decrement(self, amount: int = 1):
        self.decrements.increment(amount)

    def value(self) -> int:
        return self.increments.value() - self.decrements.value()

    def merge(self, other: 'PNCounter'):
        self.increments.merge(other.increments)
        self.decrements.merge(other.decrements)


class LWWRegister:
    """
    Last-Write-Wins Register (LWW-Register) CRDT.
    Properties:
    - Stores a single value with a timestamp
    - Merge: keep the value with the latest timestamp
    - Equal timestamps are broken by writer node ID, so every replica picks the same winner
    - Requires synchronized clocks (or logical timestamps)
    Used by: Riak, Cassandra for conflict resolution
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.value: Any = None
        self.timestamp: float = 0
        self.writer: str = ""  # Node ID that produced the current value

    def set(self, value: Any, timestamp: float):
        """Set value with timestamp (ignored if a later write is already held)"""
        if (timestamp, self.node_id) > (self.timestamp, self.writer):
            self.value = value
            self.timestamp = timestamp
            self.writer = self.node_id

    def merge(self, other: 'LWWRegister'):
        """Merge: keep the latest write, comparing (timestamp, writer) pairs"""
        if (other.timestamp, other.writer) > (self.timestamp, self.writer):
            self.value = other.value
            self.timestamp = other.timestamp
            self.writer = other.writer


class ORSet:
    """
    Observed-Remove Set (OR-Set) CRDT.
    Properties:
    - Add-wins semantics (if concurrent add/remove, add wins)
    - Each add tags the element with a unique ID
    - Remove tombstones only the element+ID pairs this replica has observed
    Used by: Riak, Redis Enterprise for distributed sets
    """

    def __init__(self):
        # element -> set of unique tags observed as added
        self.elements: Dict[Any, Set[str]] = {}
        # element -> set of tags observed as removed (tombstones)
        self.tombstones: Dict[Any, Set[str]] = {}

    def add(self, element: Any, unique_tag: str):
        """Add element with unique tag"""
        self.elements.setdefault(element, set()).add(unique_tag)

    def remove(self, element: Any):
        """Remove element by tombstoning every tag observed so far"""
        observed = self.elements.get(element, set())
        self.tombstones.setdefault(element, set()).update(observed)

    def contains(self, element: Any) -> bool:
        """Present iff some added tag has not been tombstoned"""
        live = self.elements.get(element, set()) - self.tombstones.get(element, set())
        return len(live) > 0

    def merge(self, other: 'ORSet'):
        """Merge: union of added tags and union of tombstones"""
        for element, tags in other.elements.items():
            self.elements.setdefault(element, set()).update(tags)
        for element, tags in other.tombstones.items():
            self.tombstones.setdefault(element, set()).update(tags)


# Example: CRDTs in action
print("\n=== CRDT Example ===\n")

# PN-Counter: Distributed like counter
nodes = ["replica1", "replica2", "replica3"]
counter1 = PNCounter("replica1", nodes)
counter2 = PNCounter("replica2", nodes)

# Concurrent updates
counter1.increment(5)
counter2.increment(3)
counter2.decrement(1)
print(f"Counter 1: {counter1.value()}")  # 5
print(f"Counter 2: {counter2.value()}")  # 2

# Merge in both directions (conflict-free!)
counter1.merge(counter2)
counter2.merge(counter1)
print(f"After merge: {counter1.value()} and {counter2.value()} (both replicas agree)")  # 7 and 7

# OR-Set: Shopping cart across devices
print("\n--- OR-Set Shopping Cart ---")
cart1 = ORSet()  # Mobile app
cart2 = ORSet()  # Web browser

# Add items
cart1.add("laptop", "mobile-123")
cart2.add("mouse", "web-456")

# Remove on mobile (web hasn't synced the laptop yet)
cart1.remove("laptop")

# Merge in both directions
cart1.merge(cart2)
cart2.merge(cart1)
print(f"Cart contains 'laptop': {cart1.contains('laptop')}")  # False
print(f"Cart contains 'mouse': {cart1.contains('mouse')}")  # True
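In the cart example the web replica never sees the laptop before mobile removes it, so the add-wins rule is never exercised. A minimal sketch of the concurrent case, assuming a hypothetical `TinyORSet` that stores adds and tombstones as grow-only sets of `(element, tag)` pairs and draws fresh tags from `uuid.uuid4()`:

```python
import uuid
from typing import Set, Tuple

Pair = Tuple[str, str]  # (element, unique tag)


class TinyORSet:
    """Sketch: adds and tombstones are both grow-only sets of (element, tag) pairs."""

    def __init__(self) -> None:
        self.adds: Set[Pair] = set()
        self.removes: Set[Pair] = set()

    def add(self, element: str) -> None:
        self.adds.add((element, uuid.uuid4().hex))  # fresh tag for every add

    def remove(self, element: str) -> None:
        # Tombstone only the tags this replica has observed
        self.removes |= {p for p in self.adds if p[0] == element}

    def contains(self, element: str) -> bool:
        return any(e == element for e, _ in self.adds - self.removes)

    def merge(self, other: "TinyORSet") -> None:
        self.adds |= other.adds
        self.removes |= other.removes


mobile, web = TinyORSet(), TinyORSet()
mobile.add("laptop")
web.merge(mobile)          # both replicas now hold the same laptop tag
mobile.remove("laptop")    # mobile tombstones the tag it observed...
web.add("laptop")          # ...while web concurrently re-adds with a new tag
mobile.merge(web)
web.merge(mobile)
print(mobile.contains("laptop"), web.contains("laptop"))  # True True (add wins)
```

The remove only tombstones tags its replica had seen, so the tag from web's concurrent re-add survives the merge. Removing again after syncing does delete the item, because by then every tag has been observed.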
3. Two-Phase Commit (2PC)
Atomic commit protocol for distributed transactions.
sequenceDiagram
participant C as Coordinator
participant P1 as Participant 1
participant P2 as Participant 2
Note over C,P2: Phase 1: PREPARE
C->>P1: PREPARE
C->>P2: PREPARE
P1->>P1: Lock resources
P1->>C: VOTE YES
P2->>P2: Lock resources
P2->>C: VOTE YES
Note over C,P2: Phase 2: COMMIT
C->>P1: COMMIT
C->>P2: COMMIT
P1->>P1: Apply transaction
P1->>C: ACK
P2->>P2: Apply transaction
P2->>C: ACK
from enum import Enum
from typing import Dict, List


class TxnState(Enum):
    PREPARING = "preparing"
    COMMITTED = "committed"
    ABORTED = "aborted"


class Participant:
    """2PC Participant (database node)"""

    def __init__(self, node_id: str, can_commit: bool = True):
        self.node_id = node_id
        self.can_commit = can_commit  # Simulated result of local checks
        self.prepared_txns: Dict[str, bool] = {}

    def prepare(self, txn_id: str) -> bool:
        """
        Phase 1: Prepare to commit.
        Returns: True if this node can commit, False otherwise
        """
        # In a real system: acquire locks, check constraints, durably log the vote
        if self.can_commit:
            self.prepared_txns[txn_id] = True
            print(f"  [{self.node_id}] PREPARE {txn_id}: VOTE YES")
            return True
        print(f"  [{self.node_id}] PREPARE {txn_id}: VOTE NO")
        return False

    def commit(self, txn_id: str):
        """Phase 2: Commit transaction"""
        if txn_id in self.prepared_txns:
            print(f"  [{self.node_id}] COMMIT {txn_id}")
            del self.prepared_txns[txn_id]

    def abort(self, txn_id: str):
        """Phase 2: Abort transaction (no-op if this node never prepared it)"""
        if txn_id in self.prepared_txns:
            print(f"  [{self.node_id}] ABORT {txn_id}")
            del self.prepared_txns[txn_id]


class TwoPhaseCommitCoordinator:
    """2PC Coordinator"""

    def __init__(self, participants: List[Participant]):
        self.participants = participants
        self.txn_states: Dict[str, TxnState] = {}

    def execute_transaction(self, txn_id: str) -> bool:
        """
        Execute distributed transaction with 2PC.
        Returns: True if committed, False if aborted
        """
        print(f"\n=== 2PC Transaction {txn_id} ===")
        self.txn_states[txn_id] = TxnState.PREPARING

        # Phase 1: PREPARE (collect a vote from every participant)
        print("Phase 1: PREPARE")
        votes = [p.prepare(txn_id) for p in self.participants]

        # Decision: commit only if ALL voted YES
        if all(votes):
            self.txn_states[txn_id] = TxnState.COMMITTED
            print("\nPhase 2: COMMIT (all voted YES)")
            for p in self.participants:
                p.commit(txn_id)
            return True

        self.txn_states[txn_id] = TxnState.ABORTED
        print("\nPhase 2: ABORT (at least one voted NO)")
        for p in self.participants:
            p.abort(txn_id)
        return False


# Example: 2PC
print("=== Two-Phase Commit Example ===")
participants = [
    Participant("db1"),
    Participant("db2"),
    Participant("db3"),
]
coordinator = TwoPhaseCommitCoordinator(participants)

# Successful transaction
success = coordinator.execute_transaction("txn-001")
print(f"\nResult: {'COMMITTED' if success else 'ABORTED'}")

# Failing transaction: db3 votes NO, so the participants that prepared abort
participants[2].can_commit = False
success = coordinator.execute_transaction("txn-002")
print(f"\nResult: {'COMMITTED' if success else 'ABORTED'}")
2PC Problems:
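Blocking: If the coordinator crashes after PREPARE, participants that voted YES can neither commit nor abort on their own, and keep holding their locks until it recovers
Single point of failure: A coordinator failure stalls every transaction it is coordinating
Performance: Two network round-trips per transaction, with locks held across both phases
Modern Alternative: Saga pattern (compensating transactions)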
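The blocking problem comes from what a participant may decide alone once the coordinator goes silent. A minimal sketch of that rule in the basic protocol, using a hypothetical `PState` enum and `on_coordinator_timeout` function (neither is part of the code above): before voting, a participant can abort unilaterally; after voting YES it cannot, because the coordinator may already have told other participants to commit.

```python
from enum import Enum


class PState(Enum):
    INIT = "init"            # has not voted yet
    PREPARED = "prepared"    # voted YES, still holding locks
    COMMITTED = "committed"
    ABORTED = "aborted"


def on_coordinator_timeout(state: PState) -> PState:
    """State a participant may safely move to on its own when the coordinator is silent."""
    if state is PState.INIT:
        # No YES vote was sent, so the coordinator cannot have decided COMMIT: abort is safe
        return PState.ABORTED
    # PREPARED: the global decision may already be COMMIT or ABORT elsewhere,
    # so deciding alone could contradict it. Keep the locks and wait (blocked).
    # COMMITTED / ABORTED: already decided, nothing changes.
    return state


for s in PState:
    print(s.name, "->", on_coordinator_timeout(s).name)
# INIT -> ABORTED
# PREPARED -> PREPARED
# COMMITTED -> COMMITTED
# ABORTED -> ABORTED
```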
4. Saga Pattern - Long-Running Transactions
Coordinate distributed transactions with compensating actions.
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    """Single step in a saga"""
    name: str
    action: Callable[[], None]  # Forward action
    compensate: Callable[[], None]  # Rollback action


class Saga:
    """
    Saga Pattern for distributed transactions.
    vs 2PC:
    - No locks held across steps (better availability)
    - Eventual consistency (not immediate)
    - No isolation: other transactions can observe intermediate states
    - Uses compensating transactions for rollback
    Example: E-commerce order
    1. Reserve inventory
    2. Charge payment
    3. Create shipment
    If step 3 fails:
    - Compensate step 2 (refund payment)
    - Compensate step 1 (release inventory)
    """

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, name: str, action: Callable[[], None], compensate: Callable[[], None]):
        """Add step to saga"""
        self.steps.append(SagaStep(name, action, compensate))

    def execute(self) -> bool:
        """
        Execute saga.
        Returns: True if all steps succeeded, False if rolled back
        """
        print("=== Executing Saga ===")
        self.completed_steps = []  # Fresh run: nothing completed yet
        for step in self.steps:
            try:
                print(f"\n[STEP] {step.name}")
                step.action()
                self.completed_steps.append(step)
            except Exception as e:
                print(f"\n[FAILED] {step.name}: {e}")
                print("\n=== Rolling Back ===")
                self._compensate()
                return False
        print("\n=== Saga Completed Successfully ===")
        return True

    def _compensate(self):
        """Roll back completed steps in reverse order"""
        for step in reversed(self.completed_steps):
            print(f"[COMPENSATE] {step.name}")
            try:
                step.compensate()
            except Exception as e:
                # Log and continue so the remaining steps are still compensated
                print(f"  ERROR during compensation: {e}")


# Example: E-commerce order saga
print("\n=== Saga Pattern Example ===\n")

# State
inventory_reserved = False
payment_charged = False
shipment_created = False


# Step 1: Reserve inventory
def reserve_inventory():
    global inventory_reserved
    print("  Reserving inventory...")
    inventory_reserved = True


def release_inventory():
    global inventory_reserved
    print("  Releasing inventory...")
    inventory_reserved = False


# Step 2: Charge payment
def charge_payment():
    global payment_charged
    print("  Charging payment...")
    payment_charged = True


def refund_payment():
    global payment_charged
    print("  Refunding payment...")
    payment_charged = False


# Step 3: Create shipment (simulated failure)
def create_shipment():
    print("  Creating shipment...")
    raise RuntimeError("Shipment service unavailable")  # Simulated failure


def cancel_shipment():
    global shipment_created
    print("  Canceling shipment...")
    shipment_created = False


# Execute saga
saga = Saga()
saga.add_step("Reserve Inventory", reserve_inventory, release_inventory)
saga.add_step("Charge Payment", charge_payment, refund_payment)
saga.add_step("Create Shipment", create_shipment, cancel_shipment)
success = saga.execute()

print("\n--- Final State ---")
print(f"Inventory reserved: {inventory_reserved}")
print(f"Payment charged: {payment_charged}")
print(f"Shipment created: {shipment_created}")
print(f"\nAll compensated: {not any([inventory_reserved, payment_charged, shipment_created])}")
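Because the example tracks state in globals, the rollback order only shows up in printed output. The same control flow can be sketched as a hypothetical `run_saga` function that appends to an event log, which makes the reverse-order compensation directly checkable (an illustration under that assumption, not a replacement for the `Saga` class):

```python
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[], None], Callable[[], None]]  # (name, action, compensate)


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[Tuple[str, Callable[[], None]]] = []
    for name, action, compensate in steps:
        try:
            action()
        except Exception:
            log.append(f"failed:{name}")
            # The failed step never completed, so only earlier steps are compensated
            for done_name, done_compensate in reversed(completed):
                done_compensate()
                log.append(f"compensated:{done_name}")
            return False
        log.append(f"done:{name}")
        completed.append((name, compensate))
    return True


def fail() -> None:
    raise RuntimeError("Shipment service unavailable")


def noop() -> None:
    pass


events: List[str] = []
ok = run_saga([("reserve", noop, noop), ("charge", noop, noop), ("ship", fail, noop)], events)
print(ok, events)
# False ['done:reserve', 'done:charge', 'failed:ship', 'compensated:charge', 'compensated:reserve']
```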
When to Use Each Pattern:
2PC: Strong consistency required, short transactions, low latency network
Saga: Long-running workflows, microservices, eventual consistency OK
CRDTs: High availability required, conflict resolution automated
Vector Clocks: Need to detect concurrent updates, with resolution left to the application or user
Key Takeaways
| Pattern | Consistency | Availability | Use Case |
|---|---|---|---|
| Vector Clocks | Eventual (detects conflicts) | High | Dynamo-style stores (Riak) |
| CRDTs | Strong eventual | Highest | Collaborative editing, counters |
| 2PC | Strong | Low (blocking) | Traditional databases |
| Saga | Eventual | High | Microservices, long workflows |