# Streaming Systems Deep Dive

Kafka, event streaming, and real-time data processing.
## Why Streaming Systems?

### The Problem with Request-Response
```mermaid
graph TB
    subgraph "Traditional: Request-Response"
        Service1[Order Service] -->|"HTTP POST"| Service2[Inventory Service]
        Service2 -->|"HTTP POST"| Service3[Shipping Service]
        Service3 -->|"HTTP POST"| Service4[Notification Service]
    end
    style Service1 fill:#bf616a,color:#2e3440
    style Service2 fill:#bf616a,color:#2e3440
    style Service3 fill:#bf616a,color:#2e3440
    style Service4 fill:#bf616a,color:#2e3440
```
Problems with synchronous communication:
- Tight coupling: Order Service must know about Inventory, Shipping, Notification
- Cascading failures: If Shipping is down, Order fails
- Scaling bottlenecks: Each service must handle peak load synchronously
- No replay: If Notification was down, those events are lost
### The Streaming Solution
```mermaid
graph LR
    subgraph "Event-Driven: Streaming"
        Order[Order Service] -->|"Publish"| Kafka[Kafka]
        Kafka -->|"Subscribe"| Inventory[Inventory Service]
        Kafka -->|"Subscribe"| Shipping[Shipping Service]
        Kafka -->|"Subscribe"| Notification[Notification Service]
        Kafka -->|"Subscribe"| Analytics[Analytics Service]
    end
    style Order fill:#a3be8c,color:#2e3440
    style Kafka fill:#d08770,color:#2e3440
    style Inventory fill:#88c0d0,color:#2e3440
    style Shipping fill:#88c0d0,color:#2e3440
    style Notification fill:#88c0d0,color:#2e3440
    style Analytics fill:#88c0d0,color:#2e3440
```
WHY event streaming is better:
- Loose coupling: Order Service just publishes events, doesn't care who consumes
- Fault tolerance: Consumers can fail independently, catch up later
- Scalability: Add more consumers without changing producer
- Replay: Events are stored, can be reprocessed
- Real-time + Batch: Same data feeds both use cases
## Kafka Internals

### Core Concepts
```mermaid
graph LR
    subgraph Producers["Producers"]
        Producer1([Order Service])
        Producer2([Checkout Service])
    end
    subgraph Cluster["Kafka Cluster - Topic: orders"]
        P0["Partition 0<br/>Leader: Broker 1"]
        P1["Partition 1<br/>Leader: Broker 2"]
        P2["Partition 2<br/>Leader: Broker 3"]
    end
    subgraph Consumers["Consumer Group: order-processors"]
        C1["Consumer 1<br/>Reads P0"]
        C2["Consumer 2<br/>Reads P1"]
        C3["Consumer 3<br/>Reads P2"]
    end
    Producer1 -->|"order_id=123"| P0
    Producer1 -->|"order_id=456"| P1
    Producer2 -->|"order_id=789"| P2
    P0 --> C1
    P1 --> C2
    P2 --> C3
    style P0 fill:#88c0d0,color:#2e3440
    style P1 fill:#88c0d0,color:#2e3440
    style P2 fill:#88c0d0,color:#2e3440
    style Producer1 fill:#a3be8c,color:#2e3440
    style Producer2 fill:#a3be8c,color:#2e3440
    style C1 fill:#ebcb8b,color:#2e3440
    style C2 fill:#ebcb8b,color:#2e3440
    style C3 fill:#ebcb8b,color:#2e3440
```
| Concept | What It Is | Analogy |
|---|---|---|
| Topic | Category of messages (e.g., "orders", "user-events") | Database table |
| Partition | Ordered, immutable sequence of records within a topic | Shard of a table |
| Offset | Unique ID for each record within a partition | Row number |
| Consumer Group | Set of consumers that share work (each partition → one consumer) | Worker pool |
| Broker | Kafka server that stores data and serves clients | Database node |
## Partition Deep Dive

### How Partitions Work
```mermaid
graph LR
    subgraph "Partition 0 (Append-Only Log)"
        O0["Offset 0<br/>order_id: 100"] --> O1["Offset 1<br/>order_id: 103"]
        O1 --> O2["Offset 2<br/>order_id: 106"]
        O2 --> O3["Offset 3<br/>order_id: 109"]
        O3 --> O4["Offset 4<br/>order_id: 112"]
        O4 --> New["New messages<br/>appended here →"]
    end
    Producer([Producer]) -->|"Write"| New
    Consumer([Consumer]) -->|"Read from offset 2"| O2
    style O0 fill:#434c5e,color:#e0e0e0
    style O1 fill:#434c5e,color:#e0e0e0
    style O2 fill:#88c0d0,color:#2e3440
    style O3 fill:#88c0d0,color:#2e3440
    style O4 fill:#88c0d0,color:#2e3440
    style New fill:#a3be8c,color:#2e3440
    style Producer fill:#a3be8c,color:#2e3440
    style Consumer fill:#ebcb8b,color:#2e3440
```
WHY partitions matter:
- Parallelism: More partitions = more consumers can read in parallel
- Ordering: Messages within a partition are strictly ordered
- Scalability: Partitions can be spread across brokers
### Partition Key Selection
```python
# Messages with the same key go to the same partition
producer.send("orders", key="user_123", value={"order_id": 456, ...})
producer.send("orders", key="user_123", value={"order_id": 789, ...})
# Both go to the same partition → ordering preserved for user_123

# How Kafka chooses the partition (default partitioner):
# partition = hash(key) % num_partitions
```
HOW to choose partition key:
- User ID: All events for a user go to same partition → ordering preserved
- Order ID: All updates to an order stay ordered
- No key: Round-robin distribution → maximum parallelism, no ordering
Rule of thumb: Use the entity ID that needs ordering guarantees.
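The key-to-partition mapping can be sketched in a few lines. This is an illustration of the idea, not Kafka's actual partitioner: real Kafka hashes the serialized key with murmur2, so `crc32` here is only a deterministic stand-in, and `partition_for` is a made-up helper.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition, default-partitioner style: hash(key) % N."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Same key always lands on the same partition → per-key ordering holds.
p1 = partition_for("user_123", 6)
p2 = partition_for("user_123", 6)
assert p1 == p2

# Different keys spread across partitions → parallelism.
spread = {partition_for(f"user_{i}", 6) for i in range(100)}
```

Note the corollary: changing `num_partitions` reshuffles every key, which is why adding partitions to a keyed topic breaks per-key ordering for in-flight data.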
### Replication
```mermaid
graph LR
    Producer([Producer]) -->|"Write"| Leader
    subgraph B1["Broker 1"]
        Leader["Partition 0<br/>(LEADER)<br/>Offsets: 0-100"]
    end
    subgraph B2["Broker 2"]
        Follower1["Partition 0<br/>(FOLLOWER)<br/>Offsets: 0-100"]
    end
    subgraph B3["Broker 3"]
        Follower2["Partition 0<br/>(FOLLOWER)<br/>Offsets: 0-98"]
    end
    Leader -->|"Replicate"| Follower1
    Leader -->|"Replicate"| Follower2
    Consumer([Consumer]) -->|"Read"| Leader
    style Leader fill:#a3be8c,color:#2e3440
    style Follower1 fill:#88c0d0,color:#2e3440
    style Follower2 fill:#ebcb8b,color:#2e3440
    style Producer fill:#a3be8c,color:#2e3440
    style Consumer fill:#ebcb8b,color:#2e3440
```
#### ISR (In-Sync Replicas)

Replicas that are caught up with the leader. A message is "committed" once every replica in the ISR has it.
| Setting | Value | Meaning |
|---|---|---|
| `replication.factor` | 3 | Each partition has 3 copies |
| `min.insync.replicas` | 2 | At least 2 replicas must acknowledge before commit |
| `acks` (producer) | `all` | Wait for all ISRs to acknowledge |
WHY these settings?

- `replication.factor=3`: data survives the loss of up to 2 brokers (one copy remains)
- `min.insync.replicas=2`: data is safe even if 1 replica lags behind
- `acks=all`: no data loss on leader failure
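How the three settings interact can be shown with a toy model. This is an illustrative sketch, not broker code: the `Partition` class and its method names are invented for this example.

```python
class Partition:
    """Toy model of one partition with an ISR set (illustrative only)."""

    def __init__(self, replicas, min_insync=2):
        self.isr = set(replicas)      # replicas currently in sync with the leader
        self.min_insync = min_insync  # stands in for min.insync.replicas
        self.log = []

    def fail_broker(self, broker):
        self.isr.discard(broker)      # a failed or lagging broker leaves the ISR

    def produce(self, record):
        # acks=all: refuse the write when the ISR is too small, so the
        # producer sees an error and retries instead of losing data silently.
        if len(self.isr) < self.min_insync:
            raise RuntimeError("NotEnoughReplicas")
        self.log.append(record)

p = Partition(replicas={"broker-1", "broker-2", "broker-3"})
p.produce("order-1")        # 3 replicas in sync → committed
p.fail_broker("broker-3")
p.produce("order-2")        # 2 in sync → still meets min.insync.replicas
p.fail_broker("broker-2")
# With only 1 replica left, produce() raises rather than risking data loss.
```

This is the core trade-off: the topic becomes unwritable before it becomes unsafe.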
## Consumer Groups
```mermaid
graph LR
    subgraph Topic["Topic: orders (6 partitions)"]
        P0[P0]
        P1[P1]
        P2[P2]
        P3[P3]
        P4[P4]
        P5[P5]
    end
    subgraph GroupA["Consumer Group A (3 consumers)"]
        CA1["Consumer A1<br/>Reads P0, P1"]
        CA2["Consumer A2<br/>Reads P2, P3"]
        CA3["Consumer A3<br/>Reads P4, P5"]
    end
    subgraph GroupB["Consumer Group B (2 consumers)"]
        CB1["Consumer B1<br/>Reads P0, P1, P2"]
        CB2["Consumer B2<br/>Reads P3, P4, P5"]
    end
    P0 --> CA1
    P1 --> CA1
    P2 --> CA2
    P3 --> CA2
    P4 --> CA3
    P5 --> CA3
    P0 --> CB1
    P1 --> CB1
    P2 --> CB1
    P3 --> CB2
    P4 --> CB2
    P5 --> CB2
    style P0 fill:#88c0d0,color:#2e3440
    style P1 fill:#88c0d0,color:#2e3440
    style P2 fill:#88c0d0,color:#2e3440
    style P3 fill:#88c0d0,color:#2e3440
    style P4 fill:#88c0d0,color:#2e3440
    style P5 fill:#88c0d0,color:#2e3440
    style CA1 fill:#a3be8c,color:#2e3440
    style CA2 fill:#a3be8c,color:#2e3440
    style CA3 fill:#a3be8c,color:#2e3440
    style CB1 fill:#ebcb8b,color:#2e3440
    style CB2 fill:#ebcb8b,color:#2e3440
```
HOW consumer groups work:
- Each partition assigned to exactly ONE consumer in a group
- If consumers > partitions, some consumers sit idle
- If consumers < partitions, some consumers handle multiple partitions
- Different groups read independently (each group gets all messages)
Rule: num_partitions >= num_consumers for parallelism
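The assignment rules above can be sketched as a round-robin assignor. This is a simplification of Kafka's built-in range/round-robin assignors; `assign_partitions` is a made-up helper.

```python
def assign_partitions(partitions, consumers):
    """Round-robin assignment: each partition goes to exactly one consumer."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

parts = ["P0", "P1", "P2", "P3", "P4", "P5"]

# 3 consumers → 2 partitions each (like Group A in the diagram).
a = assign_partitions(parts, ["A1", "A2", "A3"])

# 2 consumers → 3 partitions each (like Group B).
b = assign_partitions(parts, ["B1", "B2"])

# 8 consumers for 6 partitions → 2 consumers end up idle,
# since a partition is never split between consumers in one group.
c = assign_partitions(parts, [f"C{i}" for i in range(8)])
idle = [name for name, owned in c.items() if not owned]
```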
### Consumer Offset Management
```python
# A consumer tracks its position (offset) per partition.
# Offsets are stored in the internal topic __consumer_offsets.

# Auto-commit (default; can lose messages on crash):
#   enable.auto.commit=true
#   auto.commit.interval.ms=5000

# Manual commit (safer):
#   enable.auto.commit=false
while True:
    records = consumer.poll(timeout=1000)
    for record in records:
        process(record)   # do the work first
    consumer.commit()     # then commit the offset
```
**At-least-once pitfall:**

- If you commit BEFORE processing and crash during processing → the message is lost!
- If you commit AFTER processing and crash after processing but before the commit → the message is reprocessed!

Make your processing idempotent to handle reprocessing safely.
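One common way to make processing idempotent is to remember which record IDs have already been applied, so a redelivered message has no second effect. A minimal sketch, assuming each record carries a unique ID (all names here are illustrative):

```python
processed_ids = set()               # stands in for a durable dedup store
balance = {"user_123": 0}

def handle(record):
    rid = record["id"]
    if rid in processed_ids:        # duplicate delivery → skip the side effect
        return
    balance["user_123"] += record["amount"]
    processed_ids.add(rid)

msg = {"id": "evt-1", "amount": 100}
handle(msg)
handle(msg)   # redelivered after a crash-before-commit → ignored
# balance stays at 100, not 200
```

In production the dedup store must survive restarts (a database table, or a key in the same transactional write as the side effect), otherwise the set is lost along with the crash.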
## Exactly-Once Semantics

### The Three Guarantees
| Guarantee | Meaning | When Messages Lost? | When Duplicates? |
|---|---|---|---|
| At-Most-Once | Fire and forget | Producer crash, network failure | Never |
| At-Least-Once | Retry until acknowledged | Never | Retry after timeout, consumer crash |
| Exactly-Once | Each message processed exactly once | Never | Never |
### How Kafka Achieves Exactly-Once

#### 1. Idempotent Producer

Kafka assigns a unique ID to each producer. Retried messages with the same sequence number are deduplicated by the broker.
```properties
# Enable the idempotent producer
enable.idempotence=true

# Kafka tracks (producer_id, sequence_number) per partition;
# a duplicate is detected if the same sequence_number arrives twice.
```
```mermaid
sequenceDiagram
    participant Producer
    participant Broker
    Producer->>Broker: Message (seq=1)
    Broker-->>Producer: ACK
    Producer->>Broker: Message (seq=2)
    Note over Broker: Network timeout, no ACK received
    Producer->>Broker: Retry Message (seq=2)
    Note over Broker: Duplicate! Same seq=2, ignore
    Broker-->>Producer: ACK (deduped)
```
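The dedup step in the diagram comes down to one comparison on the broker side. A sketch of that logic (an illustrative model, not broker code; names are invented):

```python
last_seq = {}   # (producer_id, partition) -> highest sequence number appended
log = []

def append(producer_id, partition, seq, record):
    """Append unless this (producer, partition) already saw this sequence number."""
    key = (producer_id, partition)
    if last_seq.get(key, -1) >= seq:
        return "duplicate"          # a retry of an already-appended message
    last_seq[key] = seq
    log.append(record)
    return "appended"

append("p1", 0, 1, "msg-1")
append("p1", 0, 2, "msg-2")             # ...ACK lost in transit
result = append("p1", 0, 2, "msg-2")    # producer retries → deduped
```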
#### 2. Transactional Producer

Atomic writes across multiple partitions and topics: all or nothing.
```python
producer = KafkaProducer(
    transactional_id="order-processor-1",  # unique per producer instance
    enable_idempotence=True,
)
producer.init_transactions()

try:
    producer.begin_transaction()
    # These writes are atomic
    producer.send("orders", value=order)
    producer.send("inventory", value=inventory_update)
    producer.send("notifications", value=notification)
    producer.commit_transaction()  # all become visible at once
except Exception:
    producer.abort_transaction()   # all rolled back
```
WHY transactional producer?
- Consume from topic A, produce to topic B atomically
- Update multiple topics consistently
- Stream processing frameworks use this internally
#### 3. Consume-Transform-Produce Pattern
```mermaid
graph LR
    Input[Input Topic] --> Consumer[Consumer]
    Consumer --> Process[Process]
    Process --> Producer[Producer]
    Producer --> Output[Output Topic]
    Producer -->|"Commit offset<br/>in same transaction"| Offsets[__consumer_offsets]
    style Input fill:#88c0d0,color:#2e3440
    style Output fill:#a3be8c,color:#2e3440
    style Offsets fill:#ebcb8b,color:#2e3440
    style Consumer fill:#d08770,color:#2e3440
    style Process fill:#d08770,color:#2e3440
    style Producer fill:#d08770,color:#2e3440
```
```python
# Exactly-once stream processing loop
producer.begin_transaction()

# Read from the input topic
records = consumer.poll()
for record in records:
    result = transform(record)
    # Write to the output topic
    producer.send("output-topic", value=result)

# Commit the consumed offsets AND the output in the same transaction
producer.send_offsets_to_transaction(
    consumer.get_offsets(),
    consumer.group_id,
)
producer.commit_transaction()
```
HOW this achieves exactly-once:
- Consumer reads message at offset N
- Process and produce output
- Commit output + offset N atomically
- If crash before commit → both rolled back → reprocess from N
- If crash after commit → both committed → move to N+1
## Stream Processing Frameworks

### When to Use What
| Framework | Best For | Latency | Complexity |
|---|---|---|---|
| Kafka Streams | JVM apps, simple transformations, small state | Milliseconds | Low |
| Apache Flink | Complex event processing, large state, exactly-once | Milliseconds | High |
| Apache Spark Streaming | Batch-like processing, ML integration | Seconds (micro-batch) | Medium |
| ksqlDB | SQL queries on streams, quick prototyping | Milliseconds | Very Low |
### Kafka Streams Example
```java
// Word count streaming
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream("input-topic");

KTable<String, Long> wordCounts = textLines
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count();

wordCounts.toStream().to("word-counts-output");

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
```
## Stream Processing Concepts

### Windowing

Group events into time windows for aggregation.
```mermaid
graph LR
    subgraph "Tumbling Window (Non-overlapping)"
        T1["Window 1<br/>00:00-00:05"]
        T2["Window 2<br/>00:05-00:10"]
        T3["Window 3<br/>00:10-00:15"]
    end
    subgraph "Sliding Window (Overlapping)"
        S1["Window<br/>00:00-00:05"]
        S2["Window<br/>00:02-00:07"]
        S3["Window<br/>00:04-00:09"]
    end
    subgraph "Session Window (Gap-based)"
        SS1["Session 1<br/>(events close together)"]
        Gap["Gap > threshold"]
        SS2["Session 2<br/>(new session)"]
    end
```
```java
// Tumbling window: count orders per 5-minute window
orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Sliding window: count orders in overlapping 5-minute windows
// (in Kafka Streams, sliding windows are defined by the maximum
// time difference between records, not by an advance interval)
orders
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Session window: group user activity with a 30-minute inactivity gap
userEvents
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();
```
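How a timestamp maps to its window(s) is simple arithmetic. A sketch in plain Python, not the Streams API; `tumbling_window` and `hopping_windows` are invented helpers, and "hopping" (fixed size, fixed advance) is used here as the easiest overlapping-window model:

```python
def tumbling_window(ts: int, size: int) -> tuple:
    """Non-overlapping: each timestamp falls into exactly one window."""
    start = ts - ts % size
    return (start, start + size)

def hopping_windows(ts: int, size: int, advance: int) -> list:
    """Overlapping: every [start, start+size) window containing ts."""
    wins = []
    start = (ts // advance) * advance   # latest window starting at/before ts
    while start + size > ts and start >= 0:
        wins.append((start, start + size))
        start -= advance
    return list(reversed(wins))

# An event at t=303s with 5-minute (300s) tumbling windows → window [300, 600)
assert tumbling_window(303, 300) == (300, 600)

# With size=5 and advance=1, an event at t=7 sits in 5 overlapping windows
assert len(hopping_windows(7, 5, 1)) == 5
```

Session windows need no formula at all: they are built incrementally by merging events whose gap is below the inactivity threshold.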
### Event Time vs Processing Time
```mermaid
graph LR
    subgraph "Event Time"
        E1["Event created<br/>10:00:00"]
    end
    subgraph "Processing Time"
        E2["Event processed<br/>10:00:05"]
    end
    E1 -->|"Network delay<br/>5 seconds"| E2
```
| Aspect | Event Time | Processing Time |
|---|---|---|
| Definition | When the event actually occurred | When the event is processed |
| Deterministic? | Yes (replayable) | No (depends on when processed) |
| Late events? | Must handle | Not an issue |
| Use case | Accurate analytics | Real-time monitoring |
```java
// Handle late events with a grace period
.windowedBy(TimeWindows
    .ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)));
// Events up to 1 minute late are still included in the correct window
```
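The grace-period rule boils down to a single predicate: an event still counts if it belongs to the window and the window has not yet closed. An illustrative sketch (real Streams tracks observed stream time per task; `accepts` is a made-up helper, times in seconds):

```python
def accepts(event_time: int, window_end: int, stream_time: int, grace: int) -> bool:
    """True iff the event falls in the window and the window is still open.

    A window [start, window_end) closes once stream time (the max event
    time seen so far) passes window_end + grace.
    """
    return event_time < window_end and stream_time <= window_end + grace

# Window covering seconds [0, 300), grace of 60 seconds:
assert accepts(290, 300, 310, 60) is True    # 10s late → still counted
assert accepts(290, 300, 400, 60) is False   # 100s late → dropped
```

Longer grace means more correct results for late data, at the cost of holding windows open (and their state) longer.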
### Stateful Processing
```mermaid
graph LR
    Input[Input Stream] --> Processor[Processor]
    Processor <--> State[(Local State<br/>RocksDB)]
    Processor --> Output[Output Stream]
    State -->|"Changelog"| Changelog[Changelog Topic]
    style Input fill:#88c0d0,color:#2e3440
    style Output fill:#a3be8c,color:#2e3440
    style State fill:#ebcb8b,color:#2e3440
    style Changelog fill:#d08770,color:#2e3440
    style Processor fill:#b48ead,color:#2e3440
```
WHY local state + changelog?
- Performance: Local state (RocksDB) is fast, no network calls
- Durability: Changelog topic backs up state to Kafka
- Recovery: On crash, rebuild state from changelog
- Scalability: State partitioned like input topic
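The recovery path above can be modeled in a few lines: every local write is mirrored to a changelog, and replaying the changelog rebuilds the store. A minimal sketch where plain Python structures stand in for RocksDB and the changelog topic:

```python
class StateStore:
    """Toy local state store with a changelog (illustrative only)."""

    def __init__(self):
        self.state = {}       # stands in for RocksDB
        self.changelog = []   # stands in for the compacted changelog topic

    def put(self, key, value):
        self.state[key] = value
        self.changelog.append((key, value))   # mirror every update

    @classmethod
    def restore(cls, changelog):
        """Rebuild the store after a crash by replaying the changelog."""
        store = cls()
        for key, value in changelog:
            store.state[key] = value   # later entries overwrite earlier ones
        return store

store = StateStore()
store.put("user_123", 1)
store.put("user_123", 2)   # overwrite — compaction would keep only this one
store.put("user_456", 7)

recovered = StateStore.restore(store.changelog)
assert recovered.state == store.state
```

This is also why changelog topics are log-compacted: only the latest value per key is needed to restore the state.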
## Common Streaming Patterns

### 1. Event Sourcing

Store state as a sequence of events, not the current state.
```mermaid
graph LR
    subgraph "Event Log (Source of Truth)"
        E1["AccountCreated<br/>{id: 123}"]
        E2["MoneyDeposited<br/>{amount: 100}"]
        E3["MoneyWithdrawn<br/>{amount: 30}"]
        E4["MoneyDeposited<br/>{amount: 50}"]
    end
    subgraph "Materialized View"
        View["Account 123<br/>Balance: $120"]
    end
    E1 --> E2 --> E3 --> E4 --> View
```
```python
# Event sourcing with Kafka
# Topic: account-events (the source of truth)
events = [
    {"event": "AccountCreated", "account_id": "123", "timestamp": "..."},
    {"event": "MoneyDeposited", "account_id": "123", "amount": 100},
    {"event": "MoneyWithdrawn", "account_id": "123", "amount": 30},
    {"event": "MoneyDeposited", "account_id": "123", "amount": 50},
]

# A consumer rebuilds the current state by replaying the events
balance = 0
for event in events:
    if event["event"] == "MoneyDeposited":
        balance += event["amount"]
    elif event["event"] == "MoneyWithdrawn":
        balance -= event["amount"]
# balance == 120
```
WHY event sourcing?
- Audit trail: Complete history of what happened
- Debugging: Replay events to reproduce bugs
- New views: Build new projections from existing events
- Time travel: Query state at any point in time
### 2. CQRS (Command Query Responsibility Segregation)
```mermaid
graph LR
    subgraph Write["Write Side"]
        Command([Command]) --> WriteAPI[Write API]
        WriteAPI --> EventStore[(Event Store<br/>Kafka)]
    end
    subgraph Read["Read Side"]
        EventStore --> Projector[Projector]
        Projector --> ReadDB[(Read Database<br/>Optimized for queries)]
        Query([Query]) --> ReadAPI[Read API]
        ReadAPI --> ReadDB
    end
    style Command fill:#bf616a,color:#2e3440
    style Query fill:#a3be8c,color:#2e3440
    style EventStore fill:#d08770,color:#2e3440
    style ReadDB fill:#88c0d0,color:#2e3440
    style WriteAPI fill:#ebcb8b,color:#2e3440
    style ReadAPI fill:#ebcb8b,color:#2e3440
    style Projector fill:#b48ead,color:#2e3440
```
HOW CQRS works:
- Commands (writes) go to event store (Kafka)
- Projectors consume events and update read databases
- Queries read from optimized read models (can be different DBs!)
- Scale reads and writes independently
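A projector is just a fold from events into a query-optimized structure. A minimal sketch (the event shapes and field names are invented for illustration):

```python
events = [
    {"type": "OrderPlaced",  "order_id": "o1", "total": 40},
    {"type": "OrderPlaced",  "order_id": "o2", "total": 60},
    {"type": "OrderShipped", "order_id": "o1"},
]

read_model = {}   # order_id -> denormalized row, shaped for fast queries

def project(event):
    """Apply one write-side event to the read model."""
    if event["type"] == "OrderPlaced":
        read_model[event["order_id"]] = {
            "total": event["total"],
            "status": "placed",
        }
    elif event["type"] == "OrderShipped":
        read_model[event["order_id"]]["status"] = "shipped"

for e in events:
    project(e)

# The query side reads this projection, never the event log directly.
```

Because the event log is the source of truth, you can add a second projector later (say, into Elasticsearch) and backfill it by replaying the log from offset 0.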
### 3. Change Data Capture (CDC)
```mermaid
graph LR
    subgraph "Source System"
        App[Application] --> DB[(MySQL)]
    end
    subgraph "CDC"
        DB -->|"Read binlog"| Debezium[Debezium]
        Debezium --> Kafka[Kafka]
    end
    subgraph "Consumers"
        Kafka --> Search[Elasticsearch]
        Kafka --> Cache[Redis Cache]
        Kafka --> Analytics[Data Warehouse]
    end
    style DB fill:#88c0d0,color:#2e3440
    style Debezium fill:#a3be8c,color:#2e3440
    style Kafka fill:#d08770,color:#2e3440
    style Search fill:#ebcb8b,color:#2e3440
    style Cache fill:#ebcb8b,color:#2e3440
    style Analytics fill:#ebcb8b,color:#2e3440
```
A Debezium CDC event looks like this (the `op` field encodes the operation: `c` = create, `u` = update, `d` = delete):

```json
{
  "before": {"id": 123, "name": "John", "email": "john@old.com"},
  "after": {"id": 123, "name": "John", "email": "john@new.com"},
  "source": {
    "db": "users",
    "table": "accounts",
    "ts_ms": 1699999200000
  },
  "op": "u"
}
```
WHY CDC?
- No code changes: Capture changes from DB logs, not application
- Real-time: Changes streamed as they happen
- Reliable: Based on transaction log, not polling
- Use cases: Cache invalidation, search indexing, analytics sync
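A cache-invalidation consumer for such events is a small dispatch on `op`. A sketch assuming the Debezium-style event shape shown above (the in-memory dict stands in for Redis):

```python
cache = {}   # primary key -> row, stands in for Redis

def apply_cdc(event):
    """Keep the cache in sync with the source table via CDC events."""
    op = event["op"]
    if op in ("c", "u"):               # create or update → upsert "after" row
        row = event["after"]
        cache[row["id"]] = row
    elif op == "d":                    # delete → evict by the "before" key
        cache.pop(event["before"]["id"], None)

apply_cdc({"op": "c", "before": None,
           "after": {"id": 123, "email": "john@old.com"}})
apply_cdc({"op": "u", "before": {"id": 123, "email": "john@old.com"},
           "after": {"id": 123, "email": "john@new.com"}})
apply_cdc({"op": "d", "before": {"id": 123, "email": "john@new.com"},
           "after": None})
```

Because the events come from the transaction log in commit order, applying them in order always converges the cache to the source state.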
### 4. Saga Pattern (Distributed Transactions)
```mermaid
sequenceDiagram
    participant Order as Order Service
    participant Kafka
    participant Inventory as Inventory Service
    participant Payment as Payment Service
    participant Shipping as Shipping Service
    Order->>Kafka: OrderCreated
    Kafka->>Inventory: OrderCreated
    Inventory->>Kafka: InventoryReserved
    Kafka->>Payment: InventoryReserved
    Payment->>Kafka: PaymentProcessed
    Kafka->>Shipping: PaymentProcessed
    Shipping->>Kafka: ShipmentScheduled
    Kafka->>Order: ShipmentScheduled
    Order->>Order: Mark order complete
    Note over Order,Shipping: If any step fails, compensating events undo previous steps
```
```python
# Saga with compensating transactions
class OrderSaga:
    def on_order_created(self, event):
        # Reserve inventory
        publish("inventory-commands", {"type": "ReserveInventory", ...})

    def on_inventory_reserved(self, event):
        # Process payment
        publish("payment-commands", {"type": "ProcessPayment", ...})

    def on_payment_failed(self, event):
        # Compensate: release the reserved inventory
        publish("inventory-commands", {"type": "ReleaseInventory", ...})

    def on_inventory_reserve_failed(self, event):
        # Compensate: cancel the order
        publish("order-commands", {"type": "CancelOrder", ...})
```
## Real-World Use Cases

### 1. Real-Time Analytics Dashboard
User Events → Kafka → Flink (aggregate per minute) → Redis → Dashboard
```sql
-- Flink SQL
SELECT
  window_start,
  page_url,
  COUNT(*) AS page_views,
  COUNT(DISTINCT user_id) AS unique_visitors
FROM TABLE(
  TUMBLE(TABLE page_views, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, page_url;
```
### 2. Fraud Detection
Transactions → Kafka → Flink CEP → Alerts
```java
// Complex Event Processing: flag 3+ transactions within 5 minutes
// from different countries. Sketch only: `prev` is shorthand for the
// previously matched transaction — a real Flink CEP pattern would use
// an IterativeCondition to access earlier matches.
Pattern<Transaction, ?> pattern = Pattern
    .<Transaction>begin("first")
    .where(tx -> true)
    .followedBy("second")
    .where(tx -> !tx.country.equals(prev.country))
    .followedBy("third")
    .where(tx -> !tx.country.equals(prev.country))
    .within(Time.minutes(5));
```
### 3. Log Aggregation
App Logs → Kafka → Logstash → Elasticsearch → Kibana
High throughput: 100K+ logs/second. Kafka absorbs the backpressure; Elasticsearch catches up at its own pace.
### 4. Event-Driven Microservices
```yaml
# Order placed → multiple services react
order-events:
  - OrderService publishes OrderPlaced
  - InventoryService: reserve stock
  - PaymentService: charge card
  - NotificationService: send confirmation
  - AnalyticsService: update metrics
  - RecommendationService: update user profile
```
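The fan-out above can be sketched with an in-memory dispatcher: one published event, many independent subscribers, and the publisher never knows who is listening. Illustrative only, not a Kafka client (all names are invented):

```python
subscribers = {}   # topic -> list of handler callables

def subscribe(topic, handler):
    """Register a handler; the publisher never sees this list."""
    subscribers.setdefault(topic, []).append(handler)

def publish(topic, event):
    """Deliver the event to every subscriber of the topic."""
    for handler in subscribers.get(topic, []):
        handler(event)

reactions = []
subscribe("order-events", lambda e: reactions.append(("inventory", e["order_id"])))
subscribe("order-events", lambda e: reactions.append(("payment", e["order_id"])))
subscribe("order-events", lambda e: reactions.append(("notify", e["order_id"])))

publish("order-events", {"type": "OrderPlaced", "order_id": "o1"})
# Adding a new consumer requires no change to the publisher — the loose
# coupling this whole section argues for.
```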
## Key Takeaways

### When to Use Streaming
| Use Streaming When | Use Request-Response When |
|---|---|
| Multiple consumers need the same events | Single consumer, synchronous response needed |
| Decoupling services | Tight coupling is acceptable |
| Event replay needed | No replay requirement |
| High throughput, async processing | Low throughput, sync processing |
| Building audit logs | Simple CRUD operations |
### Kafka Configuration Cheat Sheet
| Requirement | Configuration |
|---|---|
| No data loss | `acks=all`, `min.insync.replicas=2` |
| Exactly-once | `enable.idempotence=true`, `transactional.id=X` |
| High throughput | `acks=1`, `batch.size=65536`, `linger.ms=5` |
| Low latency | `acks=1`, `linger.ms=0` |
| Ordering | Use the same partition key for related events |