Agent State Machine Design: Turning Uncontrolled Conversations into Recoverable Workflows
⚡ 30-Second Takeaway
- Core Problem: Conversation-driven agents skip steps, duplicate side effects, and lose all progress after restarts because conversation history is not task state. LLMs are probabilistic — without a deterministic wrapper, execution drifts unpredictably. You need an explicit task lifecycle state machine wrapping every agent execution.
- The Solution: A 7-state explicit task lifecycle — planned, running, paused, blocked, retrying, done, failed. Each state has defined entry conditions, allowed events, and legal transition targets. Invalid transitions are rejected, not best-effort.
- Key Implementation: AgentTaskStateMachine core class (~250 lines of Python): TaskState/TaskEvent enums, TRANSITION_TABLE, InvalidTransitionError guard, SQLite persistence, checkpoint/idempotency-key support, recovery strategy. Includes full lifecycle demo and restart recovery verification.
- What You Will Walk Away With: A deterministic task lifecycle wrapper for your agent. At every moment, the task knows what it is, what state it is in, what event can move it forward, and how to recover after a restart. No more "the agent just disappeared" production nightmares.
1. Why Production Agents Need Explicit State Machines
It is 2 AM. An automated refund-approval agent is processing a refund. The workflow is simple: validate the order, check the refund policy, request manager approval, call the payment gateway to issue the refund, send a notification. At step 3, after sending the approval request, the payment gateway times out. The agent's conversation history contains the text "approval granted, proceed" — so it retries, skips the approval check, reads "approved" from a step 3 result cache, and executes step 4: the refund. Three seconds later, the manager rejects the refund. But the money has already been sent.
That same afternoon, the same agent is processing another refund when the server is killed by the OOM Killer. The agent process restarts. The conversation history is gone — the LLM context window is empty. The agent starts from scratch: validate order, check policy, request approval... but it has no idea this refund was already processed. It issues a second refund.
These are the three fundamental failure modes of conversation-driven agents:
- Context Drift → Step Skip: The agent's "state" is implicit in conversation history. When the history grows long, gets compressed, truncated, or diluted by attention decay, the agent may skip critical steps. "Approval granted" and "approval requested, awaiting response" can look frustratingly similar in conversational text — and the LLM cannot reliably distinguish them.
- Retry → Duplicate Side Effects: The agent encounters a network timeout and retries, not knowing the first call actually succeeded (the response was lost in transit). Without explicit state records, there is no basis for deduplication.
- Restart → Total Progress Loss: After a restart, the conversation history is not in durable storage. The agent starts from zero, unaware of which steps were already completed.
The LLM can reason freely inside the running state, but it cannot change the task state. State transitions happen outside the LLM, driven by structured events (tool results, approval callbacks, error signals), not LLM-generated text.
Conversation-Driven vs. State-Machine-Driven Agents
You do not need to read framework source code to see the difference. Here is the contrast:
Conversation-Driven Agent (state implicit in chat history):
┌─────────────────────────────────────────────┐
│ User: Process refund #12345 │
│ Agent: Sure, let me validate the order... │
│ Agent: Order valid, amount $150.00 │
│ Agent: Manager approval required, sent │
│ Agent: [waiting... chat says "sent request"]│
│ --- History truncated/compressed/misread --- │
│ Agent: Approved! Processing refund... WRONG!│
└─────────────────────────────────────────────┘
State-Machine-Driven Agent (state explicit and validated):
┌─────────────────────────────────────────────┐
│ task.state = PLANNED │
│ → START → task.state = RUNNING │
│ → PAUSE_FOR_APPROVAL → task.state = PAUSED│
│ → [wait for approval callback...] │
│ → callback arrives → validate: PAUSED + │
│ APPROVAL_GRANTED → legal → RUNNING │
│ → COMPLETE → task.state = DONE │
│ │
│ At every moment: task.state tells you exactly│
│ where the task is. Context lost? Recover │
│ task.state from durable storage. │
│ Process restarted? task.state is in SQLite. │
└─────────────────────────────────────────────┘
The core difference: a conversation-driven agent's "state" is a semantic position somewhere in 200 chat messages. A state-machine-driven agent's state is a single deterministic value in a database. The former relies on the LLM to infer; the latter is a durable, verifiable fact.
On why context windows are inherently unreliable, see Agent Context Window Management: context decay is inherent to how LLMs handle long histories, not a bug. The state machine is precisely the mechanism that survives context loss and preserves task progress.
2. From Conversation Flow to Task Lifecycle: States, Events, and Transitions
Before diving into specific states, we need precise definitions for the three concepts that constitute the state machine's entire semantics:
- State: The determinate position of a task at a given moment. State answers: "Where is this task right now?" A task has exactly one current state at any moment. State is explicit, durable, and queryable.
- Event: An external or internal signal that triggers a state change. Event answers: "What happened that requires the task to react?" Events can be tool call results, human approval callbacks, timeout signals, or error codes, but not LLM text output. Events are structured, named, and auditable.
- Transition: A directed edge from state A to state B, triggered by an event. Transition answers: "Given the current state and the event that occurred, which state should the task go to?" Transition rules are encoded in a transition table and enforced on every attempt. Disallowed transitions raise errors; they do not silently degrade.
The Transition Table: The State Machine's Core Data Structure
The entire behavior of the state machine can be compressed into a single transition table — a mapping from (current_state, event) to target_state. This table serves as both documentation (all legal paths in one view) and execution rules (looked up on every transition attempt).
TRANSITION_TABLE = {
    (PLANNED, START): RUNNING,
    (RUNNING, PAUSE_FOR_APPROVAL): PAUSED,
    (RUNNING, BLOCK_ON_DEPENDENCY): BLOCKED,
    (RUNNING, COMPLETE): DONE,
    (RUNNING, FATAL_ERROR): FAILED,
    (RUNNING, TRANSIENT_ERROR): RETRYING,
    (PAUSED, APPROVAL_GRANTED): RUNNING,
    (PAUSED, APPROVAL_DENIED): FAILED,
    (PAUSED, TIMEOUT): FAILED,
    (BLOCKED, DEPENDENCY_RESOLVED): RUNNING,
    (BLOCKED, FATAL_ERROR): FAILED,
    (RETRYING, RETRY): RUNNING,
    (RETRYING, MAX_RETRIES_EXCEEDED): FAILED,
    (RETRYING, FATAL_ERROR): FAILED,
    # DONE and FAILED are terminal: no outgoing edges
}
Note what is absent from this table:
- No (PAUSED, COMPLETE): you cannot skip approval and complete the task directly.
- No (FAILED, RETRY): failure is terminal. Retries flow through running → retrying → running, not from failed.
- No (DONE, any event): terminal states accept no events.
These "missing transitions" are not bugs — they are design constraints. The transition table enforces flow control through "what is not listed is illegal" — cleaner and harder to bypass than scattered if-else checks.
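To make the "what is not listed is illegal" rule concrete, here is a minimal, self-contained sketch of the table with a lookup helper. The enum string values and the helper names is_legal and allowed_events are assumptions made for illustration; they are not taken from the article's AgentTaskStateMachine class.

```python
from enum import Enum


class TaskState(Enum):
    PLANNED = "planned"
    RUNNING = "running"
    PAUSED = "paused"
    BLOCKED = "blocked"
    RETRYING = "retrying"
    DONE = "done"
    FAILED = "failed"


class TaskEvent(Enum):
    START = "start"
    PAUSE_FOR_APPROVAL = "pause_for_approval"
    BLOCK_ON_DEPENDENCY = "block_on_dependency"
    COMPLETE = "complete"
    FATAL_ERROR = "fatal_error"
    TRANSIENT_ERROR = "transient_error"
    APPROVAL_GRANTED = "approval_granted"
    APPROVAL_DENIED = "approval_denied"
    TIMEOUT = "timeout"
    DEPENDENCY_RESOLVED = "dependency_resolved"
    RETRY = "retry"
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"


S, E = TaskState, TaskEvent
TRANSITION_TABLE = {
    (S.PLANNED, E.START): S.RUNNING,
    (S.RUNNING, E.PAUSE_FOR_APPROVAL): S.PAUSED,
    (S.RUNNING, E.BLOCK_ON_DEPENDENCY): S.BLOCKED,
    (S.RUNNING, E.COMPLETE): S.DONE,
    (S.RUNNING, E.FATAL_ERROR): S.FAILED,
    (S.RUNNING, E.TRANSIENT_ERROR): S.RETRYING,
    (S.PAUSED, E.APPROVAL_GRANTED): S.RUNNING,
    (S.PAUSED, E.APPROVAL_DENIED): S.FAILED,
    (S.PAUSED, E.TIMEOUT): S.FAILED,
    (S.BLOCKED, E.DEPENDENCY_RESOLVED): S.RUNNING,
    (S.BLOCKED, E.FATAL_ERROR): S.FAILED,
    (S.RETRYING, E.RETRY): S.RUNNING,
    (S.RETRYING, E.MAX_RETRIES_EXCEEDED): S.FAILED,
    (S.RETRYING, E.FATAL_ERROR): S.FAILED,
}


def is_legal(state: TaskState, event: TaskEvent) -> bool:
    """A transition is legal only if the (state, event) pair is listed."""
    return (state, event) in TRANSITION_TABLE


def allowed_events(state: TaskState) -> list:
    """Events the table accepts in the given state (empty for terminal states)."""
    return [event for (source, event) in TRANSITION_TABLE if source is state]


print(is_legal(S.PAUSED, E.COMPLETE))               # skipping approval
print([e.name for e in allowed_events(S.PAUSED)])   # exits from PAUSED
print(allowed_events(S.DONE))                       # terminal state
```

Because legality is a dictionary membership test, there is no code path that can "forget" a check: adding a new legal path means adding a row, and nothing else.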
State Diagram: The Seven States at a Glance
┌─────────┐
│ PLANNED │
└────┬────┘
│ START
▼
┌──────────────┴──────────────────┐
│ RUNNING │◄──────────────────────┐
└──┬──────┬──────┬──────┬────────┘ │
│ │ │ │ │
PAUSE_FOR │ │ │ │ COMPLETE │
_APPROVAL │ │ │ │ │
▼ ▼ ▼ ▼ │
┌────────┐ ┌───────┐┌─────────┐┌──────┐ │
│ PAUSED │ │BLOCKED││RETRYING ││ DONE │ │
└───┬──┬─┘ └───┬───┘└────┬────┘└──────┘ │
│ │ │ │ │
APPROVAL │ │TIMEOUT │DEPENDENCY │ RETRY │
_GRANTED │ │ │_RESOLVED └──────────────────────────────┘
│ │ │
│ ▼ ▼
│ ┌──────────┐
│ │ FAILED │◄── MAX_RETRIES_EXCEEDED, FATAL_ERROR,
│ └──────────┘ APPROVAL_DENIED
│
└──────────────────────────────────────────────────────► RUNNING
This diagram shows all 14 legal transitions. Note the key structural features:
- RUNNING is the hub state: Five edges depart from it, four edges arrive at it. Nearly all work happens in running; all other non-terminal states are "waiting for a condition" states.
- PAUSED and BLOCKED are two kinds of "pause" from RUNNING: Both wait for an external condition to return to running. They differ in why they paused and what unblocks them.
- FAILED is the convergence terminal: Six edges point to failed, covering approval denied, pause timeout, retries exhausted, and fatal errors raised from running, blocked, or retrying. All unrecoverable paths converge here.
- RETRYING → RUNNING forms a retry loop: As long as retries remain, errors are contained within this circuit without leaking to a terminal state.
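Structural facts about the diagram can be derived from the table itself instead of being counted by eye. The sketch below is one way to do that; it restates the transition table with plain strings (an assumption made to keep the example short) and counts incoming and outgoing edges per state.

```python
from collections import Counter

# Same 14 rows as the transition table, written with plain strings.
TRANSITIONS = {
    ("planned", "start"): "running",
    ("running", "pause_for_approval"): "paused",
    ("running", "block_on_dependency"): "blocked",
    ("running", "complete"): "done",
    ("running", "fatal_error"): "failed",
    ("running", "transient_error"): "retrying",
    ("paused", "approval_granted"): "running",
    ("paused", "approval_denied"): "failed",
    ("paused", "timeout"): "failed",
    ("blocked", "dependency_resolved"): "running",
    ("blocked", "fatal_error"): "failed",
    ("retrying", "retry"): "running",
    ("retrying", "max_retries_exceeded"): "failed",
    ("retrying", "fatal_error"): "failed",
}

# Outgoing edges: how many (state, event) rows start at each state.
out_degree = Counter(source for (source, _event) in TRANSITIONS)
# Incoming edges: how many rows end at each state.
in_degree = Counter(TRANSITIONS.values())
# Terminal states are targets that never appear as a source.
terminal_states = set(in_degree) - set(out_degree)

print("total transitions:", len(TRANSITIONS))
print("running out/in:", out_degree["running"], in_degree["running"])
print("edges into failed:", in_degree["failed"])
print("terminal states:", sorted(terminal_states))
```

Running a check like this in a unit test keeps the prose, the diagram, and the table from drifting apart when a transition is added or removed.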
Invalid Transitions Are Errors, Not Degraded Behavior
One of the most important design decisions: any transition attempt not in the transition table must raise an exception. It must not be silently ignored or degraded.
def transition(self, event: TaskEvent, metadata=None) -> TaskState:
    key = (self._state, event)
    next_state = TRANSITION_TABLE.get(key)
    if next_state is None:
        raise InvalidTransitionError(
            f"Invalid transition: {self._state.value} + {event.value}"
        )
    # ... execute transition ...
Why an exception and not a warn log? Because the code calling transition() believes the state has changed. If the call is silently ignored, the caller continues executing based on a false state assumption. The exception forces the caller to confront the situation: either fix a bug (why did this illegal event appear?) or handle a boundary condition (e.g., event reordering due to concurrency races).
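From the caller's side, the exception looks like this. The sketch uses a deliberately reduced three-row table and a hypothetical MiniStateMachine class (both assumptions for brevity, not the article's full implementation) to show that a rejected event leaves the state untouched and forces the caller to branch.

```python
from enum import Enum


class TaskState(Enum):
    RUNNING = "running"
    PAUSED = "paused"
    DONE = "done"


class TaskEvent(Enum):
    PAUSE_FOR_APPROVAL = "pause_for_approval"
    APPROVAL_GRANTED = "approval_granted"
    COMPLETE = "complete"


# Reduced table: just enough rows to demonstrate a rejected transition.
TRANSITION_TABLE = {
    (TaskState.RUNNING, TaskEvent.PAUSE_FOR_APPROVAL): TaskState.PAUSED,
    (TaskState.PAUSED, TaskEvent.APPROVAL_GRANTED): TaskState.RUNNING,
    (TaskState.RUNNING, TaskEvent.COMPLETE): TaskState.DONE,
}


class InvalidTransitionError(Exception):
    """Raised when (state, event) is not in the transition table."""


class MiniStateMachine:
    def __init__(self, state: TaskState):
        self.state = state
        self.rejected = 0  # feeds an invalid-transition counter

    def transition(self, event: TaskEvent) -> TaskState:
        next_state = TRANSITION_TABLE.get((self.state, event))
        if next_state is None:
            self.rejected += 1
            raise InvalidTransitionError(
                f"Invalid transition: {self.state.value} + {event.value}"
            )
        self.state = next_state
        return next_state


sm = MiniStateMachine(TaskState.PAUSED)
try:
    sm.transition(TaskEvent.COMPLETE)  # tries to skip approval
except InvalidTransitionError as exc:
    # The caller must decide: bug, race, or stale event. State is unchanged.
    outcome = f"rejected: {exc}"
else:
    outcome = "completed"

print(outcome, "| state is still", sm.state.value)
```

A warning log would have let the else branch run with a false assumption; the exception makes that impossible.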
On why structured event typing and versioning matters, see Agent Message Schema Design — state transition events and agent messages share the same principles of type safety, version evolution, and validation.
3. Core State Design: planned, running, paused, blocked, retrying, done, failed
Seven states cover the complete lifecycle of an agent task from creation to termination. Each state has clear semantics, entry conditions, allowed operations, and detection methods.
planned
Meaning: Task created, preconditions not yet validated. This is the initial state. In this state, the agent performs no side effects — it is merely preparing. It can validate parameters, allocate resources, and check whether dependent services are reachable.
Allowed events: Only START. Any other event (including COMPLETE, FATAL_ERROR) is illegal in planned.
Why this state exists: It distinguishes "task created" from "task started executing." In distributed systems, a task can be created but never scheduled (scheduler down, queue backlogged). planned lets you monitor "how many tasks are queued but not yet started" — a key metric for capacity planning.
running
Meaning: The agent is actively executing task steps. This is the only state in which the LLM can freely reason. Within this state, the agent calls tools, analyzes results, and generates intermediate outputs — but it cannot change its own task state. State transitions are triggered by structured events (specific tool results, external approval callbacks, error signals), not by LLM text output.
Allowed events: PAUSE_FOR_APPROVAL, BLOCK_ON_DEPENDENCY, COMPLETE, FATAL_ERROR, TRANSIENT_ERROR. With five outgoing edges, running has the most transitions of any state.
Critical constraint: The LLM can produce any output inside running — but it cannot say "the task is done." Only the COMPLETE event (generated by tool calls or evaluation logic) triggers the transition to done. This is the core manifestation of the state machine's "deterministic shell."
paused
Meaning: The agent is waiting for external input, typically human approval, human feedback, or a safety review. This is voluntary waiting. The agent process can stay alive (monitoring timeouts, sending reminders) but cannot advance the task itself.
Allowed events: APPROVAL_GRANTED (back to running), APPROVAL_DENIED (to failed), TIMEOUT (to failed).
Key distinction from blocked: paused is active waiting — "I need a human's decision." blocked is passive obstruction — "the service I need is unavailable." The former is a normal part of the workflow design; the latter is an anomaly.
blocked
Meaning: The agent cannot proceed because it encountered an unresolvable dependency — a dependent API returns 503, a required file does not exist, permissions are insufficient to access a resource. This is involuntary obstruction.
Allowed events: DEPENDENCY_RESOLVED (back to running), FATAL_ERROR (to failed).
Key distinction from paused: blocked does not auto-timeout into failed — dependency recovery time is unpredictable. But external monitoring should alert: "this task has been blocked for 4 hours, human intervention needed." paused, by contrast, has an explicit timeout policy — approvals cannot wait indefinitely.
retrying
Meaning: The agent encountered a transient error and is re-attempting. This is the buffer state between running and failed.
Allowed events: RETRY (back to running), MAX_RETRIES_EXCEEDED (to failed), FATAL_ERROR (to failed).
Key distinction from running: retrying means the previous attempt failed. This distinction matters because: (1) retries require idempotency guarantees — already-succeeded steps must not be re-executed; (2) retries have an upper bound — infinite retry is an infinite loop; (3) monitoring must distinguish "normal operation" from "currently retrying" — these have entirely different operational semantics.
done — Terminal
Meaning: Task completed successfully. All steps executed, all side effects committed, all notifications sent.
Allowed events: None. Any event in done is illegal. Terminal states are irreversible.
failed — Terminal
Meaning: Task cannot be completed. May be due to rejected approval, exhausted retries, fatal error, permanent dependency failure, or pause timeout.
Allowed events: None. Like done, this is an irreversible terminal state.
But note: failed does not mean "data lost" or "untraceable." The failed task's state history, checkpoint data, and error information all remain in durable storage. An operator can extract completed-step results from a failed task, manually handle the remaining work, or even create a new task instance to continue from the last checkpoint — but that new instance is a new planned task, not a state transition of the old one.
State Accounting: A Task's Lifecycle by the Numbers
Another way to understand the seven states is through a typical task's "state residency":
State      Residency               Description
──────────────────────────────────────────────────────────────
planned    milliseconds~seconds    Validating preconditions
running    seconds~minutes         Agent executing task steps
paused     minutes~hours           Awaiting human approval
blocked    minutes~hours           Awaiting dependency recovery
retrying   seconds~minutes         Backoff retry window
done       permanent               Terminal
failed     permanent               Terminal
Note that running may account for far less time than you expect — in production, agents spend most of their time waiting (paused/blocked), not computing (running). This means the state machine's primary value is not controlling behavior inside running, but managing the semantics and recovery of waiting periods.
4. Pause and Resume: Encoding Human Approval, Feedback, and Safety Gates
Most agent frameworks treat human approval as a special tool call or a chat message. "The agent sends a message saying 'please approve' and then continues executing." This is dangerous — the agent may skip waiting based on conversation context that "looks like" approval before the human actually responds.
The state machine elevates human approval to a first-class state: paused. Once an agent enters paused, it cannot return to running without an explicit APPROVAL_GRANTED event; the only other exits are APPROVAL_DENIED and TIMEOUT, both of which lead to failed. The LLM cannot bypass this by generating text that "looks like approval", because state transitions do not pass through the LLM.
Three Paths Through the Approval Gate
Path A: Approved (normal flow)
RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_GRANTED → RUNNING
Path B: Denied (terminal)
RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_DENIED → FAILED
Path C: Timeout (protective termination)
RUNNING → PAUSE_FOR_APPROVAL → PAUSED → TIMEOUT → FAILED
Each path is non-bypassable. The agent cannot jump from paused directly to done; it cannot "keep working" while in paused (in paused, you cannot execute task steps — you can only wait for events).
Timeout Handling: Approval Cannot Wait Forever
The paused state has an explicit timeout policy. A typical configuration:
Approval Timeout Policy:
  Initial wait:   30 minutes
  Reminder:       at 15 minutes, send reminder (Slack/email/PagerDuty)
  Timeout action: mark FAILED, reason: "approval_timeout"
  Grace period:   5 minutes post-timeout. If approval arrives,
                  operator can manually recover from FAILED
                  (create a new task instance from checkpoint)
Note that this does not auto-retry — approval is a human decision, not a network timeout. A person's decision does not automatically become "approved" because more time elapsed. Terminating on timeout is the safe default.
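A timeout policy like the one above reduces to a small pure function that a scheduler can call periodically for every paused task. The following is a sketch under the example's numbers (30-minute timeout, 15-minute reminder); the function name approval_action and its return labels are hypothetical.

```python
from datetime import datetime, timedelta, timezone

APPROVAL_TIMEOUT = timedelta(minutes=30)  # "Initial wait" in the example policy
REMINDER_AFTER = timedelta(minutes=15)    # reminder threshold in the example policy


def approval_action(paused_at: datetime, now: datetime, reminder_sent: bool) -> str:
    """Return 'timeout', 'remind', or 'wait' for a task sitting in PAUSED."""
    waited = now - paused_at
    if waited >= APPROVAL_TIMEOUT:
        return "timeout"   # caller fires the TIMEOUT event -> FAILED
    if waited >= REMINDER_AFTER and not reminder_sent:
        return "remind"    # caller notifies the approver; state stays PAUSED
    return "wait"


paused_at = datetime(2026, 6, 5, 14, 0, tzinfo=timezone.utc)
for minutes in (5, 15, 31):
    now = paused_at + timedelta(minutes=minutes)
    print(minutes, approval_action(paused_at, now, reminder_sent=False))
```

Keeping the decision separate from the side effect (sending the reminder, firing TIMEOUT) makes the policy easy to test with fixed timestamps.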
Safety Gates: More Than Just Approval
The paused state is not limited to human approval. Any scenario requiring "pause and verify before executing side effects" can use it:
- Safety gates: The agent is about to perform a high-risk operation (drop a database table, modify production config, send a batch notification). In the running → paused → safety review → running flow, the security team can inspect parameters while the agent is paused.
- Compliance confirmation: In regulated industries, certain operations require explicit compliance officer approval. The agent waits in paused for compliance confirmation before proceeding.
- External system callbacks: The agent triggered an async external process (e.g., filed a ticket) and needs to wait for the external system's completion callback. This is not "human approval" but has identical semantics: pause → await external event → resume.
Integration Pattern with External Approval Systems
The state machine does not bind to a specific approval UI. It defines only the event interface. Here is the integration pattern:
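# Webhook receiving approval callbacks (Flask-style; load_state_machine
# is the application's own loader)
from flask import Flask, jsonify, request

app = Flask(__name__)

DECISION_EVENTS = {
    "approved": TaskEvent.APPROVAL_GRANTED,
    "denied": TaskEvent.APPROVAL_DENIED,
}

@app.route("/approval-callback", methods=["POST"])
def handle_approval():
    data = request.json
    event = DECISION_EVENTS.get(data["decision"])
    if event is None:
        # Unknown decision values are rejected, never guessed
        return jsonify({"error": "unknown decision"}), 400
    sm = load_state_machine(data["task_id"])  # recover from durable storage
    new_state = sm.transition(event, metadata={
        "approver": data["approver"],
        "comment": data.get("comment", ""),
        "timestamp": data["timestamp"],
    })
    return jsonify({"task_id": data["task_id"], "state": new_state.value})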
The approval source can be a Slack button, a Jira status change, a custom web panel, or an enterprise messaging system — as long as it can send an HTTP request with a task_id and decision, it can drive the state machine.
For approval workflow UI/UX design patterns and more complete integration schemes, see Agent Human Approval Workflow — this article provides the underlying state machine; that article provides the upper-layer interaction design.
Repeat: paused is not failure. Many teams alert when an agent is in paused — this is a misunderstanding of state machine semantics. paused is a normal, designed, expected waiting state. Your monitoring dashboard should display paused and blocked separately, not lump them together as "not running."
5. Failure and Retry: Preventing Duplicate Execution, Step Skips, and Infinite Loops
Retry is the most error-prone operation in distributed systems — not because retry itself is complex, but because the state assumptions during retry are frequently wrong. The state machine's retry design revolves around three guarantees:
- No Duplicate Effects: A retry must not re-execute steps that already succeeded.
- No Step Skips: A retry must not leap past incomplete steps to later ones.
- Bounded Retries: Retries have an upper count limit and a backoff strategy — no infinite loops.
The Retry State Machine: A Complete Retry Lifecycle
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=0)
RETRYING → [backoff wait 2^0 = 1s] → RETRY → RUNNING (retry_count=1)
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=1)
RETRYING → [backoff wait 2^1 = 2s] → RETRY → RUNNING (retry_count=2)
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=2)
RETRYING → [backoff wait 2^2 = 4s] → RETRY → RUNNING (retry_count=3)
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=3)
RETRYING → MAX_RETRIES_EXCEEDED → FAILED
Note the key design details:
- retry_count increments during RETRYING (when processing the RETRY event), not during TRANSIENT_ERROR. This ensures retry_count only increases when a retry is actually attempted.
- Before every RETRY event, check retry_count < max_retries. If the budget is exhausted, you must use the MAX_RETRIES_EXCEEDED event, not RETRY. These two events are not interchangeable.
- The backoff strategy is implemented outside the transition table: the state machine governs "can we retry?"; the caller governs "when should we retry?"
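The retry lifecycle traced above can be reproduced with a few lines of bookkeeping. This is an illustrative simulation, not the article's class: next_retry_event, backoff_seconds, and simulate_always_failing are hypothetical names, and events are plain strings.

```python
def next_retry_event(retry_count: int, max_retries: int) -> str:
    """Pick the only legal event for a task currently in RETRYING."""
    return "retry" if retry_count < max_retries else "max_retries_exceeded"


def backoff_seconds(retry_count: int) -> int:
    """Exponential backoff: 1s, 2s, 4s, ... for counts 0, 1, 2, ..."""
    return 2 ** retry_count


def simulate_always_failing(max_retries: int = 3) -> list:
    """Trace a task whose every attempt ends in TRANSIENT_ERROR."""
    trace = []
    retry_count = 0
    while True:
        # RUNNING -> TRANSIENT_ERROR -> RETRYING: the count is NOT touched here.
        event = next_retry_event(retry_count, max_retries)
        if event == "max_retries_exceeded":
            trace.append(("failed", retry_count, None))
            return trace
        trace.append(("retry", retry_count, backoff_seconds(retry_count)))
        retry_count += 1  # incremented only when RETRY is actually processed


for step in simulate_always_failing(max_retries=3):
    print(step)
```

With max_retries=3 the trace contains three retry steps followed by one failure, matching the four RETRYING visits in the lifecycle listing.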
Retry Backoff Strategy
import time

def retry_with_backoff(sm: AgentTaskStateMachine, max_retries=3):
    """Run one backoff-then-retry step for a task in RETRYING.

    Afterwards sm.state is RUNNING (retry granted) or FAILED (budget exhausted).
    """
    if sm.state != TaskState.RETRYING:
        return  # nothing to retry
    if sm.retry_count >= max_retries:
        # Retry budget exhausted: RETRY is not allowed, fail the task instead
        sm.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
        return
    backoff = 2 ** sm.retry_count  # exponential: 1, 2, 4...
    print(f"[retry] backing off {backoff}s (attempt {sm.retry_count + 1})")
    time.sleep(backoff)
    sm.transition(TaskEvent.RETRY)  # back to RUNNING, caller resumes execution
Idempotency: The State Machine Prevents Duplicate Triggers, Not External Duplicates
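This is one of the article's most important honest caveats: the state machine prevents the agent from triggering the same step twice, but on its own it cannot guarantee that an external system has not already processed a request.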
To achieve stronger idempotency, two layers must cooperate:
- Agent side (state machine layer): Write an idempotency key checkpoint before executing any side-effecting action. Check whether the key already exists before execution.
- External system side: The external API must support idempotency keys (e.g., Stripe's Idempotency-Key header) and deduplicate on the server side.
# Agent-side idempotency key pattern
def execute_with_idempotency(sm, step_name, action):
    idempotency_key = f"{sm.task_id}:{step_name}"
    # Already completed? Return cached result
    if sm.get_checkpoint(idempotency_key) == "done":
        print(f"[idempotency] step '{step_name}' already done, skipping")
        return sm.get_checkpoint(f"{idempotency_key}:result")
    # Mark as executing before the side effect, so a crash leaves a trace
    sm.set_checkpoint(idempotency_key, "executing")
    try:
        result = action(idempotency_key)  # pass key to external system
        # Store the result first, so "done" always implies a cached result
        sm.set_checkpoint(f"{idempotency_key}:result", result)
        sm.set_checkpoint(idempotency_key, "done")
        return result
    except Exception:
        # Execution failed: do not mark done
        # State remains "executing"; on recovery, check external system
        raise
The key pattern: Mark executing first, then execute, then mark done on success. If the process crashes during execution, recovery sees executing and knows that attempt's outcome is uncertain — it should query the external system for confirmation rather than blindly retrying or blindly skipping.
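What "query the external system for confirmation" might look like is sketched below. Everything here is an assumption for illustration: checkpoints are modeled as a plain dict, and lookup_external stands for whatever call your payment or ticketing API offers to fetch a request by its idempotency key.

```python
def resolve_uncertain_step(checkpoints: dict, key: str, lookup_external) -> str:
    """Decide whether a step should be executed or skipped after a restart.

    lookup_external(key) is a hypothetical callable that returns the external
    system's recorded result for this idempotency key, or None if the
    external system never saw the request.
    """
    status = checkpoints.get(key)
    if status == "done":
        return "skip"              # finished before the crash
    if status != "executing":
        return "execute"           # never started
    # Marker says "executing": the outcome of the last attempt is unknown.
    external_result = lookup_external(key)
    if external_result is None:
        checkpoints.pop(key)       # side effect never landed; safe to run again
        return "execute"
    # The side effect did land: record it locally instead of repeating it.
    checkpoints[f"{key}:result"] = external_result
    checkpoints[key] = "done"
    return "skip"


# Crash happened after the gateway accepted the refund:
store = {"task_1:issue_refund": "executing"}
decision = resolve_uncertain_step(
    store, "task_1:issue_refund", lambda key: {"refund_id": "r_123"}
)
print(decision, store["task_1:issue_refund"])
```

The important property is that the "executing" marker is never resolved by guessing: either the external system confirms the effect, or it confirms the absence of one.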
Retryable vs. Non-Retryable Errors
| Error Type | Example | Retry? | Handling |
|---|---|---|---|
| Transient network error | Connection timeout, DNS resolution failure | ✓ Yes | TRANSIENT_ERROR → RETRYING |
| Service temporarily unavailable | 503 Service Unavailable, 429 Rate Limited | ✓ Yes | TRANSIENT_ERROR → RETRYING (with backoff) |
| Response lost in transit | Request succeeded; response lost in network | ✓ Carefully | Query external system via idempotency key before deciding |
| Parameter validation failure | 422 Unprocessable Entity | ✗ No | FATAL_ERROR → FAILED (retry won't change the outcome) |
| Insufficient permissions | 403 Forbidden | ✗ No | FATAL_ERROR → FAILED |
| Data corruption | Required field empty, malformed data | ✗ No | FATAL_ERROR → FAILED |
The core decision criterion: If you retry with all inputs unchanged, will the outcome change? If the answer is "no" (the same request always returns 422), do not retry. If the answer is "maybe" (network recovers, service recovers), retry is appropriate.
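The table's decision rule can be encoded as a small classifier that turns a failed call into one of the two error events. This is a sketch: the status-code sets mirror the table rows, the function name classify_failure is hypothetical, and treating every unlisted failure as fatal is an assumption chosen to fail safe.

```python
TRANSIENT_STATUS = {429, 503}  # rate limited, service unavailable


def classify_failure(status_code=None, exception=None) -> str:
    """Map a failed call to 'transient_error' or 'fatal_error'."""
    # Timeouts and dropped connections may succeed on a later attempt.
    if isinstance(exception, (TimeoutError, ConnectionError)):
        return "transient_error"
    if status_code in TRANSIENT_STATUS:
        return "transient_error"
    # 403, 422, malformed data, and anything unknown: retrying with the
    # same inputs is not expected to change the outcome.
    return "fatal_error"


print(classify_failure(status_code=503))
print(classify_failure(status_code=422))
print(classify_failure(exception=TimeoutError("gateway timed out")))
```

The "response lost in transit" row is deliberately not handled here: that case needs an idempotency-key lookup against the external system before any event is chosen.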
6. State Storage and Recovery: Continuing After Restarts, Disconnections, and Context Resets
This is the state machine's most fundamental value — task state is durable. The agent process can be killed, the server can restart, the LLM context can be wiped — but the task state remains.
What Must Be Persisted
A complete state storage scheme must persist the following data:
┌─────────────────────────────────────────────────┐
│ task_checkpoints table │
├──────────────┬──────────────────────────────────┤
│ task_id │ Unique task identifier │
│ state │ Current state (planned/running/...)│
│ retry_count │ Current retry count │
│ checkpoint │ JSON: {step results, data, keys} │
│ updated_at │ Last update timestamp │
└──────────────┴──────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ state_transitions table │
├──────────────┬──────────────────────────────────┤
│ id │ Auto-increment primary key │
│ task_id │ Unique task identifier │
│ from_state │ Pre-transition state │
│ to_state │ Post-transition state │
│ event │ Triggering event │
│ timestamp │ ISO 8601 timestamp │
│ metadata │ JSON: {error info, approver, ...} │
└──────────────┴──────────────────────────────────┘
Note that the state transitions table is append-only — each row is one transition event, never updated or deleted. This means you have a complete audit trail: at any moment, you can trace a task's every step from planned to done/failed.
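One possible SQLite rendering of the two tables is sketched below, using Python's standard sqlite3 module. The column types, the record_transition helper, and the choice to write both tables in a single transaction are assumptions for illustration rather than the article's exact persistence layer.

```python
import json
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS task_checkpoints (
    task_id     TEXT PRIMARY KEY,
    state       TEXT NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    checkpoint  TEXT NOT NULL DEFAULT '{}',
    updated_at  TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS state_transitions (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    TEXT NOT NULL,
    from_state TEXT NOT NULL,
    to_state   TEXT NOT NULL,
    event      TEXT NOT NULL,
    timestamp  TEXT NOT NULL,
    metadata   TEXT NOT NULL DEFAULT '{}'
);
"""


def record_transition(conn, task_id, from_state, to_state, event,
                      retry_count=0, metadata=None):
    """Append one history row and update the current state atomically."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # one transaction: both writes commit, or neither does
        conn.execute(
            "INSERT INTO state_transitions "
            "(task_id, from_state, to_state, event, timestamp, metadata) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (task_id, from_state, to_state, event, now,
             json.dumps(metadata or {})),
        )
        cur = conn.execute(
            "UPDATE task_checkpoints "
            "SET state = ?, retry_count = ?, updated_at = ? WHERE task_id = ?",
            (to_state, retry_count, now, task_id),
        )
        if cur.rowcount == 0:  # first transition for this task
            conn.execute(
                "INSERT INTO task_checkpoints "
                "(task_id, state, retry_count, updated_at) VALUES (?, ?, ?, ?)",
                (task_id, to_state, retry_count, now),
            )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
record_transition(conn, "task_a1", "planned", "running", "start")
record_transition(conn, "task_a1", "running", "paused", "pause_for_approval",
                  metadata={"step": "refund_approval"})

current = conn.execute(
    "SELECT state FROM task_checkpoints WHERE task_id = ?", ("task_a1",)
).fetchone()[0]
history_len = conn.execute(
    "SELECT COUNT(*) FROM state_transitions WHERE task_id = ?", ("task_a1",)
).fetchone()[0]
print(current, history_len)
```

Writing the history row and the current-state row in the same transaction avoids the case where a crash leaves the audit trail and the checkpoint disagreeing about where the task is.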
Storage Backend Selection
| Backend | Use Case | Caveats |
|---|---|---|
| SQLite | Single-machine agent, demos, local tools | Simple, zero dependencies, transaction-safe. Not suitable for concurrent writes — multiple agent instances sharing one SQLite file cause lock contention. Use WAL mode in production. |
| PostgreSQL | Production agents, multi-instance deployment | Supports concurrency, connection pooling, primary-replica replication. Task state storage is critical infrastructure — if the state database goes down, all agents fail to transition (fail closed, safe). |
| Redis + AOF | Low-latency scenarios | Fast in memory, but durability is weaker than PostgreSQL. If using Redis, enable AOF persistence (appendonly yes) and accept that the last few seconds of transition records may be lost on restart. |
| File-based JSON | Prototypes, demos, single-file tools | Simplest but not production-suitable — no transactions, no concurrency control, no recovery guarantees. |
Recovery Strategy: Handling Four Stale States
After an agent restart, load task state from durable storage, then execute the recovery strategy based on the current state:
def recover(self):
    saved = self.persistence.load_checkpoint(self.task_id)
    if saved:
        self._state = TaskState(saved["state"])
        self._retry_count = saved.get("retry_count", 0)
        self._checkpoint = saved.get("checkpoint_data", {})
        self._history = self.persistence.load_history(self.task_id)
    # Handle stale states
    if self._state == TaskState.RUNNING:
        # Process crashed while RUNNING → mark for retry
        self.transition(TaskEvent.TRANSIENT_ERROR,
                        metadata={"reason": "recovery_stale_running"})
    elif self._state == TaskState.RETRYING:
        # Process crashed while RETRYING → check count, then retry
        if self._retry_count < self._max_retries:
            self.transition(TaskEvent.RETRY)
        else:
            self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
    elif self._state == TaskState.PAUSED:
        # Check if approval timed out
        paused_at = self._get_last_transition_time()
        if time_since(paused_at) > APPROVAL_TIMEOUT:
            self.transition(TaskEvent.TIMEOUT,
                            metadata={"reason": "recovery_approval_timeout"})
    elif self._state == TaskState.BLOCKED:
        # Check if dependency recovered (re-probe dependency service)
        if self._check_dependency():
            self.transition(TaskEvent.DEPENDENCY_RESOLVED)
        # Still blocked → remain BLOCKED, await external monitoring alert
    return self
Recovery defaults should bias toward safety:
- Stale running → default to retry. This is the safest default: the agent crashed mid-execution, and you do not know how much it completed. Idempotency keys prevent duplicate execution on retry.
- Stale paused → check timeout. If within the timeout window, stay in paused. If expired, transition to failed.
- Stale retrying → check retry count. If not exhausted, continue retrying. If exhausted, transition to failed.
- Stale blocked → re-probe the dependency. If it has recovered, return to running. Otherwise stay in blocked and let monitoring alert.
- What not to do: Do not auto-recover from failed or done. Terminal means terminal.
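The recovery defaults can also be expressed as a pure decision function, which makes them testable without a database or a running agent. This is a sketch: recovery_event is a hypothetical name, states and events are plain strings, and the 30-minute APPROVAL_TIMEOUT is assumed from the earlier example policy.

```python
from datetime import timedelta

APPROVAL_TIMEOUT = timedelta(minutes=30)  # assumed, from the example policy


def recovery_event(state, retry_count=0, max_retries=3,
                   paused_for=timedelta(0), dependency_ok=False):
    """Return the event to fire after a restart, or None to stay put."""
    if state == "running":
        return "transient_error"       # unknown progress: go through RETRYING
    if state == "retrying":
        if retry_count < max_retries:
            return "retry"
        return "max_retries_exceeded"
    if state == "paused":
        return "timeout" if paused_for > APPROVAL_TIMEOUT else None
    if state == "blocked":
        return "dependency_resolved" if dependency_ok else None
    return None                        # planned, done, failed: nothing to fire


print(recovery_event("running"))
print(recovery_event("retrying", retry_count=3))
print(recovery_event("paused", paused_for=timedelta(minutes=10)))
print(recovery_event("failed"))
```

Returning None for terminal states is what "terminal means terminal" looks like in code: recovery has no branch that can move a failed or done task.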
Task-State Recovery vs. LLM Context Recovery: Two Different Things
On context window recovery strategies, see Agent Context Window Management — the cross-window state management section covers how to rebuild LLM working context from checkpoints after a context reset. On the memory system's persistence architecture (how each L0-L3 layer stores data), see Agent Memory System Design — task checkpoints and semantic memory are different storage layers.
7. Observable State Machines: Logs, Metrics, Audit Trails, and Alerts
The state machine is not merely an execution model — it is itself an observability primitive. Every state transition is an immutable event record, naturally suited for building audit trails, metric dashboards, and alerting rules.
Transition Events: The Atomic Unit of Observation
Each state transition produces a structured event:
{
"task_id": "task_a1b2c3d4",
"from_state": "running",
"to_state": "paused",
"event": "pause_for_approval",
"timestamp": "2026-06-05T14:22:31.123456+00:00",
"metadata": {
"step": "refund_approval",
"amount": 150.00,
"requested_by": "agent_refund_processor"
}
}
This event records what happened, why, and when. All transition events sorted by time form the task's complete audit trail — no separate "audit log system" is needed; the state transition events are the audit log.
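A record with this shape maps naturally onto a frozen dataclass, which also enforces the "never updated" property in code. The sketch below is an assumption about how such a record could be modeled; it stores states and events as plain strings rather than enums to stay self-contained.

```python
import json
from dataclasses import FrozenInstanceError, asdict, dataclass, field
from datetime import datetime, timezone


def _utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class StateTransition:
    task_id: str
    from_state: str
    to_state: str
    event: str
    timestamp: str = field(default_factory=_utc_now)
    metadata: dict = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize to the structured event format shown above."""
        return json.dumps(asdict(self), sort_keys=True)


record = StateTransition(
    task_id="task_a1b2c3d4",
    from_state="running",
    to_state="paused",
    event="pause_for_approval",
    timestamp="2026-06-05T14:22:31.123456+00:00",
    metadata={"step": "refund_approval", "amount": 150.00},
)
print(record.to_json())

try:
    record.to_state = "done"  # history rows are immutable
except FrozenInstanceError:
    print("transition records cannot be modified after creation")
```

Freezing the dataclass only protects the record's own fields; the metadata dict is still mutable, so treat it as write-once by convention.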
Observability Hook: Triggered on Every Transition
# on_transition hook: called after every state transition
# (metrics and alerts are the application's own clients)
import json
import logging

logger = logging.getLogger(__name__)

def on_transition(transition: StateTransition):
    # 1. Structured logging
    logger.info(json.dumps({
        "event": "state_transition",
        "task_id": transition.task_id,
        "from": transition.from_state.value,
        "to": transition.to_state.value,
        "trigger": transition.event.value,
        "timestamp": transition.timestamp,
        "metadata": transition.metadata
    }))
    # 2. Metric instrumentation
    metrics.increment(f"transition.{transition.to_state.value}")
    metrics.gauge(f"task.{transition.task_id}.state",
                  transition.to_state.value)
    # 3. Alert checking
    if transition.to_state == TaskState.RETRYING:
        retry_count = transition.metadata.get("retry_count", 0)
        if retry_count >= 3:
            alerts.send("high_retry_count",
                        f"Task {transition.task_id} retried {retry_count} times")
On structured event schema typing and versioning, see Agent Message Schema Design — state transition events should have typed, versioned, validated schemas, just like agent messages.
Core Metrics
These are the key metrics to compute from the state transition stream:
| Metric | Computation | Purpose |
|---|---|---|
| time_in_state | now - timestamp of entry into current state | SLA monitoring: how long has the task been running? How long has it been paused? |
| transition_counts | Aggregate count by event type | Anomaly detection: retry count spiking? Approval denial rate anomalous? |
| state_distribution | Task count per state | Capacity planning: how many tasks queued (planned)? How many waiting (paused/blocked)? |
| retry_rate | retrying transitions / total transitions | Health assessment: rising retry rate signals unstable dependencies |
| mean_time_to_recovery | Avg time from paused/blocked/retrying back to running | Recovery efficiency: approval response time, dependency recovery time, retry backoff window |
| invalid_transition_attempts | Count of InvalidTransitionError raises | Bug detection: event reordering, concurrency races, caller logic errors |
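Several of these metrics fall out of a single pass over the transition history. The sketch below computes state_distribution, retry_rate, and time_in_state from an in-memory list; the row layout (dicts with integer second timestamps under "ts") and the helper names are assumptions made for illustration.

```python
from collections import Counter

transitions = [
    {"task_id": "t1", "to": "running",  "event": "start",              "ts": 0},
    {"task_id": "t2", "to": "running",  "event": "start",              "ts": 5},
    {"task_id": "t1", "to": "retrying", "event": "transient_error",    "ts": 10},
    {"task_id": "t1", "to": "running",  "event": "retry",              "ts": 12},
    {"task_id": "t2", "to": "paused",   "event": "pause_for_approval", "ts": 20},
]


def current_states(rows):
    """Latest (state, entered_at) per task, taken from the newest row."""
    latest = {}
    for row in sorted(rows, key=lambda r: r["ts"]):
        latest[row["task_id"]] = (row["to"], row["ts"])
    return latest


def state_distribution(rows):
    """Task count per current state."""
    return Counter(state for state, _entered in current_states(rows).values())


def retry_rate(rows):
    """Share of all transitions that entered RETRYING."""
    if not rows:
        return 0.0
    return sum(r["to"] == "retrying" for r in rows) / len(rows)


def time_in_state(rows, task_id, now):
    """Seconds since the task entered its current state."""
    _state, entered_at = current_states(rows)[task_id]
    return now - entered_at


print(dict(state_distribution(transitions)))
print(retry_rate(transitions))
print(time_in_state(transitions, "t2", now=50))
```

In production the same aggregations would typically be SQL queries over the state_transitions table or counters emitted by the transition hook; the logic is identical.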
Alerting Rules
Alerts based on state machine metrics should detect the following conditions:
Alert Rules:
  1. task_in_running_too_long:
     condition: time_in_state(RUNNING) > 30min
     action: page oncall with task_id and last checkpoint
     meaning: Agent may be stuck (infinite loop, unresponsive tool call)
  2. task_retry_flapping:
     condition: retry_count >= 3 AND oscillating RETRYING ↔ RUNNING
     action: escalate alert, human intervention
     meaning: Backoff too short, or the dependency issue is not transient
  3. task_paused_abandoned:
     condition: time_in_state(PAUSED) > 4h
     action: notify approver + oncall
     meaning: Approval may be forgotten; needs reminder or escalation
  4. task_blocked_prolonged:
     condition: time_in_state(BLOCKED) > 2h
     action: notify oncall
     meaning: Dependency service down for extended period, needs human intervention
  5. too_many_failed_tasks:
     condition: failed_count / total_tasks > 0.3 (sliding window)
     action: critical alert
     meaning: Systemic issue, possibly a dependency failure or configuration error
  6. invalid_transition_spike:
     condition: invalid_transition_attempts > 10/min
     action: notify oncall
     meaning: Caller code bug or concurrency race causing event reordering
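The time-based rules (1, 3, and 4) can be evaluated with a simple periodic scan. The following is a minimal sketch, assuming you can obtain each task's current state and the time it entered that state; the check_stuck_tasks function and its input shape are hypothetical, and only the thresholds come from the rules above.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

# Thresholds copied from rules 1, 3, and 4 above.
STATE_LIMITS: Dict[str, Tuple[str, timedelta]] = {
    "running": ("task_in_running_too_long", timedelta(minutes=30)),
    "paused": ("task_paused_abandoned", timedelta(hours=4)),
    "blocked": ("task_blocked_prolonged", timedelta(hours=2)),
}


def check_stuck_tasks(tasks: Dict[str, Tuple[str, datetime]],
                      now: Optional[datetime] = None) -> List[Tuple[str, str]]:
    """Return (alert_name, task_id) for tasks past their state's limit.

    `tasks` maps task_id -> (current_state, time the state was entered).
    """
    now = now or datetime.now(timezone.utc)
    fired = []
    for task_id, (state, entered_at) in tasks.items():
        limit = STATE_LIMITS.get(state)
        if limit and now - entered_at > limit[1]:
            fired.append((limit[0], task_id))
    return fired


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
tasks = {
    "t1": ("running", now - timedelta(minutes=45)),  # over the 30 min limit
    "t2": ("paused", now - timedelta(hours=1)),      # within the 4 h limit
    "t3": ("blocked", now - timedelta(hours=3)),     # over the 2 h limit
}
alerts_fired = check_stuck_tasks(tasks, now=now)
print(alerts_fired)
# [('task_in_running_too_long', 't1'), ('task_blocked_prolonged', 't3')]
```

Passing `now` explicitly keeps the check deterministic in tests; the rate-based rules (2, 5, and 6) need windowed counts and are better expressed in the alerting system itself.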
For a more complete agent observability system (metric collection, dashboard construction, alerting pipeline), see Agent Observability — this article focuses on the observation signals produced by state transitions; that article covers the full agent monitoring infrastructure. On audit log immutability, retention policies, and compliance requirements, see Agent Audit Log Design — the state transition history is one implementation form of an audit log.
Log Level Guidance
Log Levels:
  INFO: Every successful state transition
    Example: "Task abc123: RUNNING → PAUSED (pause_for_approval)"
  WARN: Illegal transition attempt (blocked by state machine)
    Example: "Task abc123: Rejected transition PAUSED + COMPLETE"
    Important: this is a signal of a bug or an attack
  ERROR: State persistence failure
    Example: "Task abc123: Failed to write transition record to database"
    The transition should fail (fail closed), task stays in current state
  DEBUG: State machine internals (transition table lookup, checkpoint read/write)
    Enable only in development
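One way to keep this mapping in a single place is a small helper that picks the level from the outcome of a transition attempt. This is an illustrative sketch: level_for_outcome is a hypothetical name, and the exception class is redefined locally only so the snippet runs on its own.

```python
import logging
import sqlite3
from typing import Optional


class InvalidTransitionError(Exception):
    """Stand-in for the article's exception so the sketch runs on its own."""


def level_for_outcome(error: Optional[BaseException]) -> int:
    """Pick the log level for one transition attempt."""
    if error is None:
        return logging.INFO       # successful transition
    if isinstance(error, InvalidTransitionError):
        return logging.WARNING    # rejected by the transition table
    return logging.ERROR          # persistence failure or anything unexpected


levels = [
    level_for_outcome(None),
    level_for_outcome(InvalidTransitionError("paused + complete")),
    level_for_outcome(sqlite3.OperationalError("database is locked")),
]
print([logging.getLevelName(level) for level in levels])
# ['INFO', 'WARNING', 'ERROR']
```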
8. Complete Example: A Recoverable AgentTaskStateMachine Skeleton in Python
Below is a runnable Python reference implementation demonstrating every concept discussed in this article — state enums, transition table, invalid transition guard, SQLite persistence, checkpointing, recovery strategy, and lifecycle demos. This is not a production-grade library (it lacks connection pooling, migrations, horizontal scaling, etc.) but rather a production-pattern skeleton you can adapt to your own agent.
Complete Code
"""
AgentTaskStateMachine — A Recoverable Agent Task State Machine
==============================================================
This is a production-pattern skeleton, not a deployable library.
Missing: connection pooling, database migrations, horizontal scaling,
security hardening.
Provides: complete 7-state lifecycle, transition validation,
persistence, recovery.
Requirements: Python 3.9+ (standard library only, including sqlite3)
"""
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Dict, List, Any
import json
import sqlite3
import uuid


# ── State Enum ───────────────────────────────────────
class TaskState(str, Enum):
    """Explicit lifecycle states for an agent task."""
    PLANNED = "planned"    # Task created, preconditions not yet validated
    RUNNING = "running"    # Agent actively executing steps
    PAUSED = "paused"      # Awaiting human approval or external input
    BLOCKED = "blocked"    # Cannot proceed: dependency unavailable
    RETRYING = "retrying"  # Retrying after a failed attempt
    DONE = "done"          # Terminal: success
    FAILED = "failed"      # Terminal: unrecoverable failure


# ── Event Enum ───────────────────────────────────────
class TaskEvent(str, Enum):
    """Events that trigger state transitions."""
    START = "start"
    PAUSE_FOR_APPROVAL = "pause_for_approval"
    APPROVAL_GRANTED = "approval_granted"
    APPROVAL_DENIED = "approval_denied"
    BLOCK_ON_DEPENDENCY = "block_on_dependency"
    DEPENDENCY_RESOLVED = "dependency_resolved"
    TRANSIENT_ERROR = "transient_error"
    RETRY = "retry"
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
    COMPLETE = "complete"
    FATAL_ERROR = "fatal_error"
    TIMEOUT = "timeout"


# ── Transition Table ─────────────────────────────────
# Format: (current_state, event) → target_state
# Not in table = illegal → raise InvalidTransitionError
TRANSITION_TABLE: Dict[tuple, TaskState] = {
    (TaskState.PLANNED, TaskEvent.START): TaskState.RUNNING,
    (TaskState.RUNNING, TaskEvent.PAUSE_FOR_APPROVAL): TaskState.PAUSED,
    (TaskState.RUNNING, TaskEvent.BLOCK_ON_DEPENDENCY): TaskState.BLOCKED,
    (TaskState.RUNNING, TaskEvent.COMPLETE): TaskState.DONE,
    (TaskState.RUNNING, TaskEvent.FATAL_ERROR): TaskState.FAILED,
    (TaskState.RUNNING, TaskEvent.TRANSIENT_ERROR): TaskState.RETRYING,
    (TaskState.PAUSED, TaskEvent.APPROVAL_GRANTED): TaskState.RUNNING,
    (TaskState.PAUSED, TaskEvent.APPROVAL_DENIED): TaskState.FAILED,
    (TaskState.PAUSED, TaskEvent.TIMEOUT): TaskState.FAILED,
    (TaskState.BLOCKED, TaskEvent.DEPENDENCY_RESOLVED): TaskState.RUNNING,
    (TaskState.BLOCKED, TaskEvent.FATAL_ERROR): TaskState.FAILED,
    (TaskState.RETRYING, TaskEvent.RETRY): TaskState.RUNNING,
    (TaskState.RETRYING, TaskEvent.MAX_RETRIES_EXCEEDED): TaskState.FAILED,
    (TaskState.RETRYING, TaskEvent.FATAL_ERROR): TaskState.FAILED,
    # DONE and FAILED are terminal: no outgoing edges
}


# ── Data Classes ─────────────────────────────────────
@dataclass(frozen=True)  # frozen: fields cannot be reassigned after creation
class StateTransition:
    """An immutable record of a state change."""
    task_id: str
    from_state: TaskState
    to_state: TaskState
    event: TaskEvent
    timestamp: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class InvalidTransitionError(Exception):
    """Raised when a transition is not in the transition table."""
    pass


# ── State Persistence (SQLite) ───────────────────────
class StatePersistence:
    """SQLite-backed state persistence.
    Production considerations:
    - Multi-instance deployments should use PostgreSQL
    - SQLite is suitable for single-machine demos or single-process agents
    - Use WAL mode to reduce lock contention
    """

    def __init__(self, db_path: str = "agent_state.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("PRAGMA journal_mode=WAL")
        self._init_schema()

    def _init_schema(self):
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS task_checkpoints (
                task_id TEXT PRIMARY KEY,
                state TEXT NOT NULL,
                retry_count INTEGER DEFAULT 0,
                checkpoint_data TEXT DEFAULT '{}',
                updated_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS state_transitions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT NOT NULL,
                from_state TEXT NOT NULL,
                to_state TEXT NOT NULL,
                event TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                metadata TEXT DEFAULT '{}'
            );
            CREATE INDEX IF NOT EXISTS idx_transitions_task
                ON state_transitions(task_id, timestamp);
        """)
        self.conn.commit()

    def save_transition(self, t: StateTransition) -> None:
        """Append one transition to the history table."""
        self.conn.execute(
            "INSERT INTO state_transitions "
            "(task_id, from_state, to_state, event, timestamp, metadata) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (t.task_id, t.from_state.value, t.to_state.value,
             t.event.value, t.timestamp, json.dumps(t.metadata))
        )
        self.conn.commit()

    def save_checkpoint(self, task_id: str, state: TaskState,
                        retry_count: int,
                        checkpoint_data: Optional[dict] = None) -> None:
        """Upsert the latest state, retry count, and checkpoint data."""
        self.conn.execute(
            """INSERT OR REPLACE INTO task_checkpoints
            (task_id, state, retry_count, checkpoint_data, updated_at)
            VALUES (?, ?, ?, ?, ?)""",
            (task_id, state.value, retry_count,
             json.dumps(checkpoint_data or {}),
             datetime.now(timezone.utc).isoformat())
        )
        self.conn.commit()

    def load_checkpoint(self, task_id: str) -> Optional[dict]:
        """Return the saved checkpoint for a task, or None if there is none."""
        row = self.conn.execute(
            "SELECT state, retry_count, checkpoint_data "
            "FROM task_checkpoints WHERE task_id = ?",
            (task_id,)
        ).fetchone()
        if row:
            return {
                "state": row[0],
                "retry_count": row[1],
                "checkpoint_data": json.loads(row[2])
            }
        return None

    def load_history(self, task_id: str) -> List[StateTransition]:
        """Return all transitions for a task in insertion order."""
        rows = self.conn.execute(
            "SELECT task_id, from_state, to_state, event, timestamp, metadata "
            "FROM state_transitions WHERE task_id = ? ORDER BY id",
            (task_id,)
        ).fetchall()
        return [
            StateTransition(
                task_id=r[0],
                from_state=TaskState(r[1]),
                to_state=TaskState(r[2]),
                event=TaskEvent(r[3]),
                timestamp=r[4],
                metadata=json.loads(r[5])
            ) for r in rows
        ]


# ── Core State Machine Class ─────────────────────────
class AgentTaskStateMachine:
    """
    A recoverable state machine for agent tasks.
    Guarantees:
    - Every state transition is validated against the transition table
    - Every transition is persisted (auditable and recoverable)
    - Task state can be recovered after a restart
    """

    def __init__(self, task_id: str, persistence: StatePersistence,
                 max_retries: int = 3):
        self.task_id = task_id
        self.persistence = persistence
        self._state: TaskState = TaskState.PLANNED
        self._history: List[StateTransition] = []
        self._retry_count: int = 0
        self._max_retries: int = max_retries
        self._checkpoint: Dict[str, Any] = {}
        self._on_transition_callbacks: List[callable] = []

    # ── Properties ───────────────────────────────────
    @property
    def state(self) -> TaskState:
        return self._state

    @property
    def retry_count(self) -> int:
        return self._retry_count

    @property
    def history(self) -> List[StateTransition]:
        return list(self._history)

    # ── Observability Hook ───────────────────────────
    def on_transition(self, callback: callable) -> None:
        """Register a transition callback (logging, metrics, alerts)."""
        self._on_transition_callbacks.append(callback)

    def _notify_observers(self, transition: StateTransition) -> None:
        for cb in self._on_transition_callbacks:
            try:
                cb(transition)
            except Exception:
                pass  # Observation failure must not affect the transition

    # ── Core Transition Method ───────────────────────
    def transition(self, event: TaskEvent,
                   metadata: Optional[Dict[str, Any]] = None) -> TaskState:
        """
        Attempt a state transition. Transitions not in the table raise
        InvalidTransitionError.
        """
        key = (self._state, event)
        next_state = TRANSITION_TABLE.get(key)
        if next_state is None:
            raise InvalidTransitionError(
                f"Invalid transition: {self._state.value} + {event.value}"
            )
        # Retry budget check. Validate before touching the counter so that a
        # rejected RETRY leaves the task unchanged.
        retry_count = self._retry_count
        if event == TaskEvent.RETRY:
            if retry_count >= self._max_retries:
                raise InvalidTransitionError(
                    f"Max retries exceeded ({self._max_retries}); "
                    f"use MAX_RETRIES_EXCEEDED event"
                )
            retry_count += 1
        # Build transition record
        transition_record = StateTransition(
            task_id=self.task_id,
            from_state=self._state,
            to_state=next_state,
            event=event,
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata=metadata or {},
        )
        # Persist first (fail closed): if a write raises, the in-memory
        # state below is never updated and the task stays where it was.
        self.persistence.save_transition(transition_record)
        self.persistence.save_checkpoint(
            self.task_id, next_state, retry_count, self._checkpoint
        )
        # Apply the transition in memory
        self._history.append(transition_record)
        self._state = next_state
        self._retry_count = retry_count
        # Notify observers
        self._notify_observers(transition_record)
        return self._state

    # ── Checkpoint Management ────────────────────────
    def set_checkpoint(self, key: str, value: Any) -> None:
        """Store intermediate data for recovery and persist it immediately.

        Values must be JSON-serializable. Persisting here (rather than at the
        next transition) is what lets recovery see an "executing" marker.
        """
        self._checkpoint[key] = value
        self.persistence.save_checkpoint(
            self.task_id, self._state, self._retry_count, self._checkpoint
        )

    def get_checkpoint(self, key: str) -> Optional[Any]:
        """Read checkpoint data."""
        return self._checkpoint.get(key)

    def has_checkpoint(self, key: str) -> bool:
        """Check whether a checkpoint exists."""
        return key in self._checkpoint

    # ── Idempotent Execution Helper ──────────────────
    def execute_idempotent(self, step_name: str,
                           action: callable) -> Any:
        """
        Execute a step idempotently.
        action should accept idempotency_key as a parameter and pass
        it to the external system. If the step already completed
        (checkpoint exists), returns the cached result directly.
        """
        idempotency_key = f"{self.task_id}:{step_name}"
        # Already done? Return cached result
        if self.get_checkpoint(idempotency_key) == "done":
            return self.get_checkpoint(f"{idempotency_key}:result")
        # A leftover "executing" marker means a previous attempt's outcome is
        # unknown. This skeleton re-runs the action and relies on the external
        # system deduplicating by idempotency key.
        self.set_checkpoint(idempotency_key, "executing")
        # If action raises, the marker stays "executing" and the error
        # propagates to the caller.
        result = action(idempotency_key)
        self.set_checkpoint(idempotency_key, "done")
        self.set_checkpoint(f"{idempotency_key}:result", result)
        return result

    # ── Recovery ─────────────────────────────────────
    def recover(self) -> "AgentTaskStateMachine":
        """Restore task state from durable storage."""
        saved = self.persistence.load_checkpoint(self.task_id)
        if saved:
            self._state = TaskState(saved["state"])
            self._retry_count = saved.get("retry_count", 0)
            self._checkpoint = saved.get("checkpoint_data", {})
        self._history = self.persistence.load_history(self.task_id)
        # Handle stale states
        if self._state == TaskState.RUNNING:
            # The process died mid-run: treat it as a transient error
            self.transition(TaskEvent.TRANSIENT_ERROR,
                            metadata={"reason": "recovery_stale_running"})
        elif self._state == TaskState.RETRYING:
            if self._retry_count < self._max_retries:
                self.transition(TaskEvent.RETRY)
            else:
                self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
        elif self._state == TaskState.PAUSED:
            # Expire the approval if its timeout elapsed while we were down
            last_t = self._history[-1] if self._history else None
            if last_t and last_t.metadata.get("timeout_after"):
                elapsed = (datetime.now(timezone.utc) -
                           datetime.fromisoformat(last_t.timestamp))
                if elapsed.total_seconds() > last_t.metadata["timeout_after"]:
                    self.transition(
                        TaskEvent.TIMEOUT,
                        metadata={"reason": "recovery_approval_timeout"})
        elif self._state == TaskState.BLOCKED:
            pass  # Stay blocked, rely on external alerting
        return self

    # ── Diagnostic Method ────────────────────────────
    def summary(self) -> dict:
        """Return a summary of the task's current state."""
        return {
            "task_id": self.task_id,
            "state": self._state.value,
            "retry_count": self._retry_count,
            "transition_count": len(self._history),
            "checkpoint_keys": list(self._checkpoint.keys()),
            "is_terminal": self._state in (TaskState.DONE, TaskState.FAILED),
        }


# ── Usage Demos ──────────────────────────────────────
def demo_full_lifecycle():
    """Demonstrate a full task lifecycle including pause, retry, recovery."""
    print("=" * 60)
    print("AgentTaskStateMachine Lifecycle Demo")
    print("=" * 60)
    persistence = StatePersistence(":memory:")
    task_id = f"task_{uuid.uuid4().hex[:8]}"
    sm = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
    sm.on_transition(lambda t: print(
        f" [observe] {t.from_state.value} → {t.to_state.value} "
        f"({t.event.value})"
    ))
    # 1. planned → running
    sm.transition(TaskEvent.START)
    print(f"[{sm.state.value}] Task started\n")
    # 2. running → paused (human approval needed)
    sm.transition(TaskEvent.PAUSE_FOR_APPROVAL,
                  metadata={"step": "refund_approval", "amount": 150.00})
    print(f"[{sm.state.value}] Awaiting human approval\n")
    # 3. paused → running (approval granted)
    sm.transition(TaskEvent.APPROVAL_GRANTED,
                  metadata={"approver": "[email protected]"})
    print(f"[{sm.state.value}] Approval granted, resuming\n")
    # 4. running → retrying (transient error)
    sm.transition(TaskEvent.TRANSIENT_ERROR,
                  metadata={"error": "rate_limit", "step": "send_notification"})
    print(f"[{sm.state.value}] Transient error, "
          f"retry {sm.retry_count}/{sm._max_retries}\n")
    # 5. retrying → running (retry)
    sm.transition(TaskEvent.RETRY)
    print(f"[{sm.state.value}] Retrying...\n")
    # 6. running → done
    sm.transition(TaskEvent.COMPLETE,
                  metadata={"result": "refund_processed"})
    print(f"[{sm.state.value}] Task completed\n")
    # 7. Test invalid transition
    print("--- Testing Invalid Transition ---")
    try:
        sm.transition(TaskEvent.START)
    except InvalidTransitionError as e:
        print(f"[guard] Invalid transition blocked: {e}\n")
    # 8. View transition history
    print("--- Transition History ---")
    for i, t in enumerate(sm.history, 1):
        print(f" {i}. {t.from_state.value} → {t.to_state.value} "
              f"({t.event.value}) @ {t.timestamp[:19]}")
    print("\n--- Summary ---")
    for k, v in sm.summary().items():
        print(f" {k}: {v}")


def demo_recovery():
    """Demonstrate recovery after a restart."""
    print("\n" + "=" * 60)
    print("Restart Recovery Demo")
    print("=" * 60)
    # Both instances share one persistence object to simulate durable storage
    persistence = StatePersistence(":memory:")
    task_id = "recovered_task_001"
    sm1 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
    sm1.transition(TaskEvent.START)
    sm1.transition(TaskEvent.COMPLETE)
    print(f"Instance 1: Task completed, state = {sm1.state.value}")
    sm2 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
    sm2.recover()
    print(f"Instance 2: Recovered state = {sm2.state.value}")
    assert sm2.state == TaskState.DONE, "Recovery failed!"
    print("✓ Recovery verified")
    print(f" Recovered history entries: {len(sm2.history)}")


def demo_idempotent_execution():
    """Demonstrate idempotent execution."""
    print("\n" + "=" * 60)
    print("Idempotent Execution Demo")
    print("=" * 60)
    persistence = StatePersistence(":memory:")
    sm = AgentTaskStateMachine(task_id="idempotent_demo", persistence=persistence)
    sm.transition(TaskEvent.START)
    call_count = [0]  # list so the nested function can mutate the counter

    def external_call(idempotency_key: str) -> str:
        call_count[0] += 1
        print(f" → external call #{call_count[0]} "
              f"(idempotency_key={idempotency_key})")
        return f"result_{call_count[0]}"

    r1 = sm.execute_idempotent("step_refund", external_call)
    print(f" 1st call: {r1} (external calls: {call_count[0]})")
    r2 = sm.execute_idempotent("step_refund", external_call)
    print(f" 2nd call: {r2} (external calls: {call_count[0]})")
    assert call_count[0] == 1, "Duplicate execution! Should be called once"
    assert r1 == r2, "Results differ!"
    print("✓ Idempotency verified")


if __name__ == "__main__":
    demo_full_lifecycle()
    demo_recovery()
    demo_idempotent_execution()
Sample Output
============================================================
AgentTaskStateMachine Lifecycle Demo
============================================================
 [observe] planned → running (start)
[running] Task started

 [observe] running → paused (pause_for_approval)
[paused] Awaiting human approval

 [observe] paused → running (approval_granted)
[running] Approval granted, resuming

 [observe] running → retrying (transient_error)
[retrying] Transient error, retry 0/3

 [observe] retrying → running (retry)
[running] Retrying...

 [observe] running → done (complete)
[done] Task completed

--- Testing Invalid Transition ---
[guard] Invalid transition blocked: Invalid transition: done + start

--- Transition History ---
 1. planned → running (start) @ <UTC timestamp>
 2. running → paused (pause_for_approval) @ <UTC timestamp>
 3. paused → running (approval_granted) @ <UTC timestamp>
 4. running → retrying (transient_error) @ <UTC timestamp>
 5. retrying → running (retry) @ <UTC timestamp>
 6. running → done (complete) @ <UTC timestamp>

--- Summary ---
 task_id: task_a1b2c3d4
 state: done
 retry_count: 1
 transition_count: 6
 checkpoint_keys: []
 is_terminal: True

============================================================
Restart Recovery Demo
============================================================
Instance 1: Task completed, state = done
Instance 2: Recovered state = done
✓ Recovery verified
 Recovered history entries: 2

============================================================
Idempotent Execution Demo
============================================================
 → external call #1 (idempotency_key=idempotent_demo:step_refund)
 1st call: result_1 (external calls: 1)
 2nd call: result_1 (external calls: 1)
✓ Idempotency verified
Production Hardening Checklist
To use this skeleton in production, the following adaptations are needed:
- PostgreSQL instead of SQLite: Multi-instance agents need concurrent write support. Use connection pooling (e.g., psycopg2.pool), transaction management, and primary-replica replication.
- Database migrations: Use Alembic or similar to manage schema versions. Task checkpoint schemas will evolve, so migration support is essential.
- Connection pooling and transaction safety: Every transition() call must be atomic; if persistence fails, the transition must roll back. The current SQLite implementation uses simple commits; production needs proper transaction management.
- Concurrency control: Multiple processes operating on the same task's state machine need optimistic locking or distributed locks. The simplest approach: use updated_at on task_checkpoints for optimistic concurrency control (compare-and-swap).
- Horizontal scaling: When state storage becomes the bottleneck, shard by task_id. Each shard gets an independent PostgreSQL instance. Transition history can be written asynchronously; checkpoint data must be written synchronously.
- Security hardening: State machine operations need authentication and authorization, because not everyone should be able to change task states. Integrate with your IAM system.
- Configurable retry policies: Extract exponential backoff parameters (base, max_wait, jitter) into configuration. Different task types may need different retry strategies.
- Configurable recovery policies: Not all stale running states should transition to retrying; some tasks should remain in running while awaiting a heartbeat check. Recovery policies should be configurable per task type.
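To make the concurrency-control item concrete, here is a minimal compare-and-swap sketch on the updated_at column, using sqlite3 so it runs anywhere. The cas_update_state helper and the trimmed-down table are assumptions for illustration; the same UPDATE ... WHERE pattern carries over to PostgreSQL.

```python
import sqlite3
from datetime import datetime, timezone


def cas_update_state(conn: sqlite3.Connection, task_id: str,
                     new_state: str, expected_updated_at: str) -> bool:
    """Update the checkpoint only if nobody else wrote since we read it."""
    new_updated_at = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back if the statement raises
        cur = conn.execute(
            "UPDATE task_checkpoints SET state = ?, updated_at = ? "
            "WHERE task_id = ? AND updated_at = ?",
            (new_state, new_updated_at, task_id, expected_updated_at),
        )
    return cur.rowcount == 1  # 0 rows touched means we lost the race


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_checkpoints ("
             "task_id TEXT PRIMARY KEY, state TEXT NOT NULL, "
             "updated_at TEXT NOT NULL)")
conn.execute("INSERT INTO task_checkpoints VALUES (?, ?, ?)",
             ("task_1", "running", "2024-01-01T00:00:00+00:00"))
conn.commit()

# Two workers read the same row, then both try to write.
seen = "2024-01-01T00:00:00+00:00"
first = cas_update_state(conn, "task_1", "paused", seen)   # wins
second = cas_update_state(conn, "task_1", "done", seen)    # stale read, loses
print(first, second)  # True False
```

A caller that gets False should reload the checkpoint and re-validate its event against the new state. An integer version column incremented on every write is a common alternative to comparing timestamps, since it cannot collide when two writes land within the clock's resolution.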
Limitations — An Honest Accounting
What this state machine does:
- Guarantees that within the agent, state transitions are deterministic and recoverable.
- Prevents the agent from initiating duplicate calls for the same operation (via checkpoints and idempotency keys).
- Provides a complete audit trail for every task.
What it does not do:
- Make LLM output deterministic: the LLM within the running state can still produce different reasoning results.
- Guarantee exactly-once external side effects: if the external API does not support idempotency keys, duplicate calls can still produce duplicate effects.
- Provide distributed transactions: the state machine's internal state and the external system's state are not atomically consistent.
- Replace orchestration engines like Temporal or LangGraph: they solve a different problem (external workflow orchestration vs. agent-internal task lifecycle).
FAQ
How is this different from Temporal or AWS Step Functions? Do I still need an orchestrator?
This state machine is inside the agent — it is the agent's own understanding of where it is in a task. Temporal and Step Functions are external orchestrators that tell workers what to do. They are complementary: the agent's internal state machine ensures it doesn't skip steps or duplicate work; an external orchestrator (if you use one) ensures the agent gets invoked at the right time. This article demonstrates the pattern of building a state machine into the agent; whether you add an outer orchestration layer depends on your needs.
My agent already uses LangGraph's checkpointing. Why add an explicit state machine?
LangGraph checkpoints the graph traversal state (which node, which edge condition), not the task lifecycle. If your task needs to distinguish "paused for approval" from "blocked on dependency" from "retrying after error," you need explicit lifecycle state. The good news: LangGraph's checkpoint is an excellent persistence backend — you can build this article's 7-state machine on top of LangGraph's checkpoint storage.
How does the state machine prevent duplicate side effects?
Through the dual protection of idempotency keys + checkpoints. Before any side-effecting operation: (1) check whether the checkpoint already contains that operation's idempotency key with status "done" — if so, return the cached result without executing; (2) if not, first write the idempotency key (status "executing"), then execute the operation, then update to "done" on success. If the process crashes while in "executing" status, recovery sees "executing" and knows that attempt's outcome is uncertain — it should query the external system rather than blindly retrying. See Section 5 for details.
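The "executing" case is the subtle one, so here is a minimal sketch of that decision logic. It assumes the external system offers some way to look up an operation by idempotency key; the resume_step function and its action and lookup hooks are hypothetical, and a plain dict stands in for the persisted checkpoint.

```python
from typing import Any, Callable, Dict, Optional


def resume_step(checkpoint: Dict[str, Any], key: str,
                action: Callable[[str], Any],
                lookup: Callable[[str], Optional[Any]]) -> Any:
    """Run or resume one side-effecting step identified by `key`.

    `action(key)` performs the call; `lookup(key)` asks the external system
    whether an operation with this idempotency key already went through and
    returns its result, or None if it did not. Both are hypothetical hooks.
    """
    status = checkpoint.get(key)
    if status == "done":
        return checkpoint[f"{key}:result"]
    if status == "executing":
        # Outcome of the previous attempt is unknown: ask before retrying.
        found = lookup(key)
        if found is not None:
            checkpoint[key] = "done"
            checkpoint[f"{key}:result"] = found
            return found
    checkpoint[key] = "executing"      # persist this before calling out
    result = action(key)
    checkpoint[key] = "done"
    checkpoint[f"{key}:result"] = result
    return result


calls = []


def issue_refund(key: str) -> str:
    calls.append(key)
    return "refund_new"


# Simulated crash: the checkpoint says "executing", and the gateway
# reports that the refund did in fact go through.
checkpoint = {"task_1:refund": "executing"}
result = resume_step(checkpoint, "task_1:refund", issue_refund,
                     lookup=lambda key: "refund_789")
print(result, calls)  # refund_789 []
```

If the external system has no lookup endpoint, the safe fallback is to pause the task for human review rather than call the action again.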
What happens when the LLM context is gone after a restart?
The state machine does not recover the full conversation history — it only recovers task state and key checkpoint data. After a restart: (1) read from the state machine "which step the task is on, what the previous results were"; (2) use this information to construct a new context summary (not the full conversation history); (3) inject this summary into the LLM's fresh context window so the LLM can continue reasoning from that point. The conversation history is lost, but task progress is not. On how to rebuild working context from checkpoints, see Agent Context Window Management, specifically the cross-window state management section.
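Steps (1) to (3) might look roughly like the sketch below, which renders recovered state into a short text block for the fresh prompt. The build_context_summary function, the summary wording, and the key layout ("task:step" plus "task:step:result") are illustrative assumptions modeled on the idempotency keys used in this article.

```python
from typing import Any, Dict, List


def build_context_summary(task_id: str, state: str,
                          checkpoint: Dict[str, Any],
                          history: List[str]) -> str:
    """Render recovered task state as a short text block for a fresh prompt."""
    lines = [f"Task {task_id} is currently in state '{state}'."]
    # Only steps whose marker is "done" are reported as completed.
    done_steps = sorted(
        key for key, value in checkpoint.items()
        if value == "done" and not key.endswith(":result")
    )
    for step in done_steps:
        result = checkpoint.get(f"{step}:result")
        lines.append(f"- Completed step {step} with result: {result}")
    if history:
        lines.append("Recent transitions: " + ", ".join(history[-3:]))
    lines.append("Continue from the first step that is not listed as completed.")
    return "\n".join(lines)


summary = build_context_summary(
    task_id="task_1",
    state="running",
    checkpoint={
        "task_1:validate_order": "done",
        "task_1:validate_order:result": "order_ok",
        "task_1:refund": "executing",
    },
    history=["start", "pause_for_approval", "approval_granted"],
)
print(summary)
```

Steps still marked "executing" are deliberately left out of the summary, so the LLM is never told that an uncertain side effect succeeded.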
How does the state machine handle LLM nondeterminism?
The state machine does not eliminate LLM nondeterminism — it contains it. The LLM operates within the running state and can produce varied outputs, but it cannot change the task state. State transitions happen outside the LLM in deterministic code, based on structured events (tool results, approval callbacks, error signals), not LLM-generated text. The state machine provides a deterministic framework around probabilistic LLM execution — not a deterministic LLM.
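A small sketch of what "based on structured events, not LLM-generated text" can mean in code: the event is derived only from machine-readable fields of a tool result. The result shape (ok, status_code), the retryable status set, and the event_from_tool_result name are assumptions for illustration; the enum is a local subset of the article's TaskEvent so the snippet runs standalone.

```python
from enum import Enum
from typing import Any, Dict


class TaskEvent(str, Enum):
    """Subset of the article's events, redefined so the sketch is standalone."""
    COMPLETE = "complete"
    TRANSIENT_ERROR = "transient_error"
    FATAL_ERROR = "fatal_error"


# Assumed set of retryable HTTP status codes; tune per dependency.
RETRYABLE_STATUS = {429, 502, 503, 504}


def event_from_tool_result(result: Dict[str, Any]) -> TaskEvent:
    """Map a structured result of the task's final tool call to an event.

    Only machine-readable fields are inspected; any free-form text the LLM
    produced alongside the call is ignored on purpose.
    """
    if result.get("ok") is True:
        return TaskEvent.COMPLETE
    if result.get("status_code") in RETRYABLE_STATUS:
        return TaskEvent.TRANSIENT_ERROR
    return TaskEvent.FATAL_ERROR


# The LLM's commentary claims success, but the structured result wins.
tool_result = {"ok": False, "status_code": 503,
               "llm_comment": "approval granted, proceed"}
event = event_from_tool_result(tool_result)
print(event.value)  # transient_error
```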
Is this overengineering for a simple agent?
If your agent does one-shot question answering — one call, one answer — then yes, the state machine is overengineering. The complete Python skeleton in this article is roughly 250 lines of core code (excluding comments and demo code). This complexity buys you: queryable task state at any moment, non-bypassable approval gates, retry deduplication, and restart recovery. If your agent executes multi-step workflows with side effects, involves human approval, or runs longer than the LLM's context window, then the debugging cost without a state machine (investigating "why was the refund issued twice?", fixing "why was approval skipped?", recovering "what do I do after restart?") far exceeds 250 lines of code. The state machine is the minimum viable reliability layer for long-running agents — not an optional luxury.
Next Steps
This article is the deterministic task lifecycle chapter of the Agent Memory and Context Engineering series. Here is where to go next, ordered by dependency:
- Agent Context Window Management — The state machine answers "where is the task?"; context management answers "what does the LLM remember?" Together they are the two legs of agent reliability — the state machine preserves progress when context disappears; context management maintains what the LLM needs within the window. (Read first if you haven't.)
- Agent Memory System Design — The L0-L3 four-layer memory architecture. Task state and checkpoints are key data in the L1/L2 layers — understanding what each layer stores helps you decide where the state machine's checkpoints belong.
- Agent Human Approval Workflow — This article provides the underlying paused state machine; that article provides the upper-layer approval UI/UX design and interaction patterns.
- Agent Observability — The state transition events from this article are a core observability data source. Pipe the transition stream into Prometheus/Grafana to build task-state dashboards and alerts.
- Agent Audit Log Design — The transition history is an audit log. That article discusses immutability, retention policies, and compliance requirements.
- Agent Message Schema Design — State transition event schemas should follow the same type-safety, version-evolution, and validation principles as agent message schemas.