Architectural patterns are high-level strategies for organizing code and systems. While design patterns (GoF) focus on object-level solutions, architectural patterns address system-level concerns like scalability, maintainability, and distributed computing.
For Senior/Distinguished Engineers: You should be able to explain when to use each pattern, their trade-offs, and how they combine in real systems.
Break down an application into small, independently deployable services that communicate over a network.
| Pros | Cons |
|---|---|
| Independent deployment | Distributed system complexity |
| Technology flexibility | Network latency |
| Team autonomy | Data consistency challenges |
| Fault isolation | Testing complexity |
| Scalability per service | Operational overhead |
# User Service (Flask microservice)
from flask import Flask, jsonify, request
from sqlalchemy import create_engine
import requests
# Flask application object for the User Service.
app = Flask(__name__)
# Service-private database (database-per-service pattern): only this
# service talks to the users database.
db = create_engine('postgresql://localhost/users')
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return a single user's details as JSON.

    The ``<int:user_id>`` converter is required: the original route
    ``'/users/'`` never supplied ``user_id`` to the view, so every request
    failed with a TypeError (HTTP 500).
    """
    # Bind the parameter instead of f-string interpolation -- the original
    # f"... id = {user_id}" was an SQL injection vector.
    user = db.execute(
        "SELECT * FROM users WHERE id = :user_id", user_id=user_id
    ).fetchone()
    # Call other service if needed (sync)
    # orders = requests.get(f'http://order-service/orders?user_id={user_id}')
    return jsonify({
        'id': user.id,
        'name': user.name,
        'email': user.email
    })
@app.route('/users', methods=['POST'])
def create_user():
    """Create a new user and publish a ``user.created`` event.

    Returns 400 for malformed payloads (the original raised KeyError and
    surfaced as an HTTP 500), 201 on success.
    """
    data = request.json or {}
    # Validate required fields before touching the database.
    if 'name' not in data or 'email' not in data:
        return jsonify({'error': 'name and email are required'}), 400
    db.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        name=data['name'],
        email=data['email']
    )
    # Publish event for other services (async)
    publish_event('user.created', data)
    return jsonify({'status': 'created'}), 201
# Lazily-created producer, reused across calls -- the original built a new
# KafkaProducer (a heavyweight, connection-holding object) on every publish.
_producer = None


def publish_event(event_type, data):
    """Publish an event to the shared Kafka ``events`` topic.

    JSON-encodes the payload via ``value_serializer``: kafka-python only
    accepts bytes, so the original ``send('events', {...})`` with a raw
    dict raised at publish time.
    """
    global _producer
    from kafka import KafkaProducer
    import json
    if _producer is None:
        _producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    _producer.send('events', {
        'type': event_type,
        'data': data
    })
if __name__ == '__main__':
    # Dev entry point: each microservice runs as its own process on its
    # own port.
    app.run(port=5001)
# Order Service (separate process)
@app.route('/orders', methods=['GET'])
def get_orders():
    """List orders for the user given by the ``?user_id=`` query parameter."""
    user_id = request.args.get('user_id')
    # Bind the value instead of f-string interpolation -- the original was
    # an SQL injection vector.
    orders = db.execute(
        "SELECT * FROM orders WHERE user_id = :user_id", user_id=user_id
    ).fetchall()
    return jsonify([dict(o) for o in orders])
Services communicate through events rather than direct calls. Producers emit events, consumers react to them.
+──────────────+ +───────────────────+
│ Producer │ ──────> │ Event Bus │
│ (Service A) │ emit │ (Kafka/RabbitMQ) │
+──────────────+ +────────┬──────────+
│
+─────────────┼─────────────+
│ │ │
+─────▼──+ +─────▼──+ +─────▼──+
│Consumer│ │Consumer│ │Consumer│
│ B │ │ C │ │ D │
+────────+ +────────+ +────────+
(subscribe) (subscribe) (subscribe)
# Event Publisher
import json
from dataclasses import dataclass, field
from datetime import datetime

from kafka import KafkaProducer
@dataclass
class Event:
    """Envelope for a domain event published to the bus."""
    event_type: str
    aggregate_id: str
    data: dict
    # default_factory evaluates utcnow() per instance.  The original
    # `timestamp: datetime = datetime.utcnow()` was evaluated ONCE at
    # class-definition time, so every Event silently shared the same
    # timestamp.
    timestamp: datetime = field(default_factory=datetime.utcnow)
class EventPublisher:
    """Serializes Event objects to JSON and sends them to the 'events' topic."""

    def __init__(self, broker='localhost:9092'):
        # Single shared producer; JSON-encode values to bytes on send.
        self.producer = KafkaProducer(
            bootstrap_servers=broker,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def publish(self, event: Event):
        """Flatten the event into a plain dict and hand it to Kafka."""
        payload = {
            'type': event.event_type,
            'aggregate_id': event.aggregate_id,
            'data': event.data,
            'timestamp': event.timestamp.isoformat()
        }
        self.producer.send('events', payload)
        print(f"Published: {event.event_type}")
# Event Consumer
from kafka import KafkaConsumer
from abc import ABC, abstractmethod
class EventHandler(ABC):
    """Interface for consumers that react to events from the bus."""

    @abstractmethod
    def can_handle(self, event_type: str) -> bool:
        """Return True when this handler processes the given event type."""

    @abstractmethod
    def handle(self, event: dict):
        """Process one deserialized event payload."""
class OrderCreatedHandler(EventHandler):
    """Reacts to 'order.created' events."""

    def can_handle(self, event_type: str) -> bool:
        return event_type == 'order.created'

    def handle(self, event: dict):
        print(f"Processing order: {event['aggregate_id']}")
        # In a real system this is where the side effects go:
        # send confirmation email, update inventory, create shipping label.
        return "Order processed"
class PaymentReceivedHandler(EventHandler):
    """Reacts to 'payment.received' events."""

    def can_handle(self, event_type: str) -> bool:
        return event_type == 'payment.received'

    def handle(self, event: dict):
        print(f"Payment received: {event['data']['amount']}")
        # Real side effects: update order status, send receipt.
        return "Payment processed"
class EventConsumer:
    """Dispatches each bus message to every handler that accepts its type."""

    def __init__(self, handlers: list[EventHandler], broker='localhost:9092'):
        self.consumer = KafkaConsumer(
            'events',
            bootstrap_servers=broker,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.handlers = handlers

    def start(self):
        """Blocking poll loop over the 'events' topic."""
        print("Starting event consumer...")
        for message in self.consumer:
            self._dispatch(message.value)

    def _dispatch(self, event):
        # Deliberately no break: several handlers may process one event.
        kind = event['type']
        for handler in self.handlers:
            if handler.can_handle(kind):
                handler.handle(event)
# Usage
if __name__ == '__main__':
    # Producer side
    demo_event = Event(
        event_type='order.created',
        aggregate_id='order-123',
        data={'user_id': 'user-456', 'total': 99.99}
    )
    EventPublisher().publish(demo_event)

    # Consumer side (separate process)
    consumer = EventConsumer([
        OrderCreatedHandler(),
        PaymentReceivedHandler()
    ])
    # consumer.start()  # Blocks and listens
Separate read and write operations into different models. Commands change state, queries return data.
+──────────+ +──────────+
│ Client │ │ Client │
+────┬─────+ +────┬─────+
│ Command (Write) │ Query (Read)
│ │
+────▼─────────+ +────▼──────────+
│Command Model │ │ Query Model │
│ (Write DB) │ │ (Read DB) │
│ PostgreSQL │ ─────sync/async────> │ Elasticsearch│
│ (normalized) │ replication │ (denormalized)│
+──────────────+ +───────────────+
(optimized for reads)
# Command Side (Write)
from dataclasses import dataclass
from abc import ABC, abstractmethod
@dataclass
class Command(ABC):
    """Base command (write-side message); marker type used for dispatch."""
    pass
@dataclass
class CreateOrderCommand(Command):
    """Command: place a new order."""
    user_id: str
    items: list   # line items as plain dicts -- schema set by the caller
    total: float  # order total; handler rejects values <= 0
@dataclass
class CancelOrderCommand(Command):
    """Command: cancel an existing order."""
    order_id: str
    reason: str  # free-text cancellation reason
class CommandHandler(ABC):
    """Interface for write-side command handlers."""

    @abstractmethod
    def handle(self, command: Command):
        """Execute the command; may raise on validation failure."""
class CreateOrderHandler(CommandHandler):
    """Write-side handler: persists the order and emits OrderCreated."""

    def __init__(self, write_db, event_bus):
        self.write_db = write_db
        self.event_bus = event_bus

    def handle(self, command: CreateOrderCommand):
        # Validate before touching the source of truth.
        if command.total <= 0:
            raise ValueError("Invalid total")
        # Write to the normalized store and capture the generated id.
        row = self.write_db.execute("""
            INSERT INTO orders (user_id, items, total, status)
            VALUES (:user_id, :items, :total, 'pending')
            RETURNING id
        """, **command.__dict__).fetchone()
        order_id = row[0]
        # Let projectors sync the read model asynchronously.
        self.event_bus.publish({
            'type': 'OrderCreated',
            'order_id': order_id,
            'user_id': command.user_id,
            'total': command.total
        })
        return order_id
# Query Side (Read)
@dataclass
class Query(ABC):
    """Base query (read-side message); marker type used for dispatch."""
    pass
@dataclass
class GetOrdersByUserQuery(Query):
    """Query: all orders belonging to one user."""
    user_id: str
@dataclass
class GetOrderDetailsQuery(Query):
    """Query: full details for a single order."""
    order_id: str
class QueryHandler(ABC):
    """Interface for read-side query handlers."""

    @abstractmethod
    def handle(self, query: Query):
        """Resolve the query against the read model and return the result."""
class GetOrdersByUserHandler(QueryHandler):
    """Read-side handler backed by a denormalized store (e.g. Elasticsearch)."""

    def __init__(self, read_db):
        self.read_db = read_db  # Optimized for reads (e.g., Elasticsearch)

    def handle(self, query: GetOrdersByUserQuery):
        # Newest first, straight from the optimized read model.
        body = {
            'query': {
                'match': {'user_id': query.user_id}
            },
            'sort': [{'created_at': 'desc'}]
        }
        return self.read_db.search(body)
# Read Model Projector (syncs write → read)
class OrderProjector:
    """Updates the denormalized read model when write-side events occur."""

    def __init__(self, read_db):
        self.read_db = read_db

    def fetch_user_info(self, user_id):
        """Resolve user data to denormalize into the order document.

        Stub added because the original called this method without ever
        defining it, so project_order_created raised AttributeError at
        runtime.  A real projector would call the user service or a
        local cache; override this method to do so.
        """
        return {'id': user_id, 'name': None, 'email': None}

    def project_order_created(self, event):
        """Denormalize an OrderCreated event into the read store."""
        # Combine data from multiple sources
        user = self.fetch_user_info(event['user_id'])
        self.read_db.index('orders', {
            'order_id': event['order_id'],
            'user_name': user['name'],
            'user_email': user['email'],
            'total': event['total'],
            'created_at': event['timestamp'],
            # ... optimized structure for queries
        })
# Application Service (coordinates)
class OrderService:
    """Facade that routes commands and queries to their registered handlers."""

    def __init__(self, command_handlers, query_handlers):
        # Maps command/query classes to handler instances.
        self.command_handlers = command_handlers
        self.query_handlers = query_handlers

    def create_order(self, user_id, items, total):
        """Command path (write side)."""
        cmd = CreateOrderCommand(user_id, items, total)
        return self.command_handlers[CreateOrderCommand].handle(cmd)

    def get_user_orders(self, user_id):
        """Query path (read side)."""
        q = GetOrdersByUserQuery(user_id)
        return self.query_handlers[GetOrdersByUserQuery].handle(q)
| Pros | Cons |
|---|---|
| Independent scaling (read/write) | Eventual consistency |
| Optimized data models | Code complexity |
| Complex queries don't impact writes | Duplication of data |
| Clear separation of concerns | Synchronization overhead |
Manage data consistency across microservices using a sequence of local transactions with compensating actions.
Choreography Saga (Event-driven):
Order Service ──[OrderCreated]──> Payment Service ──[PaymentSuccess]──> Inventory Service
│ │ │
│ [PaymentFailed] [InventoryReserved]
│ │ │
│ ▼ │
+────────────────────── [CancelOrder] <─────────────────────────────────-+
(Compensating action)
Orchestration Saga (Coordinator-based):
+─────────────────+
│ Saga Coordinator│
+────────┬────────+
│
+────────────────┼────────────────+
│ │ │
+─────▼──────+ +─────▼──────+ +─────▼──────+
│ Order │ │ Payment │ │ Inventory │
│ Service │ │ Service │ │ Service │
+────────────+ +────────────+ +────────────+
# Saga Coordinator Pattern
from enum import Enum
from dataclasses import dataclass
from typing import Callable, List
class SagaStatus(Enum):
    """Saga lifecycle, including the rollback phases."""
    PENDING = "pending"            # created, not yet executed
    COMPLETED = "completed"        # all steps succeeded
    FAILED = "failed"              # a step raised; rollback about to start
    COMPENSATING = "compensating"  # running compensations in reverse order
    COMPENSATED = "compensated"    # rollback finished (best effort)
@dataclass
class SagaStep:
    """Each step in the saga: a forward action paired with its rollback."""
    name: str
    action: Callable        # Forward action: takes the shared context dict
    compensation: Callable  # Rollback action: undoes a successful `action`
    completed: bool = False  # Set by the orchestrator once `action` succeeds
class SagaOrchestrator:
    """Coordinates distributed transaction as an ordered list of steps.

    On any step failure, the previously completed steps are compensated
    in reverse order (best-effort rollback).
    """

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.status = SagaStatus.PENDING
        self.completed_steps: List[SagaStep] = []

    def add_step(self, name: str, action: Callable, compensation: Callable):
        """Append a step; returns self so calls can be chained."""
        self.steps.append(SagaStep(name, action, compensation))
        return self

    def execute(self, context: dict):
        """Execute saga - all or nothing.  Returns True on full success."""
        print(f"Starting saga with {len(self.steps)} steps")
        try:
            for step in self.steps:
                print(f" Executing: {step.name}")
                step.action(context)
                step.completed = True
                self.completed_steps.append(step)
        except Exception as e:
            # `step` is the one that raised; roll back everything before it.
            print(f"Saga failed at {step.name}: {e}")
            self.status = SagaStatus.FAILED
            self._compensate(context)
            return False
        self.status = SagaStatus.COMPLETED
        print("Saga completed successfully")
        return True

    def _compensate(self, context: dict):
        """Rollback completed steps in reverse order."""
        self.status = SagaStatus.COMPENSATING
        print("Starting compensation (rollback)...")
        for step in reversed(self.completed_steps):
            print(f" Compensating: {step.name}")
            try:
                step.compensation(context)
            except Exception as e:
                print(f" Warning: Compensation failed for {step.name}: {e}")
        self.status = SagaStatus.COMPENSATED
        print("Compensation completed")
# Example: Order Processing Saga
class OrderService:
    """Forward/compensating actions for the order step of the saga."""

    @staticmethod
    def create_order(context):
        # Derive a deterministic id from the user and record it in the
        # shared saga context for the later steps.
        new_id = f"order-{context['user_id']}"
        context['order_id'] = new_id
        print(f" Created order: {new_id}")

    @staticmethod
    def cancel_order(context):
        # Compensating action: undo create_order.
        print(f" Cancelled order: {context.get('order_id')}")
class PaymentService:
    """Forward/compensating actions for the payment step."""

    @staticmethod
    def charge_payment(context):
        # Test hook: callers can force a failure to exercise compensation.
        if context.get('simulate_payment_failure'):
            raise Exception("Payment declined")
        amount = context['amount']
        context['payment_id'] = "payment-123"
        print(f" Charged ${amount}")

    @staticmethod
    def refund_payment(context):
        # Refund only when a charge actually happened.
        if 'payment_id' in context:
            print(f" Refunded payment: {context['payment_id']}")
class InventoryService:
    """Forward/compensating actions for the inventory step."""

    @staticmethod
    def reserve_inventory(context):
        qty = context['quantity']
        print(f" Reserved {qty} items")
        context['reservation_id'] = "res-456"

    @staticmethod
    def release_inventory(context):
        # Compensate only if a reservation was actually made.
        if 'reservation_id' in context:
            print(f" Released reservation: {context['reservation_id']}")
class ShippingService:
    """Forward/compensating actions for the shipping step."""

    @staticmethod
    def create_shipment(context):
        print(f" Created shipment for order: {context['order_id']}")
        context['shipment_id'] = "ship-789"

    @staticmethod
    def cancel_shipment(context):
        # Compensate only if a shipment exists.
        if 'shipment_id' in context:
            print(f" Cancelled shipment: {context['shipment_id']}")
# Usage
if __name__ == '__main__':

    def banner(title):
        # Section header, matching the original output exactly.
        print("\n" + "=" * 60)
        print(title)
        print("=" * 60)

    # Successful saga
    banner("SAGA 1: Successful Order")
    saga = (SagaOrchestrator()
            .add_step("Create Order",
                      OrderService.create_order,
                      OrderService.cancel_order)
            .add_step("Process Payment",
                      PaymentService.charge_payment,
                      PaymentService.refund_payment)
            .add_step("Reserve Inventory",
                      InventoryService.reserve_inventory,
                      InventoryService.release_inventory)
            .add_step("Create Shipment",
                      ShippingService.create_shipment,
                      ShippingService.cancel_shipment))
    context = {'user_id': 'user-123', 'amount': 99.99, 'quantity': 2}
    success = saga.execute(context)
    print(f"Final context: {context}")

    # Failed saga (triggers compensation)
    banner("SAGA 2: Failed Payment (will compensate)")
    saga2 = (SagaOrchestrator()
             .add_step("Create Order",
                       OrderService.create_order,
                       OrderService.cancel_order)
             .add_step("Process Payment",
                       PaymentService.charge_payment,
                       PaymentService.refund_payment)
             .add_step("Reserve Inventory",
                       InventoryService.reserve_inventory,
                       InventoryService.release_inventory))
    context2 = {
        'user_id': 'user-456',
        'amount': 199.99,
        'quantity': 5,
        'simulate_payment_failure': True  # force the payment step to raise
    }
    success = saga2.execute(context2)
    print(f"Saga status: {saga2.status}")
Isolate core business logic from external concerns (UI, database, APIs) using ports and adapters.
Adapters (Infrastructure) Core (Business Logic)
+─────────────+ +────────────────────+
│ REST API │ ────────────────> │ │
│ (FastAPI) │ Port (IOrderRepo) Domain Model │
+─────────────+ │ (Business Rules) │
│ │
+─────────────+ │ │
│ GraphQL │ ────────────────> │ │
│ API │ Port +────────────────────+
+─────────────+ │
│ Port (IOrderRepo)
+─────────────+ ▼
│ PostgreSQL │ <──────────── Adapter ────────────────
│ Database │ (implements port)
+─────────────+
+─────────────+
│ Kafka │ <──────────── Event Adapter
│ Message │
+─────────────+
# Core Domain (Business Logic) - No dependencies
from dataclasses import dataclass
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List
@dataclass
class Order:
    """Domain Entity: an order and its lifecycle rules."""
    id: str
    user_id: str
    items: List[dict]
    total: float
    status: str
    created_at: datetime

    def can_cancel(self) -> bool:
        """Business rule: only pending/confirmed orders may be cancelled."""
        return self.status in ('pending', 'confirmed')

    def cancel(self):
        """Transition to 'cancelled', enforcing can_cancel()."""
        if self.can_cancel():
            self.status = 'cancelled'
        else:
            raise ValueError(f"Cannot cancel order in {self.status} status")
# Ports (Interfaces) - Define what core needs
class IOrderRepository(ABC):
    """Output port -- the persistence boundary the core depends on."""

    @abstractmethod
    def save(self, order: Order):
        """Insert or update an order."""

    @abstractmethod
    def find_by_id(self, order_id: str) -> Order:
        """Load an order by id."""
class INotificationService(ABC):
    """Output port -- the outbound notification boundary."""

    @abstractmethod
    def send_order_confirmation(self, order: Order):
        """Notify interested parties that an order was created."""
# Core Application Service (Use Cases)
class OrderService:
    """Core business logic -- depends only on ports (hexagonal core)."""

    def __init__(self,
                 repository: IOrderRepository,
                 notification: INotificationService):
        self.repository = repository
        self.notification = notification

    def create_order(self, user_id: str, items: List[dict], total: float) -> Order:
        """Use case: Create Order.

        Raises ValueError for a non-positive total; persists and notifies
        through the injected ports, then returns the new Order.
        """
        # Fail fast: validate before doing any work (the original built
        # the Order first and validated afterwards).
        if total <= 0:
            raise ValueError("Total must be positive")
        order = Order(
            id=self._generate_id(),
            user_id=user_id,
            items=items,
            total=total,
            status='pending',
            created_at=datetime.now()
        )
        # Use port to persist
        self.repository.save(order)
        # Use port to notify
        self.notification.send_order_confirmation(order)
        return order

    def cancel_order(self, order_id: str):
        """Use case: Cancel Order.

        Raises ValueError when the order does not exist -- adapters such
        as the in-memory repository return None for unknown ids, which
        previously surfaced as an AttributeError on `.cancel()`.
        """
        order = self.repository.find_by_id(order_id)
        if order is None:
            raise ValueError(f"Order not found: {order_id}")
        order.cancel()  # Domain logic
        self.repository.save(order)

    def _generate_id(self) -> str:
        """Generate a globally unique order id."""
        import uuid
        return str(uuid.uuid4())
# Adapters (Infrastructure) - Implement ports
class PostgreSQLOrderRepository(IOrderRepository):
    """Adapter for PostgreSQL.

    NOTE(review): uses the SQLAlchemy 1.x implicit `engine.execute` API
    (removed in 2.x) -- confirm the project's pinned version.  Also assumes
    the orders table accepts `items` as passed; verify serialization.
    """

    def __init__(self, connection_string):
        from sqlalchemy import create_engine
        self.engine = create_engine(connection_string)

    def save(self, order: Order):
        # Upsert keyed on id; only status is updated on conflict.
        params = dict(order.__dict__)
        self.engine.execute("""
            INSERT INTO orders (id, user_id, items, total, status, created_at)
            VALUES (:id, :user_id, :items, :total, :status, :created_at)
            ON CONFLICT (id) DO UPDATE
            SET status = :status
        """, **params)

    def find_by_id(self, order_id: str) -> Order:
        record = self.engine.execute(
            "SELECT * FROM orders WHERE id = :id",
            id=order_id
        ).fetchone()
        return Order(**dict(record))
class InMemoryOrderRepository(IOrderRepository):
    """Adapter for in-memory storage (testing)."""

    def __init__(self):
        # order_id -> Order
        self.orders = {}

    def save(self, order: Order):
        self.orders[order.id] = order

    def find_by_id(self, order_id: str) -> Order:
        # Returns None for unknown ids.
        return self.orders.get(order_id)
class EmailNotificationService(INotificationService):
    """Adapter for email notifications (stub: logs instead of sending)."""

    def send_order_confirmation(self, order: Order):
        # A real implementation would send via SMTP.
        print(f"Sending email for order {order.id} to user {order.user_id}")
class KafkaNotificationService(INotificationService):
    """Adapter that publishes order events to Kafka."""

    def __init__(self, broker):
        import json
        from kafka import KafkaProducer
        # kafka-python only accepts bytes: without a value_serializer the
        # dict passed to send() below raised at publish time.
        self.producer = KafkaProducer(
            bootstrap_servers=broker,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def send_order_confirmation(self, order: Order):
        self.producer.send('order-events', {
            'type': 'OrderCreated',
            'order_id': order.id
        })
# Input Adapters (Controllers)
from fastapi import FastAPI
class OrderController:
    """REST API adapter (input port) -- translates HTTP calls to use cases."""

    def __init__(self, order_service: OrderService):
        self.order_service = order_service
        self.app = FastAPI()
        self._setup_routes()

    def _setup_routes(self):
        @self.app.post("/orders")
        def create_order(user_id: str, items: list, total: float):
            created = self.order_service.create_order(user_id, items, total)
            return {"order_id": created.id, "status": created.status}

        @self.app.delete("/orders/{order_id}")
        def cancel_order(order_id: str):
            self.order_service.cancel_order(order_id)
            return {"status": "cancelled"}
# Dependency Injection (Wire it up)
if __name__ == '__main__':
    # Choose adapters (easily swappable!)
    repo = InMemoryOrderRepository()       # or PostgreSQLOrderRepository(...)
    notifier = EmailNotificationService()  # or KafkaNotificationService(...)

    # Inject the chosen adapters into the core service
    order_service = OrderService(repo, notifier)

    # Create input adapter
    controller = OrderController(order_service)

    # Run
    # uvicorn.run(controller.app)
| Pattern | Best For | Complexity | When NOT to Use |
|---|---|---|---|
| Microservices | Large teams, independent deployment | Very High | Small apps, tight coupling needed |
| Event-Driven | Async workflows, decoupling | High | Immediate consistency required |
| CQRS | Different read/write loads | High | Simple CRUD, strong consistency needed |
| Saga | Distributed transactions | High | Can use 2PC, single database |
| Hexagonal | Clean architecture, testability | Medium | Very simple apps, rapid prototyping |