Purpose: Understand how production databases like PostgreSQL, MySQL, and MongoDB work internally. Essential knowledge for Distinguished Engineers designing data-intensive systems.
1. B-Tree and B+ Tree - Database Indexes
The fundamental data structure behind virtually all database indexes.
Why B-Trees?
Problem: Binary search trees give O(log n) lookups in memory, but database data lives on disk.
Disk Access: ~10ms per seek vs ~100ns for RAM = 100,000x slower!
Solution: B-Trees minimize disk seeks by storing many keys per node.
B-Tree Properties
- Order M: Each node has at most M children
- Balanced: All leaf nodes at same depth
- Keys per node: Minimum ⌈M/2⌉ - 1 keys (except root)
- Disk-friendly: Node size = disk page size (4KB-16KB)
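To see why the large fan-out matters, a quick back-of-the-envelope sketch (the 4 KB page and 8-byte key/pointer sizes are illustrative assumptions):

```python
import math

page_size = 4096                        # assumed 4 KB disk page
fanout = page_size // (8 + 8)           # 8-byte key + 8-byte pointer -> 256 children

n_rows = 1_000_000_000                  # one billion keys
btree_height = math.ceil(math.log(n_rows, fanout))  # disk seeks per lookup
bst_height = math.ceil(math.log2(n_rows))           # node visits in a binary tree

print(fanout, btree_height, bst_height)  # -> 256 4 30
```

Four seeks at ~10ms each is ~40ms for a cold lookup; a binary tree laid out on disk would need ~30 seeks, roughly 300ms.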
graph TD
Root["[50, 100]"] --> N1["[20, 30, 40]"]
Root --> N2["[60, 70, 80]"]
Root --> N3["[110, 120]"]
N1 --> L1["10-19"]
N1 --> L2["20-29"]
N1 --> L3["30-39"]
N1 --> L4["40-49"]
N2 --> L5["50-59"]
N2 --> L6["60-69"]
N2 --> L7["70-79"]
N2 --> L8["80-99"]
N3 --> L9["100-109"]
N3 --> L10["110-119"]
N3 --> L11["120+"]
style Root fill:#5e81ac,color:#fff
style N1 fill:#a3be8c,color:#2e3440
style N2 fill:#a3be8c,color:#2e3440
style N3 fill:#a3be8c,color:#2e3440
B+ Tree (Most Common)
Difference from B-Tree:
- All data stored in leaf nodes (internal nodes only have keys)
- Leaf nodes linked together (enables range scans)
- Used by: PostgreSQL, MySQL InnoDB, SQLite
from typing import Optional, List, Any
from dataclasses import dataclass
@dataclass
class BTreeNode:
"""B+ Tree node for database index"""
keys: List[int]
children: List['BTreeNode']
values: List[Any] # Only populated in leaf nodes
is_leaf: bool
next_leaf: Optional['BTreeNode'] = None # Link to next leaf (for range scans)
class BPlusTree:
"""
B+ Tree implementation for database indexing.
Properties:
- All data in leaf nodes
- Internal nodes only store keys for navigation
- Leaf nodes linked (enables efficient range queries)
Time Complexity:
- Search: O(log n)
- Insert: O(log n)
- Range query: O(log n + k) where k = results
Space: O(n)
Real-world: PostgreSQL uses B+ Trees for all indexes
"""
def __init__(self, order: int = 4):
"""
order: Maximum number of children per node
For disk: order = page_size / (key_size + pointer_size)
Example: 4KB page, 8-byte key, 8-byte pointer = 4096/16 = 256 children
"""
self.order = order
self.root = BTreeNode(keys=[], children=[], values=[], is_leaf=True)
def search(self, key: int) -> Optional[Any]:
"""Search for key in B+ Tree"""
return self._search_recursive(self.root, key)
def _search_recursive(self, node: BTreeNode, key: int) -> Optional[Any]:
"""Recursive search"""
if node.is_leaf:
# Leaf node: linear search in keys
for i, k in enumerate(node.keys):
if k == key:
return node.values[i]
return None
# Internal node: find correct child
i = 0
while i < len(node.keys) and key >= node.keys[i]:
i += 1
return self._search_recursive(node.children[i], key)
def range_query(self, start_key: int, end_key: int) -> List[tuple]:
"""
Range query: Find all keys in [start_key, end_key].
This is why databases love B+ Trees!
1. Navigate to start_key (O(log n))
2. Follow leaf links until end_key (O(k))
"""
results = []
# Find starting leaf
leaf = self._find_leaf(self.root, start_key)
# Scan through linked leaves
while leaf:
for i, key in enumerate(leaf.keys):
if start_key <= key <= end_key:
results.append((key, leaf.values[i]))
elif key > end_key:
return results # Done
leaf = leaf.next_leaf # Move to next leaf
return results
def _find_leaf(self, node: BTreeNode, key: int) -> BTreeNode:
"""Navigate to leaf that would contain key"""
if node.is_leaf:
return node
# Find correct child
i = 0
while i < len(node.keys) and key >= node.keys[i]:
i += 1
return self._find_leaf(node.children[i], key)
def insert(self, key: int, value: Any):
"""Insert key-value pair"""
root = self.root
# If root is full, split it
if len(root.keys) >= self.order - 1:
new_root = BTreeNode(keys=[], children=[root], values=[], is_leaf=False)
self._split_child(new_root, 0)
self.root = new_root
self._insert_non_full(self.root, key, value)
def _insert_non_full(self, node: BTreeNode, key: int, value: Any):
"""Insert into node that's not full"""
if node.is_leaf:
# Insert into sorted position
i = 0
while i < len(node.keys) and key > node.keys[i]:
i += 1
node.keys.insert(i, key)
node.values.insert(i, value)
else:
# Find child to insert into
i = 0
while i < len(node.keys) and key > node.keys[i]:
i += 1
            # Split child if full
            if len(node.children[i].keys) >= self.order - 1:
                self._split_child(node, i)
                # The split put a new separator at index i; keys equal to it
                # live in the right child (consistent with search's >=)
                if key >= node.keys[i]:
                    i += 1
self._insert_non_full(node.children[i], key, value)
    def _split_child(self, parent: BTreeNode, index: int):
        """Split a full child node, pushing a separator key into the parent"""
        child = parent.children[index]
        mid = (self.order - 1) // 2
        if child.is_leaf:
            # Leaf split: the right half (including mid) moves to the new node;
            # its first key is *copied* up as the separator so no key/value is lost
            new_node = BTreeNode(keys=child.keys[mid:], children=[],
                                 values=child.values[mid:], is_leaf=True)
            # Update leaf links (for range scans)
            new_node.next_leaf = child.next_leaf
            child.next_leaf = new_node
            child.keys = child.keys[:mid]
            child.values = child.values[:mid]
            separator = new_node.keys[0]
        else:
            # Internal split: the mid key *moves* up as the separator
            # (save it before truncating the child's key list)
            separator = child.keys[mid]
            new_node = BTreeNode(keys=child.keys[mid+1:],
                                 children=child.children[mid+1:],
                                 values=[], is_leaf=False)
            child.keys = child.keys[:mid]
            child.children = child.children[:mid+1]
        # Insert separator key and new child into parent
        parent.keys.insert(index, separator)
        parent.children.insert(index + 1, new_node)
# Example: Database index simulation
print("=== B+ Tree Example ===")
btree = BPlusTree(order=4)
# Insert records (simulating database rows)
data = [(10, "User A"), (20, "User B"), (5, "User C"),
(15, "User D"), (25, "User E"), (30, "User F")]
for key, value in data:
btree.insert(key, value)
# Point query
print("Search for key 15:")
result = btree.search(15)
print(f" Found: {result}")
# Range query (this is what makes B+ Trees perfect for databases!)
print("\nRange query [10, 25]:")
results = btree.range_query(10, 25)
for key, value in results:
print(f" Key {key}: {value}")
B+ Tree vs Hash Index:
| Operation | B+ Tree | Hash Index |
|---|---|---|
| Point query | O(log n) | O(1) |
| Range query | O(log n + k) | O(n) - must scan all! |
| ORDER BY | Free (already sorted) | O(n log n) |
| Prefix search | Supported | Not supported |
Winner: B+ Tree for databases (range queries essential).
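The range-query row in the table can be demonstrated with a toy stand-in: a sorted array plus `bisect` plays the role of the B+ Tree's leaf chain, and a dict plays the hash index (names are illustrative):

```python
from bisect import bisect_left

keys = list(range(0, 1000, 10))
sorted_index = keys                              # stands in for linked B+ Tree leaves
hash_index = {k: f"row-{k}" for k in keys}       # stands in for a hash index

def btree_range(lo, hi):
    """B+-Tree-style range scan: binary search to the start, then walk forward."""
    i = bisect_left(sorted_index, lo)            # O(log n) descent
    out = []
    while i < len(sorted_index) and sorted_index[i] <= hi:
        out.append(sorted_index[i])              # O(k) walk along the leaves
        i += 1
    return out

def hash_range(lo, hi):
    """Hash-index 'range scan': no order, so every key must be examined."""
    return sorted(k for k in hash_index if lo <= k <= hi)  # O(n) full scan

assert btree_range(100, 140) == hash_range(100, 140) == [100, 110, 120, 130, 140]
```

Both return the same rows, but the hash version touches every key to do it; on a billion-row table that difference is the whole argument.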
2. MVCC - Multi-Version Concurrency Control
How PostgreSQL allows readers and writers to work simultaneously without blocking.
The Problem: Lock-Based Concurrency
Traditional Approach (MySQL MyISAM):
- Writers block readers
- Readers block writers
- Result: Poor concurrency, lots of waiting
MVCC Solution
Key Idea: Keep multiple versions of each row.
- Writers create new versions (don't modify in-place)
- Readers see snapshot at their transaction start time
- No locking between readers and writers!
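The snapshot rule at the heart of MVCC reduces to a one-line predicate (a sketch that ignores aborted and still-in-progress transactions):

```python
INF = float('inf')

def visible(xmin: int, xmax: int, snapshot_xid: int) -> bool:
    """A row version is visible to a snapshot if it was created before the
    snapshot and had not yet been deleted/updated as of the snapshot."""
    return xmin < snapshot_xid and (xmax == INF or xmax >= snapshot_xid)

# T1's snapshot is xid=100: the old version (xmin=95, xmax=101) is visible,
# the new version created by xid=101 is not.
assert visible(95, 101, snapshot_xid=100) is True
assert visible(101, INF, snapshot_xid=100) is False
```

Every read simply filters row versions through this predicate; no locks are needed.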
sequenceDiagram
participant T1 as Transaction 1 (xid=100)
participant T2 as Transaction 2 (xid=101)
participant DB as Database
T1->>DB: BEGIN (snapshot: xid=100)
T1->>DB: SELECT balance WHERE id=1
DB->>T1: balance=1000 (xmin=95, xmax=inf)
T2->>DB: BEGIN (snapshot: xid=101)
T2->>DB: UPDATE balance=500 WHERE id=1
Note right of DB: Creates new version: xmin=101, xmax=inf<br/>Old version: xmin=95, xmax=101
T2->>DB: COMMIT
T1->>DB: SELECT balance WHERE id=1
DB->>T1: balance=1000 (still sees old version!)
Note right of T1: T1 started at xid=100,<br/>can't see xid=101
T1->>DB: COMMIT
from typing import Optional, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class RowVersion:
"""A single version of a database row"""
data: dict
xmin: int # Transaction ID that created this version
xmax: int # Transaction ID that deleted/updated this version (inf if current)
class MVCCTable:
"""
Simplified MVCC implementation (PostgreSQL-style).
Key concepts:
- Each row can have multiple versions
- Each version tagged with creating transaction (xmin)
- When updated, old version marked with deleting transaction (xmax)
- Transactions see snapshot based on their start time
This is how PostgreSQL achieves high concurrency!
"""
def __init__(self):
# row_id -> list of versions (sorted by xmin)
self.versions: dict[int, List[RowVersion]] = {}
self.next_xid = 1 # Transaction ID counter
self.active_transactions: dict[int, int] = {} # xid -> snapshot_xid
def begin_transaction(self) -> int:
"""Start a new transaction, return transaction ID"""
xid = self.next_xid
self.next_xid += 1
# Snapshot = all transactions < this xid are "in the past"
self.active_transactions[xid] = xid
print(f"[TX {xid}] BEGIN (snapshot: xid < {xid})")
return xid
def commit_transaction(self, xid: int):
"""Commit transaction"""
if xid in self.active_transactions:
del self.active_transactions[xid]
print(f"[TX {xid}] COMMIT")
def insert(self, xid: int, row_id: int, data: dict):
"""Insert new row"""
version = RowVersion(data=data, xmin=xid, xmax=float('inf'))
if row_id not in self.versions:
self.versions[row_id] = []
self.versions[row_id].append(version)
print(f"[TX {xid}] INSERT row {row_id}: {data}")
def update(self, xid: int, row_id: int, new_data: dict):
"""
Update row (MVCC-style).
Process:
1. Mark current version as deleted (set xmax)
2. Insert new version (with xmin = current xid)
"""
if row_id not in self.versions:
print(f"[TX {xid}] ERROR: Row {row_id} not found")
return
# Find current version (xmax = inf)
for version in self.versions[row_id]:
if version.xmax == float('inf'):
# Mark as deleted by this transaction
version.xmax = xid
break
# Create new version
new_version = RowVersion(data=new_data, xmin=xid, xmax=float('inf'))
self.versions[row_id].append(new_version)
print(f"[TX {xid}] UPDATE row {row_id}: {new_data}")
def select(self, xid: int, row_id: int) -> Optional[dict]:
"""
Select row visible to this transaction.
        Visibility rules (simplified - a transaction does not see its own writes):
        - xmin < snapshot_xid (created before our snapshot)
        - xmax >= snapshot_xid or xmax = inf (not yet deleted as of our snapshot)
"""
if row_id not in self.versions:
return None
snapshot_xid = self.active_transactions.get(xid, xid)
# Find visible version
for version in self.versions[row_id]:
# Created before our snapshot?
if version.xmin >= snapshot_xid:
continue
# Still alive in our snapshot?
if version.xmax == float('inf') or version.xmax >= snapshot_xid:
print(f"[TX {xid}] SELECT row {row_id}: {version.data} " +
f"(xmin={version.xmin}, xmax={version.xmax})")
return version.data
print(f"[TX {xid}] SELECT row {row_id}: NOT FOUND (no visible version)")
return None
# Example: PostgreSQL-style MVCC
print("\n=== MVCC Example ===")
table = MVCCTable()
# Initial data
tx0 = table.begin_transaction()
table.insert(tx0, row_id=1, data={"balance": 1000})
table.commit_transaction(tx0)
# Transaction 1: Long-running query
tx1 = table.begin_transaction()
result = table.select(tx1, row_id=1)
print(f" -> Balance: {result['balance']}\n")
# Transaction 2: Update balance
tx2 = table.begin_transaction()
table.update(tx2, row_id=1, new_data={"balance": 500})
table.commit_transaction(tx2)
print()
# Transaction 1: Still sees old version!
result = table.select(tx1, row_id=1)
print(f" -> Balance: {result['balance']} (MVCC isolation!)\n")
table.commit_transaction(tx1)
# Transaction 3: Sees new version
tx3 = table.begin_transaction()
result = table.select(tx3, row_id=1)
print(f" -> Balance: {result['balance']}")
table.commit_transaction(tx3)
MVCC Benefits:
- Readers never block writers
- Writers never block readers
- Consistent snapshots (repeatable reads)
- High concurrency
Costs:
- Multiple versions consume space
- VACUUM needed to clean up old versions (PostgreSQL)
- Write amplification (every update creates a whole new row version)
3. Write-Ahead Logging (WAL)
How databases ensure durability - your data survives crashes.
The Durability Problem
Naive Approach: Write directly to data files
Problem: A crash mid-write corrupts data!
Example: Transfer $100 (debit account A, credit account B)
A crash after the debit but before the credit makes $100 vanish!
WAL Solution
Rule: Write to log BEFORE modifying data
1. Write change to append-only log (fast, sequential)
2. Acknowledge to client ("committed")
3. Apply changes to data files (async, can replay from log if crash)
import os
import json
from typing import Dict, Any
from dataclasses import dataclass, asdict
@dataclass
class WALEntry:
"""Write-Ahead Log entry"""
lsn: int # Log Sequence Number (monotonically increasing)
operation: str # "INSERT", "UPDATE", "DELETE"
table: str
row_id: int
data: dict
class WriteAheadLog:
"""
Simplified Write-Ahead Logging (PostgreSQL-style).
Properties:
- All changes written to log BEFORE data files
- Log is append-only (fast, sequential writes)
- Can replay log to recover from crashes
- Periodic checkpoints flush dirty pages to disk
This is how databases guarantee durability (the D in ACID)!
"""
def __init__(self, log_file: str = "wal.log", data_file: str = "data.json"):
self.log_file = log_file
self.data_file = data_file
self.lsn = 0 # Log Sequence Number
self.data: Dict[str, Dict[int, dict]] = {} # table -> row_id -> data
self.dirty_pages = set() # Pages modified but not flushed to disk
        # Load existing data (JSON stringifies int row_ids; convert them back)
        if os.path.exists(data_file):
            with open(data_file, 'r') as f:
                raw = json.load(f)
            self.data = {t: {int(r): v for r, v in rows.items()}
                         for t, rows in raw.items()}
# Replay WAL if exists (crash recovery)
if os.path.exists(log_file):
print("=== Crash Recovery: Replaying WAL ===")
self._replay_wal()
def _write_log(self, entry: WALEntry):
"""Append entry to WAL (durable write)"""
with open(self.log_file, 'a') as f:
f.write(json.dumps(asdict(entry)) + '\n')
f.flush() # Force OS to write to disk
os.fsync(f.fileno()) # Ensure disk write completed
print(f"[WAL] LSN={entry.lsn} {entry.operation} {entry.table}#{entry.row_id}")
def _apply_to_memory(self, entry: WALEntry):
"""Apply change to in-memory data"""
table = entry.table
row_id = entry.row_id
if table not in self.data:
self.data[table] = {}
if entry.operation == "INSERT" or entry.operation == "UPDATE":
self.data[table][row_id] = entry.data
elif entry.operation == "DELETE":
if row_id in self.data[table]:
del self.data[table][row_id]
self.dirty_pages.add((table, row_id))
def insert(self, table: str, row_id: int, data: dict):
"""Insert with WAL"""
# 1. Write to WAL first (MUST complete before acknowledging)
entry = WALEntry(lsn=self.lsn, operation="INSERT",
table=table, row_id=row_id, data=data)
self.lsn += 1
self._write_log(entry)
# 2. Apply to in-memory data
self._apply_to_memory(entry)
print(f" -> Insert committed (durable via WAL)")
def update(self, table: str, row_id: int, data: dict):
"""Update with WAL"""
entry = WALEntry(lsn=self.lsn, operation="UPDATE",
table=table, row_id=row_id, data=data)
self.lsn += 1
self._write_log(entry)
self._apply_to_memory(entry)
print(f" -> Update committed (durable via WAL)")
def checkpoint(self):
"""
Checkpoint: Flush dirty pages to disk.
After checkpoint, can truncate WAL (recovery only needs changes since last checkpoint).
"""
print("\n=== Checkpoint: Flushing dirty pages to disk ===")
# Write data file
with open(self.data_file, 'w') as f:
json.dump(self.data, f, indent=2)
f.flush()
os.fsync(f.fileno())
self.dirty_pages.clear()
# In real DB: can now truncate WAL (keep only recent entries)
print(f" -> Checkpoint complete. Data file synced to disk.\n")
def _replay_wal(self):
"""Replay WAL for crash recovery"""
with open(self.log_file, 'r') as f:
for line in f:
entry_dict = json.loads(line.strip())
entry = WALEntry(**entry_dict)
print(f" Replaying LSN={entry.lsn} {entry.operation}")
self._apply_to_memory(entry)
self.lsn = max(self.lsn, entry.lsn + 1)
print(f" -> Recovery complete. Replayed to LSN={self.lsn}\n")
# Example: WAL in action
print("\n=== Write-Ahead Logging Example ===")
# Clean up old files
for f in ["wal.log", "data.json"]:
if os.path.exists(f):
os.remove(f)
db = WriteAheadLog()
# Transactions
db.insert("users", 1, {"name": "Alice", "balance": 1000})
db.insert("users", 2, {"name": "Bob", "balance": 500})
# Checkpoint
db.checkpoint()
# More changes
db.update("users", 1, {"name": "Alice", "balance": 900})
db.update("users", 2, {"name": "Bob", "balance": 600})
# Simulate crash (no checkpoint!)
print("=== Simulating crash... ===\n")
# Restart database - WAL replay will restore state
db2 = WriteAheadLog() # Automatically replays WAL
print("After recovery:")
print(f" User 1: {db2.data['users'][1]}")
print(f" User 2: {db2.data['users'][2]}")
# Clean up
os.remove("wal.log")
os.remove("data.json")
WAL Performance:
- Append-only writes: Sequential I/O ~100MB/s (vs random ~5MB/s)
- Fsync every commit: ~1000 commits/sec (can batch for higher throughput)
- Checkpoint interval: Balance between recovery time and write overhead
- PostgreSQL: Checkpoints every 5 minutes or when WAL reaches size limit
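The fsync-bound commit rate above is why databases batch commits ("group commit"): one fsync durably commits every transaction waiting in the batch. A rough throughput model, assuming a 1 ms fsync (an illustrative figure):

```python
fsync_ms = 1.0  # assumed latency of a single fsync to disk

def commits_per_sec(batch_size: int) -> float:
    """Each fsync costs fsync_ms but commits batch_size transactions at once."""
    return batch_size * (1000 / fsync_ms)

print(commits_per_sec(1))    # fsync per commit: 1000.0 commits/sec
print(commits_per_sec(50))   # 50 commits share one fsync: 50000.0 commits/sec
```

The log stays just as durable; the batch only trades a little commit latency for much higher throughput.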
4. Database Replication
How databases achieve high availability and read scalability.
Replication Strategies
| Strategy | Description | Use Case | Consistency |
|---|---|---|---|
| Streaming Replication | Stream WAL from primary to replicas | PostgreSQL, MySQL | Async (eventual) or Sync (strong) |
| Logical Replication | Replicate logical changes (SQL statements) | Cross-version, selective tables | Eventual |
| Statement-Based | Replay SQL statements on replicas | MySQL (old) | Can diverge (non-deterministic functions) |
| Row-Based | Replicate actual row changes | MySQL (default now) | Exact replication |
import time
import threading
from queue import Queue
from typing import Any, List
class ReplicationLog:
"""
Simplified database replication (PostgreSQL-style streaming replication).
Architecture:
- Primary: Accepts writes, appends to WAL
- Replicas: Stream WAL from primary, apply changes
Modes:
- Async: Primary doesn't wait for replica acknowledgment (fast, eventual consistency)
- Sync: Primary waits for replica (slower, strong consistency)
"""
def __init__(self, node_id: str, is_primary: bool = False):
self.node_id = node_id
self.is_primary = is_primary
self.wal: List[dict] = []
self.data: dict = {}
self.lsn = 0
self.replica_queues: List[Queue] = [] # For async replication
def add_replica(self, replica_queue: Queue):
"""Register a replica to receive WAL stream"""
self.replica_queues.append(replica_queue)
def write(self, key: str, value: Any, sync: bool = False):
"""
Write to primary (with replication).
sync=False: Async replication (don't wait for replicas)
sync=True: Sync replication (wait for at least 1 replica)
"""
if not self.is_primary:
print(f"[{self.node_id}] ERROR: Cannot write to replica")
return
# Create WAL entry
entry = {
"lsn": self.lsn,
"operation": "UPDATE",
"key": key,
"value": value,
"timestamp": time.time()
}
self.lsn += 1
# Apply locally
self.wal.append(entry)
self.data[key] = value
print(f"[{self.node_id}] WRITE {key}={value} (LSN={entry['lsn']})")
# Replicate to followers
for queue in self.replica_queues:
queue.put(entry)
if sync and self.replica_queues:
# Sync mode: wait for acknowledgment (simplified - wait 100ms)
time.sleep(0.1)
print(f" -> Sync replication: Waited for replica acknowledgment")
def read(self, key: str) -> Any:
"""Read from node (can be primary or replica)"""
value = self.data.get(key, "NOT_FOUND")
print(f"[{self.node_id}] READ {key}={value}")
return value
def start_replication(self, primary_queue: Queue):
"""Start replication thread (for replica nodes)"""
if self.is_primary:
return
def replication_loop():
while True:
entry = primary_queue.get()
# Apply WAL entry
self.wal.append(entry)
self.data[entry['key']] = entry['value']
self.lsn = entry['lsn'] + 1
print(f"[{self.node_id}] REPLICATED {entry['key']}={entry['value']} (LSN={entry['lsn']})")
thread = threading.Thread(target=replication_loop, daemon=True)
thread.start()
# Example: PostgreSQL-style streaming replication
print("\n=== Database Replication Example ===")
# Create primary and 2 replicas
primary = ReplicationLog("PRIMARY", is_primary=True)
replica1 = ReplicationLog("REPLICA-1")
replica2 = ReplicationLog("REPLICA-2")
# Setup replication streams
queue1 = Queue()
queue2 = Queue()
primary.add_replica(queue1)
primary.add_replica(queue2)
replica1.start_replication(queue1)
replica2.start_replication(queue2)
# Write to primary
primary.write("user:1:balance", 1000)
primary.write("user:2:balance", 500)
# Wait for replication
time.sleep(0.2)
# Read from replica (eventual consistency)
print("\n--- Reading from replica ---")
replica1.read("user:1:balance")
replica2.read("user:2:balance")
# Update on primary
print("\n--- Update on primary ---")
primary.write("user:1:balance", 900)
time.sleep(0.2)
# Read from replicas (should see update)
print("\n--- Reading updated value from replicas ---")
replica1.read("user:1:balance")
replica2.read("user:1:balance")
Replication Trade-offs:
Async Replication (Default):
- Fast writes (no waiting)
- High availability (primary doesn't depend on replicas)
- Replication lag (replicas may be behind)
- Possible data loss (if the primary crashes before replicating)
Sync Replication:
- Strong consistency (replica always up-to-date)
- No data loss (committed = replicated)
- Slower writes (wait for network + replica)
- Availability impact (primary blocks if the replica is down)
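The replication-lag risk is easy to see in a toy model: the primary commits immediately and ships WAL entries into a queue that the replica drains later (all names and structures here are illustrative):

```python
from collections import deque

primary = {}
replica = {}
pending = deque()   # WAL entries shipped to the replica but not yet applied

def write(key, value):
    primary[key] = value            # commit on the primary immediately
    pending.append((key, value))    # async: don't wait for the replica

def replica_apply_one():
    key, value = pending.popleft()  # replica applies the stream later
    replica[key] = value

write("balance", 900)
stale = replica.get("balance")      # None: the replica lags behind
replica_apply_one()
fresh = replica.get("balance")      # 900: caught up
print(stale, fresh)                 # -> None 900
```

A read hitting the replica in the window between `write` and `replica_apply_one` sees stale data; sync replication closes that window by making `write` wait for the apply.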
Key Takeaways
- B+ Trees: Minimize disk I/O, enable efficient range queries
- MVCC: Readers and writers don't block each other - high concurrency
- WAL: Write to log first - durability despite crashes
- Replication: Async for performance, sync for consistency