Cache Patterns, Invalidation, and Performance Optimization
| Level | Location | Latency | Size | Use Case |
|---|---|---|---|---|
| L1 Cache | CPU core | ~1 ns | 32-64 KB | CPU registers, instructions |
| L2 Cache | CPU core | ~3-10 ns | 256 KB - 1 MB | Recently accessed data |
| L3 Cache | CPU package | ~10-20 ns | 8-64 MB | Shared across cores |
| RAM | System memory | ~100 ns | 8-128 GB | Active application data |
| Application Cache | Redis/Memcached | ~1 ms | 1-100 GB | Hot data, sessions, computed results |
| CDN Cache | Edge servers | ~10-50 ms | 1-10 TB | Static assets, images, videos |
| Database | Disk + DB cache | ~10-100 ms | 100 GB - 10 TB | Persistent storage |
The pattern names describe where the cache sits relative to the read/write path, or when its operations happen:
Write flows THROUGH the cache to DB (inline, synchronous)
DB write happens BEHIND (after) the cache write
Cache is ASIDE from the main path, app manages it manually
Read flows THROUGH the cache (cache auto-fetches on miss)
| Pattern | Preposition Meaning | Cache Position |
|---|---|---|
| Write-Through | Data passes through the cache | Inline, synchronous gatekeeper |
| Write-Behind | DB write comes behind (after) | Cache first, DB catches up later |
| Write-Back | Data written back to DB | Same as write-behind (alias) |
| Cache-Aside | Cache is to the side | Out of band, app manages explicitly |
| Read-Through | Reads pass through cache | Cache auto-fetches on miss |
| Refresh-Ahead | Refresh ahead of expiry | Proactive, before TTL runs out |
| Pattern | Read Latency | Write Latency | Consistency | Best For |
|---|---|---|---|---|
| Cache-Aside | Miss penalty | Fast (DB only) | Eventual | General purpose, read-heavy |
| Write-Through | Always fast | Slow (2 writes) | Strong | Data integrity critical |
| Write-Behind | Always fast | Very fast | Eventual | Write-heavy, can tolerate loss |
| Refresh-Ahead | Always fast (hot data) | N/A | Near real-time | Predictable hot data |
import redis
import json
from typing import Optional, Any
from functools import wraps
class CacheAside:
    """
    Cache-Aside (Lazy Loading) Pattern

    The application manages the cache explicitly: reads consult the cache
    first and lazily populate it on a miss; writes go to the database and
    invalidate the cached copy so the next read repopulates it.

    Pros:
    + Only caches what's actually requested (no waste)
    + Resilient (cache failure doesn't break app)
    + Works with any data store
    Cons:
    - Cache miss penalty (3 round trips: cache check, DB read, cache write)
    - Stale data possible (until TTL expires)
    - Cache stampede risk
    """

    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        # Redis connection used as the look-aside cache.
        self.cache = redis_client
        # Default expiration (seconds) applied to every cached entry.
        self.ttl = ttl

    def get_user(self, user_id: int) -> Optional[dict]:
        """
        Read a user with the cache-aside pattern.

        Returns the cached record on a hit; otherwise loads from the
        database, populates the cache, and returns the record (or None).
        """
        cache_key = f"user:{user_id}"
        hit = self.cache.get(cache_key)
        if hit:
            print(f"Cache HIT for {cache_key}")
            return json.loads(hit)
        print(f"Cache MISS for {cache_key}")
        record = self._fetch_from_db(user_id)
        if record:
            # Store in cache so subsequent reads are served from Redis.
            self.cache.setex(cache_key, self.ttl, json.dumps(record))
        return record

    def update_user(self, user_id: int, updates: dict):
        """
        Write with cache invalidation: DB first, then drop the cached
        entry and let the next read repopulate it.
        """
        self._update_db(user_id, updates)
        self.cache.delete(f"user:{user_id}")
        # Alternative: write the fresh record into the cache immediately
        # instead of invalidating, e.g.
        # self.cache.setex(cache_key, self.ttl, json.dumps(updated_user))

    def _fetch_from_db(self, user_id: int) -> Optional[dict]:
        """Simulate database fetch."""
        # In production: SELECT * FROM users WHERE id = user_id
        return {"id": user_id, "name": "John Doe", "email": "john@example.com"}

    def _update_db(self, user_id: int, updates: dict):
        """Simulate database update."""
        # In production: UPDATE users SET ... WHERE id = user_id
        pass
# Decorator pattern for caching
def cache_aside(cache_key_fn, ttl=3600):
    """
    Decorator applying the cache-aside pattern to a function.

    Args:
        cache_key_fn: callable receiving the decorated function's arguments
            and returning the cache key string.
        ttl: cache entry lifetime in seconds.

    Usage:
        @cache_aside(lambda user_id: f"user:{user_id}", ttl=3600)
        def get_user(user_id):
            return db.query(f"SELECT * FROM users WHERE id = {user_id}")
    """
    # Fix: create the client ONCE at decoration time instead of on every
    # call. redis-py clients hold a connection pool and are safe to share;
    # re-instantiating one per invocation defeats the point of a fast
    # cache lookup.
    cache = redis.Redis(host='localhost', port=6379, db=0)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key from the call's arguments
            cache_key = cache_key_fn(*args, **kwargs)
            # Try cache
            cached = cache.get(cache_key)
            if cached:
                return json.loads(cached)
            # Cache miss - call function
            result = func(*args, **kwargs)
            # Store in cache (None results are deliberately not cached)
            if result is not None:
                cache.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator
# Usage
@cache_aside(lambda user_id: f"user:{user_id}", ttl=3600)
def get_user(user_id: int):
    """Example consumer: the decorator makes this lookup cached."""
    print(f"Fetching user {user_id} from database...")
    record = {"id": user_id, "name": "John Doe"}
    return record
class WriteThroughCache:
    """
    Write-Through Cache Pattern

    Every write goes to the database and, synchronously, to the cache,
    so reads can always be served from the cache.

    Pros:
    + Cache always consistent with database
    + No stale data
    + Read latency always low (data always in cache)
    Cons:
    - Write latency higher (two writes)
    - Wasted cache space (caches everything, even rarely accessed)
    - Cache failure breaks writes
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def create_user(self, user_id: int, user_data: dict):
        """
        Write-through: Write to cache AND database.
        Both writes must succeed or rollback (2PC pattern).

        Raises:
            Exception: if either write fails. The DB insert is rolled back
                only when it actually completed.
        """
        db_written = False
        try:
            # 1. Write to database first
            self._write_to_db(user_id, user_data)
            db_written = True
            # 2. Write to cache (synchronously)
            cache_key = f"user:{user_id}"
            self.cache.setex(cache_key, 3600, json.dumps(user_data))
        except Exception as e:
            # Fix: only roll back if the DB write actually happened — the
            # original deleted the row even when the DB insert itself was
            # the step that failed.
            if db_written:
                self._delete_from_db(user_id)
            raise Exception(f"Write-through failed: {e}. Rolled back DB write.")

    def update_user(self, user_id: int, updates: dict):
        """
        Update in both cache and database.
        Both writes must succeed or rollback.

        Raises:
            Exception: if either write fails. The DB update is rolled back
                (to the snapshot taken before the update) only when it
                actually completed.
        """
        # Snapshot original data for rollback
        original_user = self._get_from_db(user_id)
        db_updated = False
        try:
            # 1. Update database
            updated_user = self._update_db(user_id, updates)
            db_updated = True
            # 2. Update cache
            cache_key = f"user:{user_id}"
            self.cache.setex(cache_key, 3600, json.dumps(updated_user))
        except Exception as e:
            # Fix: only undo the DB update if it actually happened.
            if db_updated and original_user:
                self._update_db(user_id, original_user)
            raise Exception(f"Write-through update failed: {e}. Rolled back DB update.")

    def get_user(self, user_id: int) -> Optional[dict]:
        """
        Read is simple - always in cache.
        Returns None on a miss (should be rare, e.g. cold start).
        """
        cache_key = f"user:{user_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        # Cache miss should be rare (only on cold start)
        return None

    def _write_to_db(self, user_id: int, user_data: dict):
        """Database write."""
        # In production: INSERT INTO users (id, ...) VALUES (user_id, ...)
        pass

    def _update_db(self, user_id: int, updates: dict) -> dict:
        """Database update, returns updated record."""
        # In production: UPDATE users SET ... WHERE id = user_id
        return {"id": user_id, **updates}

    def _delete_from_db(self, user_id: int):
        """Database delete (for rollback)."""
        # In production: DELETE FROM users WHERE id = user_id
        pass

    def _get_from_db(self, user_id: int) -> Optional[dict]:
        """Database fetch (for rollback)."""
        # In production: SELECT * FROM users WHERE id = user_id
        return {"id": user_id, "name": "John Doe"}
import queue
import threading
import time
class WriteBehindCache:
    """
    Write-Behind (Write-Back) Cache Pattern
    Writes go to cache first, database updated asynchronously.

    Pros:
    + Very fast writes (cache only)
    + Batch database writes (efficiency)
    + Reduces database load
    Cons:
    - Data loss risk (cache failure before DB write)
    - Complex implementation
    - Eventual consistency
    """

    def __init__(self, redis_client: redis.Redis, batch_size: int = 100):
        self.cache = redis_client
        self.write_queue = queue.Queue()
        self.batch_size = batch_size
        # Background worker drains the queue; daemon so it never blocks exit.
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def update_user(self, user_id: int, updates: dict):
        """
        Write to cache immediately, queue DB write.
        """
        cache_key = f"user:{user_id}"
        # Fix: merge into the existing cached record instead of replacing it
        # wholesale — a partial `updates` dict used to silently drop every
        # field it didn't mention.
        existing = self.cache.get(cache_key)
        user_data = json.loads(existing) if existing else {}
        user_data.update({"id": user_id, **updates})
        self.cache.setex(cache_key, 3600, json.dumps(user_data))
        # 2. Queue database write (asynchronous)
        self.write_queue.put(('update', user_id, updates))

    def _worker(self):
        """
        Background worker: collects writes into batches and flushes them.
        On flush failure the batch is kept and retried on the next pass.
        """
        batch = []
        while True:
            try:
                # Collect up to batch_size writes (1s poll so partial
                # batches still flush promptly)
                while len(batch) < self.batch_size:
                    try:
                        item = self.write_queue.get(timeout=1)
                        batch.append(item)
                    except queue.Empty:
                        break
                # Process batch
                if batch:
                    self._flush_batch(batch)
                    batch = []
            except Exception as e:
                print(f"Write-behind error: {e}")

    def _flush_batch(self, batch: list):
        """
        Write a batch of queued operations to the database.
        """
        # Batch database operations for efficiency
        # UPDATE users SET ... WHERE id IN (...)
        print(f"Flushing {len(batch)} writes to database")
        for operation, user_id, updates in batch:
            if operation == 'update':
                self._update_db(user_id, updates)

    def _update_db(self, user_id: int, updates: dict):
        """Database update."""
        # Actual database write
        pass
import time
from datetime import datetime, timedelta
class RefreshAheadCache:
    """
    Refresh-Ahead Cache Pattern
    Proactively refresh hot data before expiration.

    Pros:
    + Always fresh data for popular items
    + No cache miss penalty for hot data
    + Predictable latency
    Cons:
    - Wasted refreshes for cold data
    - Complex to implement
    - Needs accurate prediction of hot data
    """

    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.cache = redis_client
        self.ttl = ttl
        self.refresh_threshold = 0.8  # Refresh when 80% of TTL passed

    def get_user(self, user_id: int) -> Optional[dict]:
        """
        Get with automatic refresh-ahead: serve from cache and, when the
        entry is nearing expiry, kick off a single background refresh.
        """
        cache_key = f"user:{user_id}"
        cached = self.cache.get(cache_key)
        if cached:
            ttl_remaining = self.cache.ttl(cache_key)
            # Fix: Redis TTL returns -1 (no expiry) / -2 (missing), which
            # always satisfied the old `< threshold` check; only refresh on
            # a real, positive countdown inside the window.
            if 0 <= ttl_remaining < (self.ttl * (1 - self.refresh_threshold)):
                # Fix: NX lock so only ONE caller in the window spawns a
                # refresh thread instead of every concurrent reader.
                lock_key = f"refresh_lock:user:{user_id}"
                if self.cache.set(lock_key, "1", nx=True, ex=30):
                    threading.Thread(
                        target=self._refresh_cache,
                        args=(user_id,),
                        daemon=True,
                    ).start()
            return json.loads(cached)
        # Cache miss - load and cache
        user = self._fetch_from_db(user_id)
        if user:
            self.cache.setex(cache_key, self.ttl, json.dumps(user))
        return user

    def _refresh_cache(self, user_id: int):
        """
        Background refresh of cache: re-fetch and reset the TTL.
        """
        print(f"Refreshing cache for user:{user_id}")
        user = self._fetch_from_db(user_id)
        if user:
            cache_key = f"user:{user_id}"
            self.cache.setex(cache_key, self.ttl, json.dumps(user))

    def _fetch_from_db(self, user_id: int) -> Optional[dict]:
        """Database fetch."""
        return {"id": user_id, "name": "John Doe"}
| Strategy | How It Works | Pros | Cons |
|---|---|---|---|
| TTL (Time-To-Live) | Cache expires after X seconds | Simple, automatic | Stale data possible, cache stampede |
| Event-Based | Invalidate on write events | Always fresh | Complex, distributed coordination |
| Tag-Based | Group caches by tags, invalidate tag | Bulk invalidation | Complex tag management |
| Versioning | Include version in cache key | No invalidation needed | Memory waste (multiple versions) |
class TTLCache:
    """
    TTL (Time-To-Live) based cache expiration.
    The simplest invalidation strategy: entries expire on their own.
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def set_with_ttl(self, key: str, value: Any, ttl: int):
        """
        Set cache with TTL.

        TTL selection guide:
        - Static data (configs): 1 hour - 1 day
        - User profiles: 5-15 minutes
        - API responses: 1-5 minutes
        - Real-time data: 10-60 seconds
        """
        self.cache.setex(key, ttl, json.dumps(value))

    def set_with_adaptive_ttl(self, key: str, value: Any, access_count: int):
        """
        Adaptive TTL based on access patterns: hot data keeps a longer
        TTL, cold data a shorter one.
        """
        # Tier table: (minimum access count, TTL in seconds), hottest first.
        for minimum, ttl in ((1000, 3600), (100, 1800)):
            if access_count > minimum:
                break
        else:
            ttl = 300  # 5 minutes for everything colder
        self.cache.setex(key, ttl, json.dumps(value))
import hashlib
class CacheStampedeProtection:
    """
    Prevent cache stampede (thundering herd).

    Problem:
    - Cache expires
    - 1000 requests simultaneously hit DB
    - Database overload
    - Slow response

    Solutions:
    1. Lock-based: First request locks, others wait
    2. Probabilistic early expiration
    3. Background refresh
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def get_with_lock(self, key: str, fetch_fn, ttl: int = 3600,
                      max_retries: int = 50):
        """
        Solution 1: Lock-based stampede prevention.
        The first request acquires the lock and regenerates the cache;
        others poll briefly until the value appears.

        Args:
            key: cache key.
            fetch_fn: zero-arg callable producing the value on a miss.
            ttl: cache entry lifetime in seconds.
            max_retries: polling attempts before falling back to a direct
                fetch (new, defaulted — backward compatible).
        """
        # Fix: the original recursed without bound when the lock holder
        # kept failing (eventual RecursionError). Use a bounded loop and a
        # direct-fetch fallback instead.
        for _ in range(max_retries):
            cached = self.cache.get(key)
            if cached:
                return json.loads(cached)
            # Cache miss - try to acquire lock
            lock_key = f"lock:{key}"
            if self.cache.set(lock_key, "1", nx=True, ex=10):  # 10s lock
                try:
                    value = fetch_fn()
                    self.cache.setex(key, ttl, json.dumps(value))
                    return value
                finally:
                    # Release lock
                    self.cache.delete(lock_key)
            # Didn't get lock - brief wait, then re-check the cache
            time.sleep(0.1)
        # Lock holder kept failing; serve directly rather than erroring.
        return fetch_fn()

    def get_with_probabilistic_expiration(self, key: str, fetch_fn, ttl: int = 3600):
        """
        Solution 2: Probabilistic early expiration (XFetch).
        Randomly refresh before TTL expires; probability increases as
        expiration approaches:

            P(refresh) = beta * e^(-(TTL_remaining / delta))

        Where:
        - beta: adjusts overall refresh probability
        - delta: window before expiration in which refreshes may begin
        """
        import math
        import random

        cached_data = self.cache.get(key)
        if cached_data:
            ttl_remaining = self.cache.ttl(key)
            delta = ttl * 0.2  # Start considering refresh at 20% TTL remaining
            beta = 1.0
            # Fix: guard against ttl() returning -1/-2 (no expiry/missing),
            # which would otherwise always enter the refresh window.
            if 0 <= ttl_remaining < delta:
                probability = beta * math.exp(-ttl_remaining / delta)
                if random.random() < probability:
                    # Probabilistically refresh in the background
                    threading.Thread(
                        target=self._background_refresh,
                        args=(key, fetch_fn, ttl),
                        daemon=True,
                    ).start()
            return json.loads(cached_data)
        # Cache miss - normal fetch
        value = fetch_fn()
        self.cache.setex(key, ttl, json.dumps(value))
        return value

    def _background_refresh(self, key: str, fetch_fn, ttl: int):
        """Refresh cache in background; failures are logged, not raised."""
        try:
            value = fetch_fn()
            self.cache.setex(key, ttl, json.dumps(value))
        except Exception as e:
            print(f"Background refresh failed: {e}")
class EventBasedInvalidation:
    """
    Event-based cache invalidation using Pub/Sub.

    When data changes, an invalidation event is published; every cache
    instance subscribes and deletes the matching keys.
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client
        self.pubsub = redis_client.pubsub()
        # Subscribe to invalidation events and consume them on a
        # background daemon thread.
        self.pubsub.subscribe('cache:invalidate')
        self.listener_thread = threading.Thread(
            target=self._listen_for_invalidations,
            daemon=True
        )
        self.listener_thread.start()

    def update_user(self, user_id: int, updates: dict):
        """
        Persist the update, then broadcast an invalidation event.
        """
        self._update_db(user_id, updates)
        event = {
            'type': 'user',
            'id': user_id,
            'pattern': f'user:{user_id}*'  # Invalidate all related caches
        }
        self.cache.publish('cache:invalidate', json.dumps(event))

    def _listen_for_invalidations(self):
        """
        Consume invalidation events and purge every matching cache key.
        """
        for message in self.pubsub.listen():
            if message['type'] != 'message':
                continue
            event = json.loads(message['data'])
            for key in self.cache.scan_iter(match=event['pattern']):
                self.cache.delete(key)
                print(f"Invalidated cache: {key}")

    def _update_db(self, user_id: int, updates: dict):
        """Database update."""
        pass
class RedisStringCache:
    """
    Redis string operations for caching.

    Use for: simple key-value caching — whole objects serialized as JSON,
    atomic counters, and pipelined batch writes.
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def cache_user(self, user_id: int, user_data: dict):
        """Cache user as JSON string"""
        self.cache.setex(f"user:{user_id}", 3600, json.dumps(user_data))

    def cache_counter(self, key: str):
        """Atomic counter (views, likes, etc.)"""
        self.cache.incr(key)

    def cache_with_multiple_keys(self, data: dict):
        """Batch set multiple keys (pipeline for efficiency)"""
        # One round trip for the whole batch instead of one per key.
        pipeline = self.cache.pipeline()
        for cache_key, payload in data.items():
            pipeline.setex(cache_key, 3600, json.dumps(payload))
        pipeline.execute()
class RedisHashCache:
    """
    Redis hash operations for structured data.

    Use for: objects with multiple fields.
    Benefit: individual fields can be read or updated without fetching
    the entire object.
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def cache_user_as_hash(self, user_id: int, user_data: dict):
        """
        Store a user as a hash (more efficient for partial updates).
        """
        hash_key = f"user:{user_id}"
        self.cache.hset(hash_key, mapping=user_data)
        # TTL must be set separately — hset alone never expires.
        self.cache.expire(hash_key, 3600)

    def update_user_field(self, user_id: int, field: str, value: str):
        """
        Update a single field without fetching the entire user.
        """
        self.cache.hset(f"user:{user_id}", field, value)

    def get_user_field(self, user_id: int, field: str) -> Optional[str]:
        """
        Get a single field.
        """
        return self.cache.hget(f"user:{user_id}", field)

    def get_user(self, user_id: int) -> dict:
        """
        Get all fields as a dictionary.
        """
        return self.cache.hgetall(f"user:{user_id}")
class RedisCollectionCache:
    """
    Redis list and set operations.

    Lists: ordered collections — recent items, queues.
    Sets: unique values — tags, followers.
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def cache_recent_orders(self, user_id: int, order_id: int):
        """
        Maintain a list of recent orders (FIFO, max 100).
        """
        list_key = f"user:{user_id}:recent_orders"
        # Newest first, capped at 100 entries, 24h lifetime.
        self.cache.lpush(list_key, order_id)
        self.cache.ltrim(list_key, 0, 99)
        self.cache.expire(list_key, 86400)  # 24 hours

    def get_recent_orders(self, user_id: int) -> list:
        """Get recent orders, newest first."""
        return self.cache.lrange(f"user:{user_id}:recent_orders", 0, -1)

    def cache_user_tags(self, user_id: int, tags: list):
        """
        Store user tags as a set (duplicates collapse automatically).
        """
        set_key = f"user:{user_id}:tags"
        self.cache.sadd(set_key, *tags)
        self.cache.expire(set_key, 3600)

    def get_common_tags(self, user_id1: int, user_id2: int) -> set:
        """
        Find common tags between two users (set intersection, server-side).
        """
        return self.cache.sinter(
            f"user:{user_id1}:tags",
            f"user:{user_id2}:tags",
        )
from collections import OrderedDict
class LRUCache:
    """
    LRU (Least Recently Used) Cache.

    Evicts the least recently used entry once capacity is exceeded.

    Time Complexity:
    - Get: O(1)
    - Put: O(1)
    - Space: O(capacity)

    Implementation: OrderedDict — recency is tracked by position, with
    the most recently used entry kept at the end.
    """

    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key: str) -> Optional[Any]:
        """
        Return the value for key (None on a miss), promoting the entry
        to most-recently-used.
        """
        try:
            # move_to_end raises KeyError on a miss — one lookup, not two.
            self.cache.move_to_end(key)
        except KeyError:
            return None
        return self.cache[key]

    def put(self, key: str, value: Any):
        """
        Insert or replace key, evicting the LRU entry when over capacity.
        """
        # Assignment appends new keys at the end; existing keys are then
        # promoted explicitly.
        self.cache[key] = value
        self.cache.move_to_end(key)
        while len(self.cache) > self.capacity:
            # popitem(last=False) drops the oldest (least recently used).
            self.cache.popitem(last=False)
# LRU with manual implementation (interview question)
class LRUCacheManual:
    """
    LRU Cache backed by a doubly linked list plus a hash map.

    A classic interview exercise: the dict gives O(1) key lookup and the
    list keeps recency order, so promotion and eviction are O(1) too.
    """

    class Node:
        def __init__(self, key, value):
            self.key = key
            self.value = value
            self.prev = None
            self.next = None

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}  # key -> Node
        # Sentinel head/tail avoid None checks when splicing at either end.
        self.head = self.Node(0, 0)
        self.tail = self.Node(0, 0)
        self.head.next = self.tail
        self.tail.prev = self.head

    def get(self, key: str) -> Optional[Any]:
        """Return the value for key (None on miss), promoting it to MRU."""
        node = self.cache.get(key)
        if node is None:
            return None
        self._unlink(node)
        self._push_front(node)
        return node.value

    def put(self, key: str, value: Any):
        """Insert or replace key, evicting the LRU entry when over capacity."""
        stale = self.cache.get(key)
        if stale is not None:
            # Replacing: detach the old node before inserting the new one.
            self._unlink(stale)
        fresh = self.Node(key, value)
        self.cache[key] = fresh
        self._push_front(fresh)
        if len(self.cache) > self.capacity:
            # The node just before the tail sentinel is the LRU entry.
            victim = self.tail.prev
            self._unlink(victim)
            del self.cache[victim.key]

    def _unlink(self, node):
        """Splice node out of the list in O(1)."""
        node.prev.next = node.next
        node.next.prev = node.prev

    def _push_front(self, node):
        """Insert node immediately after the head sentinel (MRU position)."""
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node
class CDNCachingStrategy:
    """
    CDN (Content Delivery Network) caching.
    Cache static assets at edge locations near users.

    Cache-Control directives used below:
    - public: Can be cached by CDN
    - private: Only browser cache (not CDN)
    - max-age: TTL in seconds
    - s-maxage: TTL for shared caches (CDN)
    - no-cache: Must revalidate
    - no-store: Don't cache at all
    """

    @staticmethod
    def static_asset_headers():
        """
        Headers for static assets (images, CSS, JS):
        long TTL combined with versioned filenames (cache busting).
        """
        headers = {}
        headers['Cache-Control'] = 'public, max-age=31536000, immutable'  # 1 year
        headers['ETag'] = 'hash-of-content'
        # Filename versioning: style.v123.css (cache busting)
        return headers

    @staticmethod
    def dynamic_content_headers():
        """
        Headers for dynamic content that changes:
        short TTL with mandatory revalidation after expiry.
        """
        headers = {}
        headers['Cache-Control'] = 'public, max-age=300, must-revalidate'  # 5 minutes
        headers['ETag'] = 'hash-of-content'
        headers['Vary'] = 'Accept-Encoding, Accept-Language'
        return headers

    @staticmethod
    def api_response_headers():
        """
        Headers for API responses:
        separate TTLs for the browser (max-age) and the CDN (s-maxage).
        """
        headers = {}
        headers['Cache-Control'] = 'public, max-age=60, s-maxage=300'  # Browser: 1min, CDN: 5min
        headers['ETag'] = 'hash-of-response'
        headers['Vary'] = 'Authorization'  # Different responses for different users
        return headers

    @staticmethod
    def no_cache_headers():
        """
        Headers for sensitive/personalized content that must never be cached.
        """
        headers = {}
        headers['Cache-Control'] = 'private, no-cache, no-store, must-revalidate'
        headers['Pragma'] = 'no-cache'  # HTTP/1.0
        headers['Expires'] = '0'
        return headers