Choosing the right data store is one of the most critical architectural decisions. There's no "best" database—only the best fit for your specific requirements.
For Senior/Distinguished Engineers: You should understand CAP theorem implications, consistency models, scaling characteristics, and operational complexity for each system type.
| Use Case | Recommended System | Why |
|---|---|---|
| Transactional data, ACID required | RDBMS (PostgreSQL, MySQL) | Strong consistency, transactions, data integrity |
| Flexible schema, rapid development | Document DB (MongoDB, CouchDB) | Schema flexibility, JSON-like documents |
| High read/write throughput, simple queries | Key-Value Store (Redis, DynamoDB) | Extreme performance, simple data model |
| Time-series data, analytics | Column-Family (Cassandra, HBase) | Write-optimized, wide columns, compression |
| Relationship-heavy data | Graph DB (Neo4j, Amazon Neptune) | Native graph traversal, relationship queries |
| Full-text search | Search Engine (Elasticsearch, Solr) | Inverted indexes, relevance scoring |
| Event streaming, message queue | Message Broker (Kafka, RabbitMQ) | Decoupling, async processing, replay |
| Caching, session storage | Cache (Redis, Memcached) | In-memory speed, TTL support |
Relational databases are traditional SQL systems built on a structured schema, ACID transactions, and the relational model.
CAP: a single-node RDBMS is effectively CA (there is no partition to tolerate); distributed RDBMS deployments are typically CP.
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Numeric
from sqlalchemy.orm import declarative_base, sessionmaker, relationship  # declarative_base lives in sqlalchemy.orm as of SQLAlchemy 1.4
Base = declarative_base()
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    name = Column(String)
    orders = relationship("Order", back_populates="user")

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    total = Column(Numeric(10, 2))
    user = relationship("User", back_populates="orders")
# Usage
engine = create_engine('postgresql://localhost/mydb')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# ACID Transaction
try:
    user = User(email='john@example.com', name='John')
    session.add(user)
    session.flush()  # Get user.id
    order = Order(user_id=user.id, total=99.99)
    session.add(order)
    session.commit()  # Atomic - all or nothing
except Exception:
    session.rollback()  # Rollback on error
# Complex join query
from sqlalchemy import func
result = session.query(
    User.name,
    func.count(Order.id).label('order_count'),
    func.sum(Order.total).label('total_spent')
).join(Order).group_by(User.name).all()
Common scaling patterns for relational databases:

- **Read replicas**: Asynchronous replication to read-only replicas handles read-heavy loads. Trade-off: replica reads are eventually consistent.
- **Sharding**: Partition data across multiple databases by key (e.g., user_id), as sketched below. Trade-off: no cross-shard joins, and operational management gets complex.
- **Connection pooling**: Reuse database connections to reduce per-request overhead, as sketched below. Trade-off: a bounded pool means requests queue for connections under load.
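A minimal sketch of connection pooling and hash-based shard routing, assuming SQLAlchemy; the shard hostnames and the `engine_for_user` helper are hypothetical:

```python
from sqlalchemy import create_engine

# Connection pooling: SQLAlchemy pools connections by default; size it explicitly.
pooled_engine = create_engine(
    'postgresql://localhost/mydb',
    pool_size=10,       # persistent connections kept open
    max_overflow=5,     # extra connections allowed under bursts
    pool_timeout=30     # seconds a request waits for a free connection (queuing)
)

# Sharding: route each user_id deterministically to one of N databases.
# These shard DSNs are hypothetical placeholders.
SHARDS = [
    create_engine('postgresql://shard0.internal/mydb'),
    create_engine('postgresql://shard1.internal/mydb'),
]

def engine_for_user(user_id: int):
    """Pick the shard that owns this user; cross-shard joins are not possible."""
    return SHARDS[user_id % len(SHARDS)]
```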
Document databases store data as JSON-like documents: flexible schemas, horizontal scaling, and often eventual consistency.
CAP: MongoDB is CP; CouchDB is AP.
from pymongo import MongoClient
from datetime import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']
# Document structure (flexible schema)
user = {
    '_id': 'user-123',
    'email': 'john@example.com',
    'name': 'John Doe',
    'address': {  # Embedded document
        'street': '123 Main St',
        'city': 'NYC',
        'zip': '10001'
    },
    'orders': [  # Embedded array
        {
            'order_id': 'order-456',
            'total': 99.99,
            'date': datetime.now()
        }
    ],
    'tags': ['premium', 'vip'],
    'created_at': datetime.now()
}
# Insert
db.users.insert_one(user)
# Query with rich expressions
users = db.users.find({
    'tags': 'premium',
    'orders.total': {'$gt': 50}
}).sort('created_at', -1).limit(10)
# Update (atomic operations)
db.users.update_one(
    {'_id': 'user-123'},
    {
        '$push': {'orders': {'order_id': 'order-789', 'total': 49.99}},
        '$set': {'last_updated': datetime.now()}
    }
)
# Aggregation pipeline (powerful queries)
pipeline = [
    {'$unwind': '$orders'},
    {'$group': {
        '_id': '$_id',
        'total_spent': {'$sum': '$orders.total'},
        'order_count': {'$sum': 1}
    }},
    {'$match': {'total_spent': {'$gt': 100}}}
]
big_spenders = db.users.aggregate(pipeline)
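Without supporting indexes, the query and pipeline above scan the whole collection. A minimal sketch of index creation with pymongo, using the field names from the example:

```python
from pymongo import ASCENDING, DESCENDING

# Compound index covering the find() filter above (tags + embedded order totals)
db.users.create_index([('tags', ASCENDING), ('orders.total', ASCENDING)])
# Index backing the sort on created_at
db.users.create_index([('created_at', DESCENDING)])
```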
Key-value stores use the simplest NoSQL model, mapping a unique key to a value, and in exchange deliver extreme performance and straightforward horizontal scaling.
import redis
import json
from datetime import timedelta
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Simple key-value
r.set('user:123:name', 'John Doe')
r.expire('user:123:name', timedelta(hours=1)) # TTL
name = r.get('user:123:name')
# Store objects as JSON
user = {'id': 123, 'email': 'john@example.com', 'premium': True}
r.set('user:123', json.dumps(user))
user_data = json.loads(r.get('user:123'))
# Hash (field-value pairs)
r.hset('user:123:profile', mapping={
    'name': 'John',
    'email': 'john@example.com',
    'age': '30'
})
email = r.hget('user:123:profile', 'email')
profile = r.hgetall('user:123:profile')
# Lists (queues)
r.lpush('jobs', 'job1', 'job2', 'job3') # Producer
job = r.rpop('jobs') # Consumer
# Sets (unique values)
r.sadd('tags:user:123', 'premium', 'vip', 'early-adopter')
tags = r.smembers('tags:user:123')
# Sorted sets (leaderboard)
r.zadd('leaderboard', {'player1': 1000, 'player2': 1500, 'player3': 800})
top_players = r.zrevrange('leaderboard', 0, 9, withscores=True) # Top 10
# Atomic operations
r.incr('page:views:123') # Atomic increment
views = r.get('page:views:123')
# Pub/Sub
pubsub = r.pubsub()
pubsub.subscribe('notifications')
# Publisher (different process)
r.publish('notifications', json.dumps({'type': 'new_message', 'user_id': 123}))
# Subscriber loop (blocks; run it in its own process or thread)
for message in pubsub.listen():
    if message['type'] == 'message':
        handle_notification(json.loads(message['data']))  # hypothetical handler
# Cache-aside pattern: check the cache first, fall back to the database
def get_user_cached(user_id):
    cache_key = f'cache:user:{user_id}'
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss - fetch from the database, then repopulate with a TTL
    user = fetch_from_database(user_id)  # placeholder for your real data access
    r.setex(cache_key, timedelta(minutes=5), json.dumps(user))
    return user
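The read path alone goes stale once the underlying row changes, so writes should invalidate the cached entry. A minimal sketch; `update_user_in_database` is a hypothetical placeholder like `fetch_from_database` above:

```python
def update_user(user_id, fields):
    update_user_in_database(user_id, fields)  # hypothetical write path
    # Cache-aside invalidation: delete so the next read repopulates the cache
    r.delete(f'cache:user:{user_id}')
```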
Wide-column stores are optimized for write-heavy workloads; data is organized into rows with a dynamic set of columns, grouped into column families.
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from datetime import datetime
import uuid
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
# Create keyspace (database)
session.execute("""
CREATE KEYSPACE IF NOT EXISTS metrics
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")
session.set_keyspace('metrics')
# Create table (wide column family)
session.execute("""
CREATE TABLE IF NOT EXISTS sensor_data (
sensor_id UUID,
timestamp TIMESTAMP,
temperature DOUBLE,
humidity DOUBLE,
location TEXT,
PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
""")
# Insert (optimized for writes)
sensor_id = uuid.uuid4()
session.execute("""
INSERT INTO sensor_data (sensor_id, timestamp, temperature, humidity, location)
VALUES (%s, %s, %s, %s, %s)
""", (sensor_id, datetime.now(), 22.5, 65.0, 'warehouse-1'))
# Query (partition key required for performance)
rows = session.execute("""
SELECT * FROM sensor_data
WHERE sensor_id = %s
AND timestamp >= %s
""", (sensor_id, datetime(2024, 1, 1)))
# Time-series query
latest = session.execute("""
SELECT * FROM sensor_data
WHERE sensor_id = %s
LIMIT 100
""", (sensor_id,))
# Tunable consistency
from cassandra import ConsistencyLevel
statement = SimpleStatement(
    "SELECT * FROM sensor_data WHERE sensor_id = %s",
    consistency_level=ConsistencyLevel.QUORUM  # or ONE, ALL
)
rows = session.execute(statement, (sensor_id,))
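For sustained ingestion, preparing the statement once avoids re-parsing the CQL on every insert. A minimal sketch against the same table; `iter_readings` is a hypothetical data source:

```python
# Prepared statements use '?' placeholders and are parsed once server-side
insert_stmt = session.prepare("""
    INSERT INTO sensor_data (sensor_id, timestamp, temperature, humidity, location)
    VALUES (?, ?, ?, ?, ?)
""")
insert_stmt.consistency_level = ConsistencyLevel.ONE  # trade consistency for write throughput

for temperature, humidity in iter_readings():  # hypothetical source of readings
    session.execute(insert_stmt, (sensor_id, datetime.now(), temperature, humidity, 'warehouse-1'))
```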
Graph databases are optimized for relationship-heavy data, modeling it as nodes, edges, and properties, with fast native graph traversal.
from neo4j import GraphDatabase
class SocialGraph:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def create_user(self, name, email):
        with self.driver.session() as session:
            session.run(
                "CREATE (u:User {name: $name, email: $email})",
                name=name, email=email
            )

    def create_friendship(self, user1, user2):
        with self.driver.session() as session:
            session.run("""
                MATCH (u1:User {name: $user1})
                MATCH (u2:User {name: $user2})
                CREATE (u1)-[:FRIENDS_WITH]->(u2)
            """, user1=user1, user2=user2)

    def find_friends(self, name):
        """Direct friends"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {name: $name})-[:FRIENDS_WITH]->(friend)
                RETURN friend.name AS name
            """, name=name)
            return [record["name"] for record in result]

    def find_friends_of_friends(self, name):
        """2nd-degree connections"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {name: $name})-[:FRIENDS_WITH*2]->(fof)
                WHERE fof <> u
                RETURN DISTINCT fof.name AS name
            """, name=name)
            return [record["name"] for record in result]

    def shortest_path(self, user1, user2):
        """Find the shortest connection path"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH path = shortestPath(
                    (u1:User {name: $user1})-[:FRIENDS_WITH*]-(u2:User {name: $user2})
                )
                RETURN [node in nodes(path) | node.name] AS path
            """, user1=user1, user2=user2)
            record = result.single()
            return record["path"] if record else None
# Usage
graph = SocialGraph("bolt://localhost:7687", "neo4j", "password")
graph.create_user("Alice", "alice@example.com")
graph.create_user("Bob", "bob@example.com")
graph.create_friendship("Alice", "Bob")
friends = graph.find_friends("Alice")
path = graph.shortest_path("Alice", "Charlie")  # None unless a "Charlie" node exists
graph.close()
Message brokers enable asynchronous communication between services, decoupling producers from consumers.
| Aspect | RabbitMQ (Queue) | Kafka (Stream) |
|---|---|---|
| Model | Message queue (delete after consume) | Distributed log (persistent, replayable) |
| Delivery | One consumer per message | Multiple consumers (consumer groups) |
| Ordering | Per queue | Per partition |
| Retention | Until consumed | Configurable (days/weeks) |
| Replay | No | Yes (rewind offset) |
| Throughput | Good (10K+ msg/sec) | Excellent (millions/sec) |
| Use Case | Task queues, RPC, work distribution | Event sourcing, stream processing, logs |
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Publish event
event = {
    'event_type': 'user_registered',
    'user_id': 'user-123',
    'timestamp': '2024-01-01T12:00:00Z'
}
producer.send('user-events', event)
producer.flush()
# Consumer
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='email-service',  # Consumer group
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'  # Start from the beginning if no committed offset
)
for message in consumer:
    event = message.value
    print(f"Processing: {event['event_type']}")
    # Send welcome email
    # Update analytics
    # Offsets are committed automatically here (see the manual-commit sketch below)
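For at-least-once processing, disable auto-commit and commit offsets only after the work succeeds; Kafka's persistent log also lets a consumer rewind and replay. A minimal sketch with the same topic and group; `handle` is a hypothetical processor:

```python
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='email-service',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False  # we commit manually below
)

for message in consumer:
    handle(message.value)  # hypothetical handler; an exception here means redelivery
    consumer.commit()      # at-least-once: duplicates are possible, losses are not

# Replay (run as a separate step): rewind all assigned partitions to the start
consumer.seek_to_beginning()
```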
import pika
import json
# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
task = {'task_id': 'task-123', 'type': 'send_email', 'to': 'user@example.com'}
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task),
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent
)
connection.close()
# Consumer (Worker)
def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"Processing task: {task['task_id']}")
    # Do work...
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Acknowledge
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
channel.basic_qos(prefetch_count=1) # Fair dispatch
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
| System | Data Model | CAP | Consistency | Scaling | Query Capability |
|---|---|---|---|---|---|
| PostgreSQL | Relational (tables) | CA/CP | Strong (ACID) | Vertical, Read replicas | SQL (powerful joins) |
| MongoDB | Document (JSON) | CP | Tunable (eventual → strong) | Horizontal (sharding) | Rich queries, aggregation |
| Redis | Key-Value | CP (cluster) | Eventual (async replication) | Horizontal (cluster) | Simple (key lookup) |
| DynamoDB | Key-Value / Document | AP | Eventual (strong reads optional) | Auto (managed) | Limited (key + index) |
| Cassandra | Wide Column | AP | Tunable | Horizontal (excellent) | CQL (limited joins) |
| Neo4j | Graph | CA | ACID | Vertical primarily | Cypher (graph queries) |
| Elasticsearch | Document (inverted index) | CP | Near real-time | Horizontal (sharding) | Full-text, aggregations |
| Kafka | Log / Stream | AP | Eventual | Horizontal (partitions) | N/A (streaming) |
Modern architectures embrace polyglot persistence, combining different databases for different needs: