Building Resilient Asynchronous Systems
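State machines provide explicit, predictable state management for complex asynchronous workflows. They are especially valuable when a process moves through a fixed sequence of steps, must reject out-of-order events, and needs an audit trail of how it reached its current state.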
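A Finite State Machine consists of a finite set of states, the inputs (events or messages) it accepts, and the transitions that map the current state plus an input to the next state.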
from enum import Enum, auto
class State(Enum):
    """Every lifecycle state a campaign can occupy.

    Values are assigned automatically in declaration order, starting at 1.
    """

    EMPTY = auto()              # nothing has been created yet
    PENDING = auto()            # creation work is in progress
    WAITING_ON_REVIEW = auto()  # created; a reviewer decision is outstanding
    APPROVED = auto()           # reviewer said yes
    REJECTED = auto()           # reviewer said no
    CANCELLED = auto()          # withdrawn before a decision
class StateMachine:
    """Tiny state holder that keeps a timestamped record of every state it enters."""

    def __init__(self):
        initial = State.EMPTY
        self.state = initial
        # Seed the audit trail with the starting state so it is never empty.
        self.state_history = [(initial, datetime.now())]

    def transition_to(self, new_state: State):
        """Enter ``new_state``, append it to the audit trail and log the change.

        No transition rules are enforced: any state may follow any other.
        """
        previous, self.state = self.state, new_state
        self.state_history.append((new_state, datetime.now()))
        logger.info(f"Transition: {previous.name} → {new_state.name}")
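The Actor Model is a concurrent computation model where "actors" are the fundamental units of computation. Each actor keeps its own private state, processes the messages in its mailbox one at a time, and in response can send messages to other actors, create new actors, or change how it will handle the next message.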
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, Optional
@dataclass
class Message:
"""Base message class"""
timestamp: datetime = None
@dataclass
class CreateCampaignMsg(Message):
"""Command to create campaign"""
campaign_name: str
budget: float
target_audience: str
user_id: str
@dataclass
class ReviewReceivedMsg(Message):
"""Event: Review decision received"""
approved: bool
reviewer_id: str
comments: str
@dataclass
class CancelCampaignMsg(Message):
    """Command to cancel a campaign that is still waiting on review.

    Construct it with keyword arguments, e.g. ``CancelCampaignMsg(reason="...")``.
    ``reason`` has a default so this subclass can be declared whether or not
    the inherited ``timestamp`` field is keyword-only.
    """

    reason: str = ""


class CampaignActor:
    """Drive one campaign from EMPTY through review to a terminal state.

    Incoming messages are routed through a state → message type → handler
    table. A message that is not listed for the current state gets an error
    reply and leaves the state unchanged.
    """

    def __init__(self, campaign_id: str):
        self.campaign_id = campaign_id
        self.state = State.EMPTY
        # Campaign entity; created by _handle_create_campaign, None until then.
        self.campaign = None
        # Map state → message type → handler function
        self.handlers = self._build_command_handlers()

    def _build_command_handlers(self) -> Dict[State, Dict[type, Callable]]:
        """
        Build message routing table.

        Only certain messages are valid in each state. A state that is absent
        (PENDING) or maps to an empty dict accepts no messages at all.
        """
        return {
            State.EMPTY: {
                CreateCampaignMsg: self._handle_create_campaign,
            },
            State.WAITING_ON_REVIEW: {
                ReviewReceivedMsg: self._handle_review_received,
                CancelCampaignMsg: self._handle_cancel,
            },
            # Terminal states - no transitions allowed
            State.APPROVED: {},
            State.REJECTED: {},
            State.CANCELLED: {},
        }

    def _transition_to(self, new_state: State) -> None:
        """Switch the actor to ``new_state`` and log the transition."""
        old_state = self.state
        self.state = new_state
        logger.info(f"Transition: {old_state.name} → {new_state.name}")

    async def handle_message(self, message: Message) -> Dict[str, Any]:
        """
        Main message handler - routes to appropriate handler
        based on current state and message type.

        Returns the handler's result dict, or an ``{'status': 'error', ...}``
        dict when the message type is not accepted in the current state.
        """
        message_type = type(message)
        state_handlers = self.handlers.get(self.state, {})
        # Validate message is allowed in current state
        if message_type not in state_handlers:
            return {
                'status': 'error',
                'message': f'Invalid message {message_type.__name__} for state {self.state.name}'
            }
        # Route to handler
        handler = state_handlers[message_type]
        return await handler(message)

    async def _handle_create_campaign(self, msg: CreateCampaignMsg) -> Dict[str, Any]:
        """
        Handle campaign creation
        State transition: EMPTY → PENDING → WAITING_ON_REVIEW
        """
        logger.info(f"Creating campaign: {msg.campaign_name}")
        # Transition to PENDING
        self._transition_to(State.PENDING)
        # Simulate async work (API calls, DB writes, etc.)
        await asyncio.sleep(0.5)
        # Create campaign entity
        self.campaign = Campaign(
            campaign_id=self.campaign_id,
            campaign_name=msg.campaign_name,
            budget=msg.budget,
            target_audience=msg.target_audience,
            created_by=msg.user_id,
            created_at=datetime.now(),
            status=State.PENDING
        )
        # Transition to next state
        self._transition_to(State.WAITING_ON_REVIEW)
        return {
            'status': 'success',
            'message': 'Campaign created, pending review',
            'current_state': self.state.name
        }

    async def _handle_review_received(self, msg: ReviewReceivedMsg) -> Dict[str, Any]:
        """
        Handle review decision
        State transition:
        - WAITING_ON_REVIEW → APPROVED (if approved)
        - WAITING_ON_REVIEW → REJECTED (if rejected)
        """
        logger.info(f"Review: {'APPROVED' if msg.approved else 'REJECTED'}")
        # Update campaign with review info
        self.campaign.review_comments = msg.comments
        self.campaign.reviewed_by = msg.reviewer_id
        self.campaign.reviewed_at = datetime.now()
        if msg.approved:
            self._transition_to(State.APPROVED)
            self.campaign.status = State.APPROVED
            # Activate campaign
            await asyncio.sleep(0.3)
            return {
                'status': 'success',
                'message': 'Campaign approved and activated',
                'current_state': self.state.name
            }
        else:
            self._transition_to(State.REJECTED)
            self.campaign.status = State.REJECTED
            return {
                'status': 'success',
                'message': 'Campaign rejected',
                'current_state': self.state.name,
                'reason': msg.comments
            }

    async def _handle_cancel(self, msg: CancelCampaignMsg) -> Dict[str, Any]:
        """
        Handle cancellation
        State transition: WAITING_ON_REVIEW → CANCELLED
        """
        logger.info(f"Cancelling campaign: {msg.reason}")
        self._transition_to(State.CANCELLED)
        self.campaign.status = State.CANCELLED
        return {
            'status': 'success',
            'message': 'Campaign cancelled',
            'current_state': self.state.name,
            'reason': msg.reason
        }
Akka is a mature, widely used actor framework for the JVM, with APIs for Java and Scala. It popularized the actor model for building resilient, distributed systems and inspired many implementations in other languages.
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
// Message types
/** Marker type shared by every command a campaign actor understands. */
interface CampaignCommand {}

/** Request to create a campaign with a name, budget and target audience. */
record CreateCampaign(String name, double budget, String targetAudience) implements CampaignCommand {}

/** A reviewer's decision on a campaign, with the reviewer's id and comments. */
record ReviewReceived(boolean approved, String reviewerId, String comments) implements CampaignCommand {}

/** Request to cancel a campaign, carrying a human-readable reason. */
record CancelCampaign(String reason) implements CampaignCommand {}
// States
/** Campaign lifecycle states, in the order a campaign normally passes through them. */
enum State {
    EMPTY,
    PENDING,
    WAITING_ON_REVIEW,
    APPROVED,
    REJECTED,
    CANCELLED
}
// Actor behavior
public class CampaignActor extends AbstractBehavior<CampaignCommand> {
    private State state = State.EMPTY;
    private Campaign campaign;

    public static Behavior<CampaignCommand> create() {
        return Behaviors.setup(CampaignActor::new);
    }

    private CampaignActor(ActorContext<CampaignCommand> context) {
        super(context);
    }

    /**
     * Builds this actor's message routing.
     *
     * <p>AbstractBehavior calls createReceive() once and reuses the result. Every handler
     * below returns {@code this}, so choosing handlers with a switch on {@code state} here
     * would freeze the routing of the initial EMPTY state. A {@code Behaviors.same()} branch
     * would also fail to type-check as a Receive. Instead, every command is registered once
     * and each handler validates the current state itself.
     */
    @Override
    public Receive<CampaignCommand> createReceive() {
        return newReceiveBuilder()
            .onMessage(CreateCampaign.class, this::onCreateCampaign)
            .onMessage(ReviewReceived.class, this::onReviewReceived)
            .onMessage(CancelCampaign.class, this::onCancelCampaign)
            .build();
    }

    /** EMPTY → PENDING → WAITING_ON_REVIEW. */
    private Behavior<CampaignCommand> onCreateCampaign(CreateCampaign cmd) {
        if (state != State.EMPTY) {
            return rejectInvalid(cmd);
        }
        getContext().getLog().info("Creating campaign: {}", cmd.name());
        state = State.PENDING;
        campaign = new Campaign(cmd.name(), cmd.budget(), cmd.targetAudience());
        // Simulate async work
        state = State.WAITING_ON_REVIEW;
        getContext().getLog().info("Campaign created, awaiting review");
        return this;
    }

    /** WAITING_ON_REVIEW → APPROVED or REJECTED. */
    private Behavior<CampaignCommand> onReviewReceived(ReviewReceived cmd) {
        if (state != State.WAITING_ON_REVIEW) {
            return rejectInvalid(cmd);
        }
        if (cmd.approved()) {
            state = State.APPROVED;
            getContext().getLog().info("Campaign APPROVED");
        } else {
            state = State.REJECTED;
            getContext().getLog().info("Campaign REJECTED: {}", cmd.comments());
        }
        return this;
    }

    /** WAITING_ON_REVIEW → CANCELLED. */
    private Behavior<CampaignCommand> onCancelCampaign(CancelCampaign cmd) {
        if (state != State.WAITING_ON_REVIEW) {
            return rejectInvalid(cmd);
        }
        state = State.CANCELLED;
        getContext().getLog().info("Campaign CANCELLED: {}", cmd.reason());
        return this;
    }

    /** Logs and drops a command that is not valid in the current state; the actor keeps running. */
    private Behavior<CampaignCommand> rejectInvalid(CampaignCommand cmd) {
        getContext().getLog().warn("Invalid message in state {}: {}", state, cmd);
        return this;
    }
}
Akka Typed's functional style makes state transitions explicit: each state is a method that returns its own Behavior, and a transition is simply returning the next state's Behavior:
import akka.actor.typed.javadsl.*;
public class CampaignFSM {
    // State data holders: each state's behavior carries only the data valid in that state
    interface StateData {}
    record EmptyData() implements StateData {}
    record PendingData(Campaign campaign) implements StateData {}
    record ReviewData(Campaign campaign) implements StateData {}

    public static Behavior<CampaignCommand> create() {
        return Behaviors.setup(context ->
            new CampaignFSM(context).empty(new EmptyData())
        );
    }

    private final ActorContext<CampaignCommand> context;

    private CampaignFSM(ActorContext<CampaignCommand> context) {
        this.context = context;
    }

    // EMPTY state behavior: only CreateCampaign is handled here
    private Behavior<CampaignCommand> empty(EmptyData data) {
        return Behaviors.receive(CampaignCommand.class)
            .onMessage(CreateCampaign.class, cmd -> {
                context.getLog().info("State: EMPTY → PENDING");
                Campaign campaign = new Campaign(cmd.name(), cmd.budget(), cmd.targetAudience());
                // Transition to WAITING_ON_REVIEW by returning that state's behavior
                context.getLog().info("State: PENDING → WAITING_ON_REVIEW");
                return waitingOnReview(new ReviewData(campaign));
            })
            .build();
    }

    // WAITING_ON_REVIEW state behavior: a review decision or a cancellation ends it
    private Behavior<CampaignCommand> waitingOnReview(ReviewData data) {
        return Behaviors.receive(CampaignCommand.class)
            .onMessage(ReviewReceived.class, cmd -> {
                if (cmd.approved()) {
                    context.getLog().info("State: WAITING_ON_REVIEW → APPROVED");
                    return approved(data);
                } else {
                    context.getLog().info("State: WAITING_ON_REVIEW → REJECTED");
                    return rejected(data);
                }
            })
            .onMessage(CancelCampaign.class, cmd -> {
                context.getLog().info("State: WAITING_ON_REVIEW → CANCELLED");
                return cancelled(data);
            })
            .build();
    }

    // Terminal states: each logs and then stops the actor.
    // To keep the actor alive in a terminal state, return a behavior for that state
    // (for example Behaviors.ignore()). Do NOT return Behaviors.same(): "same" keeps the
    // behavior that handled the message, which would leave the actor in WAITING_ON_REVIEW.
    private Behavior<CampaignCommand> approved(ReviewData data) {
        context.getLog().info("Campaign approved: {}", data.campaign().getName());
        return Behaviors.stopped();
    }

    private Behavior<CampaignCommand> rejected(ReviewData data) {
        context.getLog().info("Campaign rejected");
        return Behaviors.stopped();
    }

    private Behavior<CampaignCommand> cancelled(ReviewData data) {
        context.getLog().info("Campaign cancelled");
        return Behaviors.stopped();
    }
}
Akka Persistence enables crash recovery through event sourcing:
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.*;
public class PersistentCampaignActor
        extends EventSourcedBehavior<CampaignCommand, CampaignEvent, CampaignState> {

    public static Behavior<CampaignCommand> create(String campaignId) {
        // An EventSourcedBehavior is itself a Behavior, so it can be returned directly
        return new PersistentCampaignActor(PersistenceId.of("Campaign", campaignId));
    }

    private PersistentCampaignActor(PersistenceId persistenceId) {
        super(persistenceId);
    }

    @Override
    public CampaignState emptyState() {
        // Initial state, before any event has been applied
        return new CampaignState(State.EMPTY, null);
    }

    @Override
    public CommandHandler<CampaignCommand, CampaignEvent, CampaignState> commandHandler() {
        // Per-state sections are registered on the top-level builder, one forState(...) call
        // per state, rather than chaining forState() after onCommand().
        // NOTE(review): CancelCampaign has no handler here, unlike the non-persistent examples.
        CommandHandlerBuilder<CampaignCommand, CampaignEvent, CampaignState> builder =
            newCommandHandlerBuilder();

        // EMPTY state handlers
        builder.forState(state -> state.currentState() == State.EMPTY)
            .onCommand(CreateCampaign.class, (state, cmd) ->
                // Persist event; the event handler then computes the new state
                Effect().persist(
                    new CampaignCreated(cmd.name(), cmd.budget(), cmd.targetAudience())
                )
            );

        // WAITING_ON_REVIEW state handlers
        builder.forState(state -> state.currentState() == State.WAITING_ON_REVIEW)
            .onCommand(ReviewReceived.class, (state, cmd) -> {
                if (cmd.approved()) {
                    return Effect().persist(new CampaignApproved(cmd.reviewerId()));
                } else {
                    return Effect().persist(
                        new CampaignRejected(cmd.reviewerId(), cmd.comments())
                    );
                }
            });

        return builder.build();
    }

    @Override
    public EventHandler<CampaignState, CampaignEvent> eventHandler() {
        // Update state based on events; also used when replaying the journal on recovery
        return newEventHandlerBuilder()
            .forAnyState()
            .onEvent(CampaignCreated.class, (state, evt) ->
                new CampaignState(
                    State.WAITING_ON_REVIEW,
                    new Campaign(evt.name(), evt.budget(), evt.targetAudience())
                )
            )
            .onEvent(CampaignApproved.class, (state, evt) ->
                state.withState(State.APPROVED)
            )
            .onEvent(CampaignRejected.class, (state, evt) ->
                state.withState(State.REJECTED)
            )
            .build();
    }
}
// Events - written to the journal and replayed to rebuild state on recovery

/** Marker type shared by every persisted campaign event. */
interface CampaignEvent {}

/** A campaign was created with the given name, budget and target audience. */
record CampaignCreated(
        String name,
        double budget,
        String targetAudience) implements CampaignEvent {}

/** A reviewer approved the campaign. */
record CampaignApproved(String reviewerId) implements CampaignEvent {}

/** A reviewer rejected the campaign, with comments. */
record CampaignRejected(
        String reviewerId,
        String comments) implements CampaignEvent {}
| Concept | Description | Python Equivalent |
|---|---|---|
| ActorRef | Reference to an actor (for sending messages) | Actor instance reference |
| ActorContext | Access to actor system, logging, spawning | Self, logger attributes |
| Behavior | Defines how actor reacts to messages | handle_message() method |
| Pattern Matching | .onMessage(Type.class, handler) | type(message) → handler dict |
| State Transitions | Return new Behavior | self.state = new_state |
| Persistence | EventSourcedBehavior | get_snapshot() / _restore_from_snapshot() |
| Supervision | Parent monitors child failures | Try/except with restart logic |
// Parent actor that supervises children
public class CampaignSupervisor extends AbstractBehavior {

    public static Behavior create() {
        return Behaviors.setup(CampaignSupervisor::new);
    }

    private CampaignSupervisor(ActorContext context) {
        super(context);
    }

    /** The supervisor handles one command: spawning a new campaign child. */
    @Override
    public Receive createReceive() {
        return newReceiveBuilder()
            .onMessage(CreateCampaignActor.class, this::onCreateActor)
            .build();
    }

    /**
     * Spawns a supervised CampaignActor named "campaign-" + campaign id.
     * On failure the child is restarted, at most 3 times within one minute.
     * A restart re-creates the child from its initial behavior, so in-memory
     * state is lost unless the child uses persistence.
     */
    private Behavior onCreateActor(CreateCampaignActor cmd) {
        SupervisorStrategy restartPolicy =
            SupervisorStrategy.restart().withLimit(3, Duration.ofMinutes(1));
        Behavior supervisedChild =
            Behaviors.supervise(CampaignActor.create()).onFailure(restartPolicy);
        String childName = "campaign-" + cmd.campaignId();

        ActorRef child = getContext().spawn(supervisedChild, childName);
        return this;
    }
}
Advantages of Akka:
Python Implementation Benefits:
State machines enable resilient systems that can recover from crashes:
class CampaignActorWithPersistence(CampaignActor):
    """Actor with state persistence for crash recovery.

    ``get_snapshot()`` returns a dict; passing that dict back as
    ``persisted_state`` rebuilds an actor in the same state with the same
    campaign data, including review metadata when present.
    """

    def __init__(self, campaign_id: str, persisted_state: Optional[Dict] = None):
        super().__init__(campaign_id)
        if persisted_state:
            self._restore_from_snapshot(persisted_state)
            logger.info(f"Restored from snapshot, state: {self.state.name}")

    def get_snapshot(self) -> Dict:
        """Get current state snapshot for persistence."""
        return {
            'campaign_id': self.campaign_id,
            'state': self.state.name,
            'campaign': self._campaign_to_dict()
        }

    def _campaign_to_dict(self) -> Optional[Dict]:
        """Serialize the campaign entity, or return None if none has been created.

        Datetimes become ISO-8601 strings and State values become their names,
        mirroring what _restore_from_snapshot parses.
        NOTE(review): assumes Campaign exposes its constructor arguments as
        same-named attributes (e.g. a dataclass) — confirm against Campaign.
        """
        c = getattr(self, 'campaign', None)
        if c is None:
            return None
        reviewed_at = getattr(c, 'reviewed_at', None)
        return {
            'campaign_id': c.campaign_id,
            'campaign_name': c.campaign_name,
            'budget': c.budget,
            'target_audience': c.target_audience,
            'created_by': c.created_by,
            'created_at': c.created_at.isoformat(),
            'status': c.status.name,
            # Review fields are set as attributes by the review handler, so they may be absent.
            'review_comments': getattr(c, 'review_comments', None),
            'reviewed_by': getattr(c, 'reviewed_by', None),
            'reviewed_at': reviewed_at.isoformat() if reviewed_at is not None else None,
        }

    def _restore_from_snapshot(self, snapshot: Dict):
        """Restore actor state (and the campaign entity, if any) from a snapshot."""
        self.state = State[snapshot['state']]
        c = snapshot.get('campaign')
        if not c:
            return
        # Restore campaign data
        self.campaign = Campaign(
            campaign_id=c['campaign_id'],
            campaign_name=c['campaign_name'],
            budget=c['budget'],
            target_audience=c['target_audience'],
            created_by=c['created_by'],
            created_at=datetime.fromisoformat(c['created_at']),
            status=State[c['status']]
        )
        # Review metadata is optional so older snapshots without it still load.
        if c.get('review_comments') is not None:
            self.campaign.review_comments = c['review_comments']
        if c.get('reviewed_by') is not None:
            self.campaign.reviewed_by = c['reviewed_by']
        if c.get('reviewed_at'):
            self.campaign.reviewed_at = datetime.fromisoformat(c['reviewed_at'])
# Crash recovery example
async def demo_crash_recovery():
    """Walk through create → snapshot → simulated crash → restore → review.

    Only the snapshot taken in step 2 survives the "crash". The recovered
    actor is expected to resume in WAITING_ON_REVIEW and accept the review.
    Concrete message fields are used because the message dataclasses
    require them (``...`` would be passed as a literal Ellipsis).
    """
    # 1. Create actor and process messages
    actor = CampaignActorWithPersistence("CAMP-001")
    await actor.handle_message(CreateCampaignMsg(
        campaign_name="Spring Sale",
        budget=5000.0,
        target_audience="returning customers",
        user_id="user-42",
    ))
    # 2. Save state before crash
    snapshot = actor.get_snapshot()
    # 3. 💥 CRASH! Actor destroyed
    del actor
    # 4. Recover from snapshot
    recovered_actor = CampaignActorWithPersistence(
        "CAMP-001",
        persisted_state=snapshot
    )
    # 5. Continue processing
    await recovered_actor.handle_message(ReviewReceivedMsg(
        approved=True,
        reviewer_id="reviewer-7",
        comments="Looks good",
    ))
    # ✓ Workflow continues seamlessly!
| Use Case | States | Benefits |
|---|---|---|
| Order Processing | CART → CHECKOUT → PAYMENT → PROCESSING → SHIPPED → DELIVERED | Track order lifecycle, prevent invalid transitions, enable refunds |
| User Onboarding | NEW → EMAIL_VERIFIED → PROFILE_COMPLETED → ACTIVE | Guide users through steps, skip completed steps on return |
| CI/CD Pipeline | QUEUED → BUILDING → TESTING → DEPLOYING → DEPLOYED | Visualize pipeline, handle failures, enable rollbacks |
| Video Processing | UPLOADED → TRANSCODING → THUMBNAIL → READY → PUBLISHED | Retry failed steps, track progress, parallel processing |
| Document Approval | DRAFT → REVIEW → APPROVED/REJECTED → PUBLISHED | Audit trail, multiple reviewers, version control |
Experience the state machine in action. Try different workflows and see how invalid transitions are prevented.
A mature, widely used actor framework for JVM languages (Java, Scala)
Distributed computing framework with actor support
Microsoft's virtual actor framework
While state machines and actors teach fundamental principles, Temporal represents the evolution of these patterns for production systems. It provides the same resilience benefits with dramatically simpler code.
// 200+ lines of code
// Pseudocode sketch (not compilable) of the hand-written state-machine actor:
// every state change and every retry must be coded explicitly.
class CampaignActor {
    State state = EMPTY;

    // Routing table: which message is accepted in which state
    commandHandler() {
        forState(EMPTY)
            .match(CreateMsg, this::handleCreate)
        forState(WAITING_REVIEW)
            .match(ReviewMsg, this::handleReview)
    }

    handleCreate(msg) {
        // Manual state transition
        state = PENDING;
        // Manual retry logic
        try {
            createCampaign()
        } catch {
            // retry logic...
        }
        state = WAITING_REVIEW;
    }
}
// 50 lines of code
// Pseudocode sketch (mixes Temporal Python SDK decorators with C-like syntax; not compilable).
// The workflow reads top to bottom; the review decision arrives through a signal.
@workflow.defn
class CampaignWorkflow {
    ReviewStatus reviewStatus;   // set by reviewSignal; null until a decision arrives

    @workflow.run
    async run(request) {
        // Step 1: Create (auto-retry)
        await activities.createCampaign(request);
        // Step 2: Wait (durable, can wait days!)
        await workflow.wait_condition(
            () => reviewStatus != null
        );
        // Step 3: Process decision
        if (reviewStatus == APPROVED) {
            await activities.markApproved();
        } else {
            await activities.markRejected();
        }
    }

    // Signal handler: records the reviewer's decision, which unblocks Step 2
    @workflow.signal
    reviewSignal(status) {
        reviewStatus = status;
    }
}
| Feature | State Machine | Temporal |
|---|---|---|
| Code Complexity | 200+ lines, scattered logic | 50 lines, linear flow |
| State Management | Manual tracking, transitions | Automatic event sourcing |
| Persistence | Manual snapshots, restore | Built-in, zero config |
| Retries | Manual implementation | Built-in with backoff |
| Long Waits | Complex (timers + state) | Simple: wait_condition() |
| Visibility | Custom monitoring | Built-in Web UI |
| Testing | Mock messages, states | Test like normal code |
You don't need to rewrite everything at once! The InfoQ talk showed a wrapper pattern for gradual migration:
State machines teach you WHY these patterns exist.
Temporal gives you the TOOLS to use them in production.
Both are built on the same principles (event sourcing, message passing, durable state), but Temporal abstracts away the complexity. Learn state machines to understand the fundamentals, use Temporal to build real systems.
Code examples: See temporal_campaign_workflow.py and temporal_wrapper_migration.py for complete Python implementations.
- campaign_state_machine.py - Complete state machine implementation
- temporal_campaign_workflow.py - Pure Temporal workflow
- temporal_wrapper_migration.py - Gradual migration pattern
- temporal_workflow_analysis.md - Side-by-side comparison