Agent State Machine Design: Turning Uncontrolled Conversations into Recoverable Workflows

⚡ 30-Second Takeaway

  • Core Problem: Conversation-driven agents skip steps, duplicate side effects, and lose all progress after restarts because conversation history is not task state. LLMs are probabilistic — without a deterministic wrapper, execution drifts unpredictably. You need an explicit task lifecycle state machine wrapping every agent execution.
  • The Solution: A 7-state explicit task lifecycle — planned, running, paused, blocked, retrying, done, failed. Each state has defined entry conditions, allowed events, and legal transition targets. Invalid transitions are rejected, not best-effort.
  • Key Implementation: AgentTaskStateMachine core class (~250 lines of Python) — TaskState/TaskEvent enums, TRANSITION_TABLE, InvalidTransitionError guard, SQLite persistence, checkpoint/idempotency-key support, recovery strategy. Includes full lifecycle demo and restart recovery verification.
  • What You Will Walk Away With: A deterministic task lifecycle wrapper for your agent — at every moment, the task knows what it is, what state it is in, what event can move it forward, and how to recover after a restart. No more "the agent just disappeared" production nightmares.

1. Why Production Agents Need Explicit State Machines

It is 2 AM. An automated refund-approval agent is processing a refund. The workflow is simple: validate the order, check the refund policy, request manager approval, call the payment gateway to issue the refund, send a notification. At step 3, after sending the approval request, the payment gateway times out. The agent's conversation history contains the text "approval granted, proceed" — so it retries, skips the approval check, reads "approved" from a step 3 result cache, and executes step 4: the refund. Three seconds later, the manager rejects the refund. But the money has already been sent.

Later that day, the same agent is processing another refund when the server is killed by the OOM Killer. The agent process restarts. The conversation history is gone and the LLM context window is empty. The agent starts from scratch: validate order, check policy, request approval... but it has no idea this refund was already processed. It issues a second refund.

These are the three fundamental failure modes of conversation-driven agents:

  1. Context Drift → Step Skip: The agent's "state" is implicit in conversation history. When the history grows long, gets compressed, truncated, or diluted by attention decay, the agent may skip critical steps. "Approval granted" and "approval requested, awaiting response" can look frustratingly similar in conversational text — and the LLM cannot reliably distinguish them.
  2. Retry → Duplicate Side Effects: The agent encounters a network timeout and retries, not knowing the first call actually succeeded (the response was lost in transit). Without explicit state records, there is no basis for deduplication.
  3. Restart → Total Progress Loss: Conversation history does not live in durable storage, so it does not survive a restart. The agent starts from zero, unaware of which steps were already completed.
📌 Key Insight: LLMs are probabilistic — the same input can produce different outputs; the same conversation history can be interpreted as a different "current state." You need a deterministic shell around LLM execution. That shell is the state machine — it does not rely on the LLM to determine "what should I do now?" Instead, it applies explicit transition rules to decide. The LLM reasons freely within the running state, but it cannot change the task state. State transitions happen outside the LLM, driven by structured events (tool results, approval callbacks, error signals) — not LLM-generated text.

Conversation-Driven vs. State-Machine-Driven Agents

You do not need to read framework source code to see the difference. Here is the contrast:


  Conversation-Driven Agent (state implicit in chat history):
  ┌─────────────────────────────────────────────┐
  │ User: Process refund #12345                 │
  │ Agent: Sure, let me validate the order...   │
  │ Agent: Order valid, amount $150.00          │
  │ Agent: Manager approval required, sent      │
  │ Agent: [waiting... chat says "sent request"]│
  │ --- History truncated/compressed/misread --- │
  │ Agent: Approved! Processing refund... WRONG!│
  └─────────────────────────────────────────────┘

  State-Machine-Driven Agent (state explicit and validated):
  ┌─────────────────────────────────────────────┐
  │ task.state = PLANNED                        │
  │   → START → task.state = RUNNING            │
  │   → PAUSE_FOR_APPROVAL → task.state = PAUSED│
  │   → [wait for approval callback...]         │
  │   → callback arrives → validate: PAUSED +    │
  │     APPROVAL_GRANTED → legal → RUNNING       │
  │   → COMPLETE → task.state = DONE            │
  │                                              │
  │ At every moment: task.state tells you exactly│
  │ where the task is. Context lost? Recover     │
  │ task.state from durable storage.             │
  │ Process restarted? task.state is in SQLite.  │
  └─────────────────────────────────────────────┘
  

The core difference: a conversation-driven agent's "state" is a semantic position somewhere in 200 chat messages. A state-machine-driven agent's state is a single deterministic value in a database. The former relies on the LLM to infer; the latter is a durable, verifiable fact that changes only through validated transitions.

On why context windows are inherently unreliable, see Agent Context Window Management: context decay is an inherent property of LLM context windows, not a bug. The state machine is precisely the mechanism that survives context loss and preserves task progress.

2. From Conversation Flow to Task Lifecycle: States, Events, and Transitions

Before diving into specific states, we need precise definitions for the three concepts that constitute the state machine's entire semantics:

State
The determinate position of a task at a given moment. State answers: "Where is this task right now?" A task has exactly one current state at any moment. State is explicit, durable, and queryable.
Event
An external or internal signal that triggers a state change. Event answers: "What happened that requires the task to react?" Events can be tool call results, human approval callbacks, timeout signals, error codes — but not LLM text output. Events are structured, named, and auditable.
Transition
A directed edge from state A to state B, triggered by an event. Transition answers: "Given the current state and the event that occurred, which state should the task go to?" Transition rules are encoded in a transition table and enforced on every attempt. Disallowed transitions raise errors — they do not silently degrade.

The Transition Table: The State Machine's Core Data Structure

The entire behavior of the state machine can be compressed into a single transition table — a mapping from (current_state, event) to target_state. This table serves as both documentation (all legal paths in one view) and execution rules (looked up on every transition attempt).


  TRANSITION_TABLE = {
      (PLANNED,  START):                 RUNNING,
      (RUNNING,  PAUSE_FOR_APPROVAL):    PAUSED,
      (RUNNING,  BLOCK_ON_DEPENDENCY):   BLOCKED,
      (RUNNING,  COMPLETE):              DONE,
      (RUNNING,  FATAL_ERROR):           FAILED,
      (RUNNING,  TRANSIENT_ERROR):       RETRYING,
      (PAUSED,   APPROVAL_GRANTED):      RUNNING,
      (PAUSED,   APPROVAL_DENIED):       FAILED,
      (PAUSED,   TIMEOUT):               FAILED,
      (BLOCKED,  DEPENDENCY_RESOLVED):   RUNNING,
      (BLOCKED,  FATAL_ERROR):           FAILED,
      (RETRYING, RETRY):                 RUNNING,
      (RETRYING, MAX_RETRIES_EXCEEDED):  FAILED,
      (RETRYING, FATAL_ERROR):           FAILED,
      # DONE and FAILED are terminal: no outgoing edges
  }
  

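Note what is absent from this table:

  • No edge out of PLANNED other than START: a task cannot COMPLETE, or fail with FATAL_ERROR, before it has started.
  • No (PAUSED, COMPLETE): a task waiting for approval cannot jump straight to DONE.
  • No edge from PAUSED or BLOCKED to RETRYING: only RUNNING can emit TRANSIENT_ERROR.
  • No outgoing edges from DONE or FAILED: terminal states are irreversible.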

These "missing transitions" are not bugs — they are design constraints. The transition table enforces flow control through "what is not listed is illegal" — cleaner and harder to bypass than scattered if-else checks.
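One way to make the "what is not listed is illegal" rule concrete is to derive the set of legal events for each state directly from the table. The sketch below restates the table with explicit enums; the enum string values and the helper name `allowed_events` are illustrative assumptions, not taken from the article's class.

```python
from enum import Enum


class TaskState(Enum):
    PLANNED = "planned"
    RUNNING = "running"
    PAUSED = "paused"
    BLOCKED = "blocked"
    RETRYING = "retrying"
    DONE = "done"
    FAILED = "failed"


class TaskEvent(Enum):
    START = "start"
    PAUSE_FOR_APPROVAL = "pause_for_approval"
    BLOCK_ON_DEPENDENCY = "block_on_dependency"
    COMPLETE = "complete"
    FATAL_ERROR = "fatal_error"
    TRANSIENT_ERROR = "transient_error"
    APPROVAL_GRANTED = "approval_granted"
    APPROVAL_DENIED = "approval_denied"
    TIMEOUT = "timeout"
    DEPENDENCY_RESOLVED = "dependency_resolved"
    RETRY = "retry"
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"


S, E = TaskState, TaskEvent
TRANSITION_TABLE = {
    (S.PLANNED, E.START): S.RUNNING,
    (S.RUNNING, E.PAUSE_FOR_APPROVAL): S.PAUSED,
    (S.RUNNING, E.BLOCK_ON_DEPENDENCY): S.BLOCKED,
    (S.RUNNING, E.COMPLETE): S.DONE,
    (S.RUNNING, E.FATAL_ERROR): S.FAILED,
    (S.RUNNING, E.TRANSIENT_ERROR): S.RETRYING,
    (S.PAUSED, E.APPROVAL_GRANTED): S.RUNNING,
    (S.PAUSED, E.APPROVAL_DENIED): S.FAILED,
    (S.PAUSED, E.TIMEOUT): S.FAILED,
    (S.BLOCKED, E.DEPENDENCY_RESOLVED): S.RUNNING,
    (S.BLOCKED, E.FATAL_ERROR): S.FAILED,
    (S.RETRYING, E.RETRY): S.RUNNING,
    (S.RETRYING, E.MAX_RETRIES_EXCEEDED): S.FAILED,
    (S.RETRYING, E.FATAL_ERROR): S.FAILED,
    # DONE and FAILED are terminal: no outgoing edges
}


def allowed_events(state):
    """Return the events that are legal in `state`, derived from the table."""
    return {event for (src, event) in TRANSITION_TABLE if src == state}
```

Because the legal events are computed from the table rather than written down separately, documentation and enforcement cannot drift apart: a terminal state simply yields an empty set.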

State Diagram: The Seven States at a Glance


                      ┌─────────┐
                      │ PLANNED │
                      └────┬────┘
                           │ START
                           ▼
            ┌──────────────┴──────────────────┐
            │            RUNNING              │◄──────────────────────┐
            └──┬──────┬──────┬──────┬────────┘                       │
               │      │      │      │                                 │
    PAUSE_FOR  │      │      │      │ COMPLETE                        │
    _APPROVAL  │      │      │      │                                 │
               ▼      ▼      ▼      ▼                                 │
          ┌────────┐ ┌───────┐┌─────────┐┌──────┐                     │
          │ PAUSED │ │BLOCKED││RETRYING ││ DONE │                     │
          └───┬──┬─┘ └───┬───┘└────┬────┘└──────┘                     │
              │  │        │         │                                  │
     APPROVAL │  │TIMEOUT │DEPENDENCY  │ RETRY                        │
     _GRANTED │  │        │_RESOLVED   └──────────────────────────────┘
              │  │        │
              │  ▼        ▼
              │ ┌──────────┐
              │ │  FAILED  │◄── MAX_RETRIES_EXCEEDED, FATAL_ERROR,
              │ └──────────┘    APPROVAL_DENIED
              │
              └──────────────────────────────────────────────────────► RUNNING
  

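The transition table defines 14 legal transitions; the diagram sketches the main paths between the seven states. Note the key structural features:

  • RUNNING is the hub: PAUSED, BLOCKED, and RETRYING each return to RUNNING, and RUNNING has five outgoing edges.
  • DONE is reachable only from RUNNING, via COMPLETE.
  • Every waiting state (PAUSED, BLOCKED, RETRYING) also has an edge to FAILED, so no task is trapped without an exit.
  • DONE and FAILED are terminal: they have no outgoing edges.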

Invalid Transitions Are Errors, Not Degraded Behavior

One of the most important design decisions: any transition attempt not in the transition table must raise an exception. It must not be silently ignored or degraded.


  def transition(self, event: TaskEvent, metadata=None) -> TaskState:
      key = (self._state, event)
      next_state = TRANSITION_TABLE.get(key)
      if next_state is None:
          raise InvalidTransitionError(
              f"Invalid transition: {self._state.value} + {event.value}"
          )
      # ... execute transition ...
  

Why an exception and not a warning log? Because the code calling transition() believes the state has changed. If the call is silently ignored, the caller continues executing based on a false state assumption. The exception forces the caller to confront the situation: either fix a bug (why did this illegal event appear?) or handle a boundary condition (e.g., event reordering due to concurrency races).
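To see the guard in action, here is a minimal runnable sketch. It uses a reduced four-state table to stay short, and the names `MiniStateMachine` and `try_complete` are hypothetical stand-ins, not the article's AgentTaskStateMachine.

```python
from enum import Enum


class TaskState(Enum):
    PLANNED = "planned"
    RUNNING = "running"
    PAUSED = "paused"
    DONE = "done"


class TaskEvent(Enum):
    START = "start"
    PAUSE_FOR_APPROVAL = "pause_for_approval"
    APPROVAL_GRANTED = "approval_granted"
    COMPLETE = "complete"


# Reduced table for illustration; the full table has 14 entries.
TRANSITION_TABLE = {
    (TaskState.PLANNED, TaskEvent.START): TaskState.RUNNING,
    (TaskState.RUNNING, TaskEvent.PAUSE_FOR_APPROVAL): TaskState.PAUSED,
    (TaskState.RUNNING, TaskEvent.COMPLETE): TaskState.DONE,
    (TaskState.PAUSED, TaskEvent.APPROVAL_GRANTED): TaskState.RUNNING,
}


class InvalidTransitionError(Exception):
    """Raised when (current_state, event) is not in the transition table."""


class MiniStateMachine:
    def __init__(self):
        self.state = TaskState.PLANNED

    def transition(self, event):
        next_state = TRANSITION_TABLE.get((self.state, event))
        if next_state is None:
            raise InvalidTransitionError(
                f"Invalid transition: {self.state.value} + {event.value}"
            )
        self.state = next_state
        return next_state


def try_complete(sm):
    """Caller-side handling: True only if the task really reached DONE."""
    try:
        sm.transition(TaskEvent.COMPLETE)
        return True
    except InvalidTransitionError:
        return False  # state is unchanged, so the caller cannot assume DONE
```

A caller that attempts COMPLETE while the task is PAUSED gets `False` back and the state stays PAUSED, which is exactly the outcome a silent no-op would have hidden.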

On why structured event typing and versioning matters, see Agent Message Schema Design — state transition events and agent messages share the same principles of type safety, version evolution, and validation.

3. Core State Design: planned, running, paused, blocked, retrying, done, failed

Seven states cover the complete lifecycle of an agent task from creation to termination. Each state has clear semantics, entry conditions, allowed operations, and detection methods.

planned

Meaning: Task created, preconditions not yet validated. This is the initial state. In this state, the agent performs no side effects — it is merely preparing. It can validate parameters, allocate resources, and check whether dependent services are reachable.

Allowed events: Only START. Any other event (including COMPLETE, FATAL_ERROR) is illegal in planned.

Why this state exists: It distinguishes "task created" from "task started executing." In distributed systems, a task can be created but never scheduled (scheduler down, queue backlogged). planned lets you monitor "how many tasks are queued but not yet started" — a key metric for capacity planning.

running

Meaning: The agent is actively executing task steps. This is the only state in which the LLM can freely reason. Within this state, the agent calls tools, analyzes results, and generates intermediate outputs — but it cannot change its own task state. State transitions are triggered by structured events (specific tool results, external approval callbacks, error signals), not by LLM text output.

Allowed events: PAUSE_FOR_APPROVAL, BLOCK_ON_DEPENDENCY, COMPLETE, FATAL_ERROR, TRANSIENT_ERROR. With five outgoing edges, running has the most transitions of any state.

Critical constraint: The LLM can produce any output inside running — but it cannot say "the task is done." Only the COMPLETE event (generated by tool calls or evaluation logic) triggers the transition to done. This is the core manifestation of the state machine's "deterministic shell."

paused

Meaning: The agent is waiting for external input, typically human approval, human feedback, or a safety review. This is voluntary waiting. The agent process can stay alive (monitoring timeouts, sending reminders) but cannot advance the task itself.

Allowed events: APPROVAL_GRANTED (back to running), APPROVAL_DENIED (to failed), TIMEOUT (to failed).

Key distinction from blocked: paused is active waiting — "I need a human's decision." blocked is passive obstruction — "the service I need is unavailable." The former is a normal part of the workflow design; the latter is an anomaly.

blocked

Meaning: The agent cannot proceed because it encountered an unresolvable dependency — a dependent API returns 503, a required file does not exist, permissions are insufficient to access a resource. This is involuntary obstruction.

Allowed events: DEPENDENCY_RESOLVED (back to running), FATAL_ERROR (to failed).

Key distinction from paused: blocked does not auto-timeout into failed — dependency recovery time is unpredictable. But external monitoring should alert: "this task has been blocked for 4 hours, human intervention needed." paused, by contrast, has an explicit timeout policy — approvals cannot wait indefinitely.

retrying

Meaning: The agent encountered a transient error and is re-attempting. This is the buffer state between running and failed.

Allowed events: RETRY (back to running), MAX_RETRIES_EXCEEDED (to failed), FATAL_ERROR (to failed).

Key distinction from running: retrying means the previous attempt failed. This distinction matters because: (1) retries require idempotency guarantees — already-succeeded steps must not be re-executed; (2) retries have an upper bound — infinite retry is an infinite loop; (3) monitoring must distinguish "normal operation" from "currently retrying" — these have entirely different operational semantics.

done — Terminal

Meaning: Task completed successfully. All steps executed, all side effects committed, all notifications sent.

Allowed events: None. Any event in done is illegal. Terminal states are irreversible.

failed — Terminal

Meaning: Task cannot be completed. May be due to rejected approval, exhausted retries, fatal error, permanent dependency failure, or pause timeout.

Allowed events: None. Like done, this is an irreversible terminal state.

But note: failed does not mean "data lost" or "untraceable." The failed task's state history, checkpoint data, and error information all remain in durable storage. An operator can extract completed-step results from a failed task, manually handle the remaining work, or even create a new task instance to continue from the last checkpoint — but that new instance is a new planned task, not a state transition of the old one.

State Accounting: A Task's Lifecycle by the Numbers

Another way to understand the seven states is through a typical task's "state residency":


  State      Residency            Description
  ─────────────────────────────────────────────────────
  planned    milliseconds~seconds  Validating preconditions
  running    seconds~minutes       Agent executing task steps
  paused     minutes~hours         Awaiting human approval
  blocked    minutes~hours         Awaiting dependency recovery
  retrying   seconds~minutes       Backoff retry window
  done       permanent             Terminal
  failed     permanent             Terminal
  

Note that running may account for far less time than you expect — in production, agents spend most of their time waiting (paused/blocked), not computing (running). This means the state machine's primary value is not controlling behavior inside running, but managing the semantics and recovery of waiting periods.

4. Pause and Resume: Encoding Human Approval, Feedback, and Safety Gates

Many agent frameworks treat human approval as a special tool call or a chat message: the agent sends a message saying "please approve" and then continues executing. This is dangerous. The agent may skip waiting based on conversation context that "looks like" approval before the human actually responds.

The state machine elevates human approval to a first-class state: paused. Once an agent enters paused, it cannot re-enter running without an explicit APPROVAL_GRANTED event; APPROVAL_DENIED and TIMEOUT lead to failed instead. The LLM cannot bypass this by generating text that "looks like approval," because state transitions do not pass through the LLM.

Three Paths Through the Approval Gate


  Path A: Approved (normal flow)
  RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_GRANTED → RUNNING

  Path B: Denied (terminal)
  RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_DENIED → FAILED

  Path C: Timeout (protective termination)
  RUNNING → PAUSE_FOR_APPROVAL → PAUSED → TIMEOUT → FAILED
  

Each path is non-bypassable. The agent cannot jump from paused directly to done; it cannot "keep working" while in paused (in paused, you cannot execute task steps — you can only wait for events).

Timeout Handling: Approval Cannot Wait Forever

The paused state has an explicit timeout policy. A typical configuration:


  Approval Timeout Policy:
    Initial wait: 30 minutes
    Reminder: at 15 minutes, send reminder (Slack/email/PagerDuty)
    Timeout action: mark FAILED, reason: "approval_timeout"
    Grace period: 5 minutes post-timeout — if approval arrives,
                  operator can manually recover from FAILED
                  (create a new task instance from checkpoint)
  

Note that this does not auto-retry — approval is a human decision, not a network timeout. A person's decision does not automatically become "approved" because more time elapsed. Terminating on timeout is the safe default.
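The timeout policy above can be sketched as a pure decision function that a periodic monitor calls for each PAUSED task. The function name `approval_action` and its return labels are assumptions made for illustration; only the 30-minute and 15-minute thresholds come from the policy.

```python
from datetime import datetime, timedelta, timezone

APPROVAL_TIMEOUT = timedelta(minutes=30)  # "Initial wait" in the policy
REMINDER_AFTER = timedelta(minutes=15)    # "Reminder" in the policy


def approval_action(paused_at, now, reminder_sent):
    """Decide what the timeout monitor should do for one PAUSED task.

    Returns "timeout", "remind", or "wait". The monitor only emits events;
    it never approves on the human's behalf.
    """
    waited = now - paused_at
    if waited >= APPROVAL_TIMEOUT:
        return "timeout"  # caller fires TIMEOUT -> FAILED (approval_timeout)
    if waited >= REMINDER_AFTER and not reminder_sent:
        return "remind"   # caller sends the Slack/email/PagerDuty reminder
    return "wait"


paused_at = datetime(2026, 6, 5, 14, 0, tzinfo=timezone.utc)
print(approval_action(paused_at, paused_at + timedelta(minutes=20),
                      reminder_sent=False))  # remind
```

Keeping the decision free of I/O makes the policy easy to test with fixed timestamps; the caller remains responsible for sending the reminder or firing the TIMEOUT event.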

Safety Gates: More Than Just Approval

The paused state is not limited to human approval. Any scenario requiring "pause and verify before executing side effects" can use it.

Integration Pattern with External Approval Systems

The state machine does not bind to a specific approval UI. It defines only the event interface. Here is the integration pattern:


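  # Webhook receiving approval callbacks (Flask)
  from flask import Flask, jsonify, request

  app = Flask(__name__)

  @app.route("/approval-callback", methods=["POST"])
  def handle_approval():
      data = request.get_json()
      task_id = data["task_id"]
      decision = data["decision"]  # "approved" or "denied"
      if decision not in ("approved", "denied"):
          return jsonify(error="unknown decision"), 400

      sm = load_state_machine(task_id)  # recover from durable storage
      event = (TaskEvent.APPROVAL_GRANTED if decision == "approved"
               else TaskEvent.APPROVAL_DENIED)
      try:
          sm.transition(event, metadata={
              "approver": data["approver"],
              "comment": data.get("comment", ""),
              "timestamp": data["timestamp"],
          })
      except InvalidTransitionError:
          # Task is no longer PAUSED (duplicate or late callback)
          return jsonify(error="task is not awaiting approval"), 409
      return jsonify(task_id=task_id, state=sm.state.value), 200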
  

The approval source can be a Slack button, a Jira status change, a custom web panel, or an enterprise messaging system — as long as it can send an HTTP request with a task_id and decision, it can drive the state machine.

For approval workflow UI/UX design patterns and more complete integration schemes, see Agent Human Approval Workflow — this article provides the underlying state machine; that article provides the upper-layer interaction design.

To repeat: paused is not failure. Many teams alert whenever an agent is in paused, which misreads the state machine's semantics. paused is a normal, designed, expected waiting state. Your monitoring dashboard should display paused and blocked separately, not lump them together as "not running."

5. Failure and Retry: Preventing Duplicate Execution, Step Skips, and Infinite Loops

Retry is one of the most error-prone operations in distributed systems, not because retry itself is complex, but because the state assumptions during retry are frequently wrong. The state machine's retry design revolves around three guarantees:

  1. No Duplicate Effects: A retry must not re-execute steps that already succeeded.
  2. No Step Skips: A retry must not leap past incomplete steps to later ones.
  3. Bounded Retries: Retries have an upper count limit and a backoff strategy — no infinite loops.

The Retry State Machine: A Complete Retry Lifecycle


  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=0)
  RETRYING → [backoff wait 2^0 = 1s] → RETRY → RUNNING (retry_count=1)
  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=1)
  RETRYING → [backoff wait 2^1 = 2s] → RETRY → RUNNING (retry_count=2)
  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=2)
  RETRYING → [backoff wait 2^2 = 4s] → RETRY → RUNNING (retry_count=3)
  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=3)
  RETRYING → MAX_RETRIES_EXCEEDED → FAILED
  

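Note the key design details:

  • retry_count increments on each RETRY transition, not on each error, so it counts the re-attempts actually started.
  • The backoff doubles with each retry: 1s, 2s, 4s.
  • RETRYING always exits through an explicit event: RETRY back to RUNNING, or MAX_RETRIES_EXCEEDED to FAILED once three retries have been used.
  • A failed attempt never loops from RUNNING straight back to RUNNING: every retry passes through RETRYING, so each one appears in the transition history.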

Retry Backoff Strategy


  import time

  def retry_with_backoff(sm: AgentTaskStateMachine, max_retries=3):
      """Leave RETRYING: back off and RETRY, or fail once retries are exhausted."""
      if sm.state != TaskState.RETRYING:
          return  # nothing to retry

      if sm.retry_count >= max_retries:
          # Retry budget spent: (RETRYING, MAX_RETRIES_EXCEEDED) -> FAILED
          sm.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
          raise RuntimeError(f"task {sm.task_id} failed after {max_retries} retries")

      backoff = 2 ** sm.retry_count  # exponential: 1s, 2s, 4s, ...
      print(f"[retry] backing off {backoff}s (retry_count={sm.retry_count})")
      time.sleep(backoff)

      # Back to RUNNING; the caller resumes execution. Per the lifecycle above,
      # the state machine increments retry_count on this transition.
      sm.transition(TaskEvent.RETRY)
  

Idempotency: The State Machine Prevents Duplicate Triggers, Not External Duplicates

This is one of the article's most important honest caveats:

📌 The state machine guarantees the agent will not initiate two calls for the same step. It does not guarantee the external system will not process two calls — if the first call's response was lost in the network, and the second call happens to be treated as a new request by the external system, duplication still occurs.

To achieve stronger idempotency, two layers must cooperate:

  1. Agent side (state machine layer): Write an idempotency key checkpoint before executing any side-effecting action. Check whether the key already exists before execution.
  2. External system side: The external API must support idempotency keys (e.g., Stripe's Idempotency-Key header) and deduplicate on the server side.

  # Agent-side idempotency key pattern
  def execute_with_idempotency(sm, step_name, action):
      idempotency_key = f"{sm.task_id}:{step_name}"
      status = sm.get_checkpoint(idempotency_key)

      # Already completed? Return cached result
      if status == "done":
          print(f"[idempotency] step '{step_name}' already done, skipping")
          return sm.get_checkpoint(f"{idempotency_key}:result")

      # A previous attempt never finished: outcome unknown, do not re-run blindly
      if status == "executing":
          raise RuntimeError(
              f"step '{step_name}' has an uncertain outcome; "
              "query the external system before retrying"
          )

      # Mark as executing before the side effect, so a crash leaves a trace
      sm.set_checkpoint(idempotency_key, "executing")

      # If action() raises, the marker stays "executing" and recovery must
      # check the external system
      result = action(idempotency_key)  # pass key to external system

      # Store the result first, then mark done, so "done" always has a result
      sm.set_checkpoint(f"{idempotency_key}:result", result)
      sm.set_checkpoint(idempotency_key, "done")
      return result
  

The key pattern: Mark executing first, then execute, then mark done on success. If the process crashes during execution, recovery sees executing and knows that attempt's outcome is uncertain — it should query the external system for confirmation rather than blindly retrying or blindly skipping.
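The recovery decision for a step left at "executing" can be sketched as follows. This is an illustration under stated assumptions: `checkpoints` is a plain dict standing in for the durable checkpoint store, and `lookup_external` is a hypothetical callable that asks the external system whether it has a record for the idempotency key. Real APIs differ in how (and whether) they expose such a lookup.

```python
def resolve_uncertain_step(checkpoints, idempotency_key, lookup_external):
    """Settle a step whose checkpoint was left at "executing" by a crash.

    `lookup_external(key)` is assumed to return the recorded result, or None
    if the external system has no record of the key.
    Returns "already_applied" or "safe_to_retry".
    """
    if checkpoints.get(idempotency_key) != "executing":
        raise ValueError("step is not in an uncertain state")

    result = lookup_external(idempotency_key)
    if result is not None:
        # The first attempt did land: record it instead of executing again.
        checkpoints[f"{idempotency_key}:result"] = result
        checkpoints[idempotency_key] = "done"
        return "already_applied"

    # No record on the external side, so the call is treated as never landed.
    del checkpoints[idempotency_key]
    return "safe_to_retry"
```

Treating "no record" as "never landed" is only safe if the external system records a key as soon as it accepts a request; if a request could still be in flight, the retry should reuse the same idempotency key so the server can deduplicate.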

Retryable vs. Non-Retryable Errors

  Error Type                       Example                                      Retry?       Handling
  ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
  Transient network error          Connection timeout, DNS resolution failure   ✓ Yes        TRANSIENT_ERROR → RETRYING
  Service temporarily unavailable  503 Service Unavailable, 429 Rate Limited    ✓ Yes        TRANSIENT_ERROR → RETRYING (with backoff)
  Response lost in transit         Request succeeded; response lost in network  ✓ Carefully  Query external system via idempotency key before deciding
  Parameter validation failure     422 Unprocessable Entity                     ✗ No         FATAL_ERROR → FAILED (retry won't change the outcome)
  Insufficient permissions         403 Forbidden                                ✗ No         FATAL_ERROR → FAILED
  Data corruption                  Required field empty, malformed data         ✗ No         FATAL_ERROR → FAILED

The core decision criterion: If you retry with all inputs unchanged, will the outcome change? If the answer is "no" (the same request always returns 422), do not retry. If the answer is "maybe" (network recovers, service recovers), retry is appropriate.
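The decision criterion can be expressed as a small classifier that maps a failed call to the event the state machine should receive. This is a sketch: the function name `classify_failure` is hypothetical, and defaulting unlisted failures to FATAL_ERROR is an assumption chosen to avoid repeating side effects on unknown errors.

```python
# Statuses that may succeed later with unchanged inputs (from the table).
RETRYABLE_STATUS = {429, 503}


def classify_failure(status_code=None, network_error=False):
    """Return the event name to send to the state machine for a failed call."""
    if network_error or status_code in RETRYABLE_STATUS:
        return "TRANSIENT_ERROR"
    # 403 and 422 return the same answer for the same request. Anything
    # unlisted is also treated as fatal here (an assumption, see above).
    return "FATAL_ERROR"
```

A network error on a side-effecting call still falls under the "response lost in transit" row: the retry that follows should first confirm, via the idempotency key, whether the original request was applied.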

6. State Storage and Recovery: Continuing After Restarts, Disconnections, and Context Resets

This is the state machine's most fundamental value — task state is durable. The agent process can be killed, the server can restart, the LLM context can be wiped — but the task state remains.

What Must Be Persisted

A complete state storage scheme must persist the following data:


  ┌─────────────────────────────────────────────────┐
  │              task_checkpoints table               │
  ├──────────────┬──────────────────────────────────┤
  │ task_id      │ Unique task identifier             │
  │ state        │ Current state (planned/running/...)│
  │ retry_count  │ Current retry count                │
  │ checkpoint   │ JSON: {step results, data, keys}   │
  │ updated_at   │ Last update timestamp              │
  └──────────────┴──────────────────────────────────┘

  ┌─────────────────────────────────────────────────┐
  │            state_transitions table               │
  ├──────────────┬──────────────────────────────────┤
  │ id           │ Auto-increment primary key         │
  │ task_id      │ Unique task identifier             │
  │ from_state   │ Pre-transition state               │
  │ to_state     │ Post-transition state              │
  │ event        │ Triggering event                   │
  │ timestamp    │ ISO 8601 timestamp                 │
  │ metadata     │ JSON: {error info, approver, ...}  │
  └──────────────┴──────────────────────────────────┘
  

Note that the state transitions table is append-only — each row is one transition event, never updated or deleted. This means you have a complete audit trail: at any moment, you can trace a task's every step from planned to done/failed.
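A minimal sketch of the two tables using Python's built-in sqlite3 module follows. The column types, the helper names `open_store` and `record_transition`, and the choice to write both tables in one transaction are assumptions made for illustration; the article's persistence class may differ.

```python
import json
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS task_checkpoints (
    task_id     TEXT PRIMARY KEY,
    state       TEXT NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    checkpoint  TEXT NOT NULL DEFAULT '{}',
    updated_at  TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS state_transitions (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    TEXT NOT NULL,
    from_state TEXT NOT NULL,
    to_state   TEXT NOT NULL,
    event      TEXT NOT NULL,
    timestamp  TEXT NOT NULL,
    metadata   TEXT NOT NULL DEFAULT '{}'
);
"""


def open_store(path=":memory:"):
    """Open (or create) the state database and ensure both tables exist."""
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def record_transition(conn, task_id, from_state, to_state, event,
                      retry_count=0, checkpoint=None, metadata=None):
    """Append the transition and update the current state in one transaction."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(
            "INSERT INTO state_transitions "
            "(task_id, from_state, to_state, event, timestamp, metadata) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (task_id, from_state, to_state, event, now,
             json.dumps(metadata or {})),
        )
        conn.execute(
            "INSERT OR REPLACE INTO task_checkpoints "
            "(task_id, state, retry_count, checkpoint, updated_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (task_id, to_state, retry_count, json.dumps(checkpoint or {}), now),
        )
```

Writing both rows in a single transaction means the audit trail and the current state cannot disagree after a crash: either both reflect the transition or neither does.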

Storage Backend Selection

  Backend          Use Case                         Caveats
  ──────────────────────────────────────────────────────────────────────────────────────────────────────
  SQLite           Single-machine agent, demos,     Simple, zero dependencies, transaction-safe. Not
                   local tools                      suitable for concurrent writes: multiple agent
                                                    instances sharing one SQLite file cause lock
                                                    contention. Use WAL mode in production.
  PostgreSQL       Production agents,               Supports concurrency, connection pooling,
                   multi-instance deployment        primary-replica replication. Task state storage is
                                                    critical infrastructure: if the state database goes
                                                    down, all agents fail to transition (fail closed,
                                                    safe).
  Redis + AOF      Low-latency scenarios            Fast in memory, but durability is weaker than
                                                    PostgreSQL. If using Redis, enable AOF persistence
                                                    (appendonly yes) and accept that the last few
                                                    seconds of transition records may be lost on
                                                    restart.
  File-based JSON  Prototypes, demos,               Simplest but not production-suitable: no
                   single-file tools                transactions, no concurrency control, no recovery
                                                    guarantees.

Recovery Strategy: Handling Four Stale States

After an agent restart, load task state from durable storage, then execute the recovery strategy based on the current state:


  def recover(self):
      saved = self.persistence.load_checkpoint(self.task_id)
      if saved:
          self._state = TaskState(saved["state"])
          self._retry_count = saved.get("retry_count", 0)
          self._checkpoint = saved.get("checkpoint_data", {})
      self._history = self.persistence.load_history(self.task_id)

      # Handle stale states
      if self._state == TaskState.RUNNING:
          # Process crashed while RUNNING → mark for retry
          self.transition(TaskEvent.TRANSIENT_ERROR,
              metadata={"reason": "recovery_stale_running"})

      elif self._state == TaskState.RETRYING:
          # Process crashed while RETRYING → check count, then retry
          if self._retry_count < self._max_retries:
              self.transition(TaskEvent.RETRY)
          else:
              self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)

      elif self._state == TaskState.PAUSED:
          # Check if approval timed out
          paused_at = self._get_last_transition_time()
          if time_since(paused_at) > APPROVAL_TIMEOUT:
              self.transition(TaskEvent.TIMEOUT,
                  metadata={"reason": "recovery_approval_timeout"})

      elif self._state == TaskState.BLOCKED:
          # Check if dependency recovered (re-probe dependency service)
          if self._check_dependency():
              self.transition(TaskEvent.DEPENDENCY_RESOLVED)
          # Still blocked → remain BLOCKED, await external monitoring alert

      return self
  

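Recovery defaults should bias toward safety:

  • Stale RUNNING is treated as a transient error and routed through RETRYING; the task is never assumed to have completed.
  • Stale RETRYING still respects the retry limit: a crash does not reset the retry count.
  • Stale PAUSED can only time out into FAILED; recovery never grants approval on its own.
  • Stale BLOCKED stays BLOCKED unless the dependency probe succeeds.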

Task-State Recovery vs. LLM Context Recovery: Two Different Things

📌 Critical Distinction: The state machine recovers task-level state — which step the task is on, what the checkpoint data is, which steps are complete. It does not recover the LLM's conversation history. After a restart, the LLM faces an empty context window. The agent reads task state and checkpoint data from the state machine, constructs a new context summary ("you are executing task X, steps 1-3 are complete, step 3's result was Y, now execute step 4"), and then asks the LLM to continue reasoning from that point. The conversation history is lost, but task progress is not.
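Rebuilding the LLM's working context from durable state can be sketched as a plain string-building function. The checkpoint shape used here (`completed_steps` and `next_step`) and the function name `build_resume_context` are illustrative assumptions, not the article's actual checkpoint format.

```python
def build_resume_context(task_id, state, checkpoint):
    """Render durable task state as a prompt preamble for a fresh LLM context.

    `checkpoint` is assumed to look like
    {"completed_steps": [{"name": ..., "result": ...}], "next_step": ...}.
    """
    lines = [f"You are executing task {task_id} (state: {state})."]
    for i, step in enumerate(checkpoint.get("completed_steps", []), start=1):
        lines.append(
            f"Step {i} ({step['name']}) is complete. Result: {step['result']}"
        )
    next_step = checkpoint.get("next_step")
    if next_step:
        lines.append(f"Now execute: {next_step}. Do not repeat completed steps.")
    return "\n".join(lines)
```

The summary is derived only from what the state machine persisted, so the LLM resumes from verified progress rather than from whatever a truncated chat history happened to contain.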

On context window recovery strategies, see Agent Context Window Management — the cross-window state management section covers how to rebuild LLM working context from checkpoints after a context reset. On the memory system's persistence architecture (how each L0-L3 layer stores data), see Agent Memory System Design — task checkpoints and semantic memory are different storage layers.

7. Observable State Machines: Logs, Metrics, Audit Trails, and Alerts

The state machine is not merely an execution model — it is itself an observability primitive. Every state transition is an immutable event record, naturally suited for building audit trails, metric dashboards, and alerting rules.

Transition Events: The Atomic Unit of Observation

Each state transition produces a structured event:


  {
      "task_id": "task_a1b2c3d4",
      "from_state": "running",
      "to_state": "paused",
      "event": "pause_for_approval",
      "timestamp": "2026-06-05T14:22:31.123456+00:00",
      "metadata": {
          "step": "refund_approval",
          "amount": 150.00,
          "requested_by": "agent_refund_processor"
      }
  }
  

This event records what happened, why, and when. All transition events sorted by time form the task's complete audit trail — no separate "audit log system" is needed; the state transition events are the audit log.
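One way to model such an event in code is a frozen dataclass that serializes to the JSON shape shown above. The class name `TransitionRecord` is a hypothetical stand-in (it stores states as plain strings to stay self-contained), not the article's StateTransition type.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class TransitionRecord:
    """One state transition, fixed at creation time."""
    task_id: str
    from_state: str
    to_state: str
    event: str
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    metadata: dict = field(default_factory=dict)

    def to_json(self):
        """Serialize to the JSON shape used for the audit trail."""
        return json.dumps(asdict(self), sort_keys=True)
```

Marking the dataclass frozen prevents fields from being reassigned after construction, which matches the append-only nature of the audit trail.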

Observability Hook: Triggered on Every Transition


  # on_transition hook: called after every state transition
  def on_transition(transition: StateTransition):
      # 1. Structured logging
      logger.info(json.dumps({
          "event": "state_transition",
          "task_id": transition.task_id,
          "from": transition.from_state.value,
          "to": transition.to_state.value,
          "trigger": transition.event.value,
          "timestamp": transition.timestamp,
          "metadata": transition.metadata
      }))

      # 2. Metric instrumentation (counters only: a gauge needs a numeric
      #    value, and per-task metric names grow without bound)
      metrics.increment(f"transition.{transition.to_state.value}")
      metrics.increment(f"transition_event.{transition.event.value}")

      # 3. Alert checking
      if transition.to_state == TaskState.RETRYING:
          retry_count = transition.metadata.get("retry_count", 0)
          if retry_count >= 3:
              alerts.send("high_retry_count",
                  f"Task {transition.task_id} retried {retry_count} times")
  

On structured event schema typing and versioning, see Agent Message Schema Design — state transition events should have typed, versioned, validated schemas, just like agent messages.

Core Metrics

These are the key metrics to compute from the state transition stream:

| Metric | Computation | Purpose |
| --- | --- | --- |
| time_in_state | now - timestamp of entry into current state | SLA monitoring: how long has the task been running? How long has it been paused? |
| transition_counts | Aggregate count by event type | Anomaly detection: retry count spiking? Approval denial rate anomalous? |
| state_distribution | Task count per state | Capacity planning: how many tasks queued (planned)? How many waiting (paused/blocked)? |
| retry_rate | retrying transitions / total transitions | Health assessment: rising retry rate signals unstable dependencies |
| mean_time_to_recovery | Avg time from paused/blocked/retrying back to running | Recovery efficiency: approval response time, dependency recovery time, retry backoff window |
| invalid_transition_attempts | Count of InvalidTransitionError raises | Bug detection: event reordering, concurrency races, caller logic errors |
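
A rough sketch of how several of these metrics might be derived from a list of transition events follows. It assumes the events are dicts shaped like the example earlier in this section and are already ordered oldest to newest; the function names are illustrative, not an established API.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Dict, List, Optional


def transition_counts(events: List[dict]) -> Counter:
    """Count transitions by triggering event type."""
    return Counter(e["event"] for e in events)


def retry_rate(events: List[dict]) -> float:
    """Share of all transitions that entered the retrying state."""
    if not events:
        return 0.0
    retries = sum(1 for e in events if e["to_state"] == "retrying")
    return retries / len(events)


def state_distribution(events: List[dict]) -> Counter:
    """Count tasks by current state (the to_state of each task's last event)."""
    latest: Dict[str, str] = {}
    for e in events:
        latest[e["task_id"]] = e["to_state"]
    return Counter(latest.values())


def time_in_state(events: List[dict], task_id: str,
                  now: Optional[datetime] = None) -> float:
    """Seconds since the task entered its current state.

    Raises IndexError if the task has no events.
    """
    now = now or datetime.now(timezone.utc)
    last = [e for e in events if e["task_id"] == task_id][-1]
    return (now - datetime.fromisoformat(last["timestamp"])).total_seconds()


events = [
    {"task_id": "t1", "from_state": "planned", "to_state": "running",
     "event": "start", "timestamp": "2026-06-05T14:00:00+00:00"},
    {"task_id": "t1", "from_state": "running", "to_state": "retrying",
     "event": "transient_error", "timestamp": "2026-06-05T14:01:00+00:00"},
    {"task_id": "t2", "from_state": "planned", "to_state": "running",
     "event": "start", "timestamp": "2026-06-05T14:02:00+00:00"},
    {"task_id": "t1", "from_state": "retrying", "to_state": "running",
     "event": "retry", "timestamp": "2026-06-05T14:03:00+00:00"},
]

fixed_now = datetime(2026, 6, 5, 14, 12, tzinfo=timezone.utc)
print(retry_rate(events))                        # 0.25
print(state_distribution(events)["running"])     # 2
print(time_in_state(events, "t2", fixed_now))    # 600.0
```

In a real deployment these aggregations would more likely run as queries against the transition store or inside a metrics backend; the in-memory version only shows what each metric means.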

Alerting Rules

Alerts based on state machine metrics should detect the following conditions:


  Alert Rules:
    1. task_in_running_too_long:
       condition: time_in_state(RUNNING) > 30min
       action: page oncall with task_id and last checkpoint
       meaning: Agent may be stuck (infinite loop, unresponsive tool call)

    2. task_retry_flapping:
       condition: retry_count >= 3 AND oscillating RETRYING ↔ RUNNING
       action: escalate alert, human intervention
       meaning: Backoff too short, or the dependency issue is not transient

    3. task_paused_abandoned:
       condition: time_in_state(PAUSED) > 4h
       action: notify approver + oncall
       meaning: Approval may be forgotten — needs reminder or escalation

    4. task_blocked_prolonged:
       condition: time_in_state(BLOCKED) > 2h
       action: notify oncall
       meaning: Dependency service down for extended period, needs human intervention

    5. too_many_failed_tasks:
       condition: failed_count / total_tasks > 0.3 (sliding window)
       action: critical alert
       meaning: Systemic issue — possible dependency failure or configuration error

    6. invalid_transition_spike:
       condition: invalid_transition_attempts > 10/min
       action: notify oncall
       meaning: Caller code bug or concurrency race causing event reordering
  

For a more complete agent observability system (metric collection, dashboard construction, alerting pipeline), see Agent Observability — this article focuses on the observation signals produced by state transitions; that article covers the full agent monitoring infrastructure. On audit log immutability, retention policies, and compliance requirements, see Agent Audit Log Design — the state transition history is one implementation form of an audit log.
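
The three time-based alert rules listed above (running, paused, blocked) could be evaluated by a periodic job along these lines. This is a sketch under stated assumptions: each task is represented as a dict carrying `task_id`, `state`, and a hypothetical `entered_at` field holding the timestamp of the transition into the current state; the thresholds are the ones from the rules above.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

# Thresholds taken from the alert rules above
TIME_IN_STATE_LIMITS: Dict[str, timedelta] = {
    "running": timedelta(minutes=30),
    "paused": timedelta(hours=4),
    "blocked": timedelta(hours=2),
}
ALERT_NAMES: Dict[str, str] = {
    "running": "task_in_running_too_long",
    "paused": "task_paused_abandoned",
    "blocked": "task_blocked_prolonged",
}


def check_time_in_state(tasks: List[dict],
                        now: Optional[datetime] = None) -> List[dict]:
    """Return one alert dict per task that has exceeded its state's limit."""
    now = now or datetime.now(timezone.utc)
    alerts = []
    for task in tasks:
        limit = TIME_IN_STATE_LIMITS.get(task["state"])
        if limit is None:
            continue  # no time-based rule for this state
        elapsed = now - datetime.fromisoformat(task["entered_at"])
        if elapsed > limit:
            alerts.append({
                "alert": ALERT_NAMES[task["state"]],
                "task_id": task["task_id"],
                "elapsed_seconds": elapsed.total_seconds(),
            })
    return alerts


tasks = [
    {"task_id": "t1", "state": "running", "entered_at": "2026-06-05T18:10:00+00:00"},
    {"task_id": "t2", "state": "paused", "entered_at": "2026-06-05T14:00:00+00:00"},
    {"task_id": "t3", "state": "blocked", "entered_at": "2026-06-05T16:45:00+00:00"},
    {"task_id": "t4", "state": "done", "entered_at": "2026-06-05T10:00:00+00:00"},
    {"task_id": "t5", "state": "running", "entered_at": "2026-06-05T17:45:00+00:00"},
]
fixed_now = datetime(2026, 6, 5, 18, 30, tzinfo=timezone.utc)
fired = check_time_in_state(tasks, fixed_now)
print([(a["alert"], a["task_id"]) for a in fired])
```

With the sample data, only t2 (paused for 4.5 hours) and t5 (running for 45 minutes) exceed their limits. The rate-based rules (failure ratio, invalid transition spikes) need a sliding window and are left out of this sketch.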

Log Level Guidance


  Log Levels:
    INFO:   Every successful state transition
            Example: "Task abc123: RUNNING → PAUSED (pause_for_approval)"

    WARN:   Illegal transition attempt (blocked by state machine)
            Example: "Task abc123: Rejected transition PAUSED + COMPLETE"
            Important — this is a signal of a bug or attack

    ERROR:  State persistence failure
            Example: "Task abc123: Failed to write transition record to database"
            The transition should fail (fail closed), task stays in current state

    DEBUG:  State machine internals (transition table lookup, checkpoint read/write)
            Enable only in development
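
One way to apply this guidance is a thin wrapper that runs a transition attempt and picks the log level from the outcome. The sketch below is self-contained, so it defines its own stand-in `InvalidTransitionError` and treats `sqlite3.Error` as the persistence failure; the `logged_transition` helper and the logger name are assumptions made for illustration.

```python
import logging
import sqlite3
from typing import Callable

logger = logging.getLogger("agent.state_machine")


class InvalidTransitionError(Exception):
    """Stand-in for the state machine's guard exception."""


def logged_transition(task_id: str, description: str,
                      do_transition: Callable[[], None]) -> bool:
    """Run one transition attempt and log it at the level suggested above.

    Returns True if the transition was applied, False otherwise.
    """
    try:
        do_transition()
    except InvalidTransitionError:
        # Rejected by the guard: a bug or misuse signal, not a crash
        logger.warning("Task %s: Rejected transition %s", task_id, description)
        return False
    except sqlite3.Error:
        # Persistence failed: fail closed, the task stays in its current state
        logger.error("Task %s: Failed to write transition record (%s)",
                     task_id, description)
        return False
    logger.info("Task %s: %s", task_id, description)
    return True


def applied() -> None:
    pass  # a transition that succeeds


def rejected() -> None:
    raise InvalidTransitionError("paused + complete")


def storage_down() -> None:
    raise sqlite3.OperationalError("disk I/O error")


results = [
    logged_transition("abc123", "RUNNING -> PAUSED (pause_for_approval)", applied),
    logged_transition("abc123", "PAUSED + COMPLETE", rejected),
    logged_transition("abc123", "RUNNING -> DONE (complete)", storage_down),
]
```

Returning a boolean keeps the caller's control flow explicit; re-raising after logging is an equally reasonable choice if callers already handle these exceptions.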
  

8. Complete Example: A Recoverable AgentTaskStateMachine Skeleton in Python

Below is a runnable Python reference implementation demonstrating every concept discussed in this article — state enums, transition table, invalid transition guard, SQLite persistence, checkpointing, recovery strategy, and lifecycle demos. This is not a production-grade library (it lacks connection pooling, migrations, horizontal scaling, etc.) but rather a production-pattern skeleton you can adapt to your own agent.

Complete Code


  """
  AgentTaskStateMachine — A Recoverable Agent Task State Machine
  ==============================================================

  This is a production-pattern skeleton, not a deployable library.
  Missing: connection pooling, database migrations, horizontal scaling,
  security hardening.
  Provides: complete 7-state lifecycle, transition validation,
  persistence, recovery.

  Requirements: Python 3.9+ (standard library only; sqlite3 ships with it)
  """

  from enum import Enum
  from dataclasses import dataclass, field
  from datetime import datetime, timezone
  from typing import Optional, Dict, List, Any
  import json
  import sqlite3
  import time
  import uuid


  # ── State Enum ───────────────────────────────────────
  class TaskState(str, Enum):
      """Explicit lifecycle states for an agent task."""
      PLANNED = "planned"      # Task created, preconditions not yet validated
      RUNNING = "running"      # Agent actively executing steps
      PAUSED = "paused"        # Awaiting human approval or external input
      BLOCKED = "blocked"      # Cannot proceed — dependency unavailable
      RETRYING = "retrying"    # Retrying after a failed attempt
      DONE = "done"            # Terminal: success
      FAILED = "failed"        # Terminal: unrecoverable failure


  # ── Event Enum ───────────────────────────────────────
  class TaskEvent(str, Enum):
      """Events that trigger state transitions."""
      START = "start"
      PAUSE_FOR_APPROVAL = "pause_for_approval"
      APPROVAL_GRANTED = "approval_granted"
      APPROVAL_DENIED = "approval_denied"
      BLOCK_ON_DEPENDENCY = "block_on_dependency"
      DEPENDENCY_RESOLVED = "dependency_resolved"
      TRANSIENT_ERROR = "transient_error"
      RETRY = "retry"
      MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
      COMPLETE = "complete"
      FATAL_ERROR = "fatal_error"
      TIMEOUT = "timeout"


  # ── Transition Table ─────────────────────────────────
  # Format: (current_state, event) → target_state
  # Not in table = illegal → raise InvalidTransitionError
  TRANSITION_TABLE: Dict[tuple, TaskState] = {
      (TaskState.PLANNED,   TaskEvent.START):                  TaskState.RUNNING,
      (TaskState.RUNNING,   TaskEvent.PAUSE_FOR_APPROVAL):     TaskState.PAUSED,
      (TaskState.RUNNING,   TaskEvent.BLOCK_ON_DEPENDENCY):    TaskState.BLOCKED,
      (TaskState.RUNNING,   TaskEvent.COMPLETE):               TaskState.DONE,
      (TaskState.RUNNING,   TaskEvent.FATAL_ERROR):            TaskState.FAILED,
      (TaskState.RUNNING,   TaskEvent.TRANSIENT_ERROR):        TaskState.RETRYING,
      (TaskState.PAUSED,    TaskEvent.APPROVAL_GRANTED):       TaskState.RUNNING,
      (TaskState.PAUSED,    TaskEvent.APPROVAL_DENIED):        TaskState.FAILED,
      (TaskState.PAUSED,    TaskEvent.TIMEOUT):                TaskState.FAILED,
      (TaskState.BLOCKED,   TaskEvent.DEPENDENCY_RESOLVED):    TaskState.RUNNING,
      (TaskState.BLOCKED,   TaskEvent.FATAL_ERROR):            TaskState.FAILED,
      (TaskState.RETRYING,  TaskEvent.RETRY):                  TaskState.RUNNING,
      (TaskState.RETRYING,  TaskEvent.MAX_RETRIES_EXCEEDED):   TaskState.FAILED,
      (TaskState.RETRYING,  TaskEvent.FATAL_ERROR):            TaskState.FAILED,
      # DONE and FAILED are terminal: no outgoing edges
  }


  # ── Data Classes ─────────────────────────────────────
  @dataclass
  class StateTransition:
      """An immutable record of a state change."""
      task_id: str
      from_state: TaskState
      to_state: TaskState
      event: TaskEvent
      timestamp: str
      metadata: Dict[str, Any] = field(default_factory=dict)


  class InvalidTransitionError(Exception):
      """Raised when a transition is not in the transition table."""
      pass


  # ── State Persistence (SQLite) ───────────────────────
  class StatePersistence:
      """SQLite-backed state persistence.

      Production considerations:
      - Multi-instance deployments should use PostgreSQL
      - SQLite is suitable for single-machine demos or single-process agents
      - Use WAL mode to reduce lock contention
      """

      def __init__(self, db_path: str = "agent_state.db"):
          self.conn = sqlite3.connect(db_path)
          self.conn.execute("PRAGMA journal_mode=WAL")
          self._init_schema()

      def _init_schema(self):
          self.conn.executescript("""
              CREATE TABLE IF NOT EXISTS task_checkpoints (
                  task_id TEXT PRIMARY KEY,
                  state TEXT NOT NULL,
                  retry_count INTEGER DEFAULT 0,
                  checkpoint_data TEXT DEFAULT '{}',
                  updated_at TEXT NOT NULL
              );
              CREATE TABLE IF NOT EXISTS state_transitions (
                  id INTEGER PRIMARY KEY AUTOINCREMENT,
                  task_id TEXT NOT NULL,
                  from_state TEXT NOT NULL,
                  to_state TEXT NOT NULL,
                  event TEXT NOT NULL,
                  timestamp TEXT NOT NULL,
                  metadata TEXT DEFAULT '{}'
              );
              CREATE INDEX IF NOT EXISTS idx_transitions_task
                  ON state_transitions(task_id, timestamp);
          """)
          self.conn.commit()

      def save_transition(self, t: StateTransition) -> None:
          self.conn.execute(
              "INSERT INTO state_transitions "
              "(task_id, from_state, to_state, event, timestamp, metadata) "
              "VALUES (?, ?, ?, ?, ?, ?)",
              (t.task_id, t.from_state.value, t.to_state.value,
               t.event.value, t.timestamp, json.dumps(t.metadata))
          )
          self.conn.commit()

      def save_checkpoint(self, task_id: str, state: TaskState,
                          retry_count: int,
                          checkpoint_data: Optional[dict] = None) -> None:
          self.conn.execute(
              """INSERT OR REPLACE INTO task_checkpoints
                 (task_id, state, retry_count, checkpoint_data, updated_at)
                 VALUES (?, ?, ?, ?, ?)""",
              (task_id, state.value, retry_count,
               json.dumps(checkpoint_data or {}),
               datetime.now(timezone.utc).isoformat())
          )
          self.conn.commit()

      def load_checkpoint(self, task_id: str) -> Optional[dict]:
          row = self.conn.execute(
              "SELECT state, retry_count, checkpoint_data "
              "FROM task_checkpoints WHERE task_id = ?",
              (task_id,)
          ).fetchone()
          if row:
              return {
                  "state": row[0],
                  "retry_count": row[1],
                  "checkpoint_data": json.loads(row[2])
              }
          return None

      def load_history(self, task_id: str) -> List[StateTransition]:
          rows = self.conn.execute(
              "SELECT task_id, from_state, to_state, event, timestamp, metadata "
              "FROM state_transitions WHERE task_id = ? ORDER BY id",
              (task_id,)
          ).fetchall()
          return [
              StateTransition(
                  task_id=r[0],
                  from_state=TaskState(r[1]),
                  to_state=TaskState(r[2]),
                  event=TaskEvent(r[3]),
                  timestamp=r[4],
                  metadata=json.loads(r[5])
              ) for r in rows
          ]


  # ── Core State Machine Class ─────────────────────────
  class AgentTaskStateMachine:
      """
      A recoverable state machine for agent tasks.

      Guarantees:
      - Every state transition is validated against the transition table
      - Every transition is persisted (auditable and recoverable)
      - Task state can be recovered after a restart
      """

      def __init__(self, task_id: str, persistence: StatePersistence,
                   max_retries: int = 3):
          self.task_id = task_id
          self.persistence = persistence
          self._state: TaskState = TaskState.PLANNED
          self._history: List[StateTransition] = []
          self._retry_count: int = 0
          self._max_retries: int = max_retries
          self._checkpoint: Dict[str, Any] = {}
          self._on_transition_callbacks: List[callable] = []

      # ── Properties ───────────────────────────────────
      @property
      def state(self) -> TaskState:
          return self._state

      @property
      def retry_count(self) -> int:
          return self._retry_count

      @property
      def history(self) -> List[StateTransition]:
          return list(self._history)

      # ── Observability Hook ───────────────────────────
      def on_transition(self, callback: callable) -> None:
          """Register a transition callback (logging, metrics, alerts)."""
          self._on_transition_callbacks.append(callback)

      def _notify_observers(self, transition: StateTransition) -> None:
          for cb in self._on_transition_callbacks:
              try:
                  cb(transition)
              except Exception:
                  pass  # Observation failure must not affect the transition

      # ── Core Transition Method ───────────────────────
      def transition(self, event: TaskEvent,
                     metadata: Optional[Dict[str, Any]] = None) -> TaskState:
          """
          Attempt a state transition. Transitions not in the table raise
          InvalidTransitionError.
          """
          key = (self._state, event)
          next_state = TRANSITION_TABLE.get(key)
          if next_state is None:
              raise InvalidTransitionError(
                  f"Invalid transition: {self._state.value} + {event.value}"
              )

          # Retry budget check: validate before mutating anything, so a
          # rejected retry leaves the counter untouched
          retry_count = self._retry_count
          if event == TaskEvent.RETRY:
              if retry_count >= self._max_retries:
                  raise InvalidTransitionError(
                      f"Max retries exceeded ({self._max_retries}); "
                      f"use MAX_RETRIES_EXCEEDED event"
                  )
              retry_count += 1

          # Build transition record
          transition_record = StateTransition(
              task_id=self.task_id,
              from_state=self._state,
              to_state=next_state,
              event=event,
              timestamp=datetime.now(timezone.utc).isoformat(),
              metadata=metadata or {},
          )

          # Persist first (fail closed): if either write raises, the
          # in-memory state below is never updated
          self.persistence.save_transition(transition_record)
          self.persistence.save_checkpoint(
              self.task_id, next_state, retry_count,
              self._checkpoint
          )

          # Apply the transition in memory only after persistence succeeded
          self._history.append(transition_record)
          self._state = next_state
          self._retry_count = retry_count

          # Notify observers
          self._notify_observers(transition_record)

          return self._state

      # ── Checkpoint Management ────────────────────────
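      def set_checkpoint(self, key: str, value: Any) -> None:
          """Store intermediate data for recovery and persist it immediately.

          Values must be JSON-serializable.
          """
          self._checkpoint[key] = value
          self.persistence.save_checkpoint(
              self.task_id, self._state, self._retry_count,
              self._checkpoint
          )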

      def get_checkpoint(self, key: str) -> Optional[Any]:
          """Read checkpoint data."""
          return self._checkpoint.get(key)

      def has_checkpoint(self, key: str) -> bool:
          """Check whether a checkpoint exists."""
          return key in self._checkpoint

      # ── Idempotent Execution Helper ──────────────────
      def execute_idempotent(self, step_name: str,
                             action: callable) -> Any:
          """
          Execute a step idempotently.

          action should accept idempotency_key as a parameter and pass
          it to the external system. If the step already completed
          (checkpoint exists), returns the cached result directly.
          """
          idempotency_key = f"{self.task_id}:{step_name}"

          # Already done? Return cached result
          if self.get_checkpoint(idempotency_key) == "done":
              return self.get_checkpoint(f"{idempotency_key}:result")

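          # Mark as executing before the side effect
          self.set_checkpoint(idempotency_key, "executing")

          # If action raises, the marker stays "executing" and the
          # exception propagates to the caller
          result = action(idempotency_key)

          # Store the result before the "done" marker so that a step
          # marked done always has a cached result
          self.set_checkpoint(f"{idempotency_key}:result", result)
          self.set_checkpoint(idempotency_key, "done")
          return result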

      # ── Recovery ─────────────────────────────────────
      def recover(self) -> "AgentTaskStateMachine":
          """Restore task state from durable storage."""
          saved = self.persistence.load_checkpoint(self.task_id)
          if saved:
              self._state = TaskState(saved["state"])
              self._retry_count = saved.get("retry_count", 0)
              self._checkpoint = saved.get("checkpoint_data", {})

          self._history = self.persistence.load_history(self.task_id)

          # Handle stale states
          if self._state == TaskState.RUNNING:
              self.transition(TaskEvent.TRANSIENT_ERROR,
                  metadata={"reason": "recovery_stale_running"})

          elif self._state == TaskState.RETRYING:
              if self._retry_count < self._max_retries:
                  self.transition(TaskEvent.RETRY)
              else:
                  self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)

          elif self._state == TaskState.PAUSED:
              last_t = self._history[-1] if self._history else None
              if last_t and last_t.metadata.get("timeout_after"):
                  elapsed = (datetime.now(timezone.utc) -
                             datetime.fromisoformat(last_t.timestamp))
                  if elapsed.total_seconds() > last_t.metadata["timeout_after"]:
                      self.transition(TaskEvent.TIMEOUT,
                          metadata={"reason": "recovery_approval_timeout"})

          elif self._state == TaskState.BLOCKED:
              pass  # Stay blocked, rely on external alerting

          return self

      # ── Diagnostic Method ────────────────────────────
      def summary(self) -> dict:
          """Return a summary of the task's current state."""
          return {
              "task_id": self.task_id,
              "state": self._state.value,
              "retry_count": self._retry_count,
              "transition_count": len(self._history),
              "checkpoint_keys": list(self._checkpoint.keys()),
              "is_terminal": self._state in (TaskState.DONE, TaskState.FAILED),
          }


  # ── Usage Demos ──────────────────────────────────────
  def demo_full_lifecycle():
      """Demonstrate a full task lifecycle including pause, retry, recovery."""
      print("=" * 60)
      print("AgentTaskStateMachine Lifecycle Demo")
      print("=" * 60)

      persistence = StatePersistence(":memory:")
      task_id = f"task_{uuid.uuid4().hex[:8]}"
      sm = AgentTaskStateMachine(task_id=task_id, persistence=persistence)

      sm.on_transition(lambda t: print(
          f"  [observe] {t.from_state.value} → {t.to_state.value} "
          f"({t.event.value})"
      ))

      # 1. planned → running
      sm.transition(TaskEvent.START)
      print(f"[{sm.state.value}] Task started\n")

      # 2. running → paused (human approval needed)
      sm.transition(TaskEvent.PAUSE_FOR_APPROVAL,
                    metadata={"step": "refund_approval", "amount": 150.00})
      print(f"[{sm.state.value}] Awaiting human approval\n")

      # 3. paused → running (approval granted)
      sm.transition(TaskEvent.APPROVAL_GRANTED,
                    metadata={"approver": "manager@example.com"})
      print(f"[{sm.state.value}] Approval granted, resuming\n")

      # 4. running → retrying (transient error)
      sm.transition(TaskEvent.TRANSIENT_ERROR,
                    metadata={"error": "rate_limit", "step": "send_notification"})
      print(f"[{sm.state.value}] Transient error, "
            f"retry {sm.retry_count}/{sm._max_retries}\n")

      # 5. retrying → running (retry)
      sm.transition(TaskEvent.RETRY)
      print(f"[{sm.state.value}] Retrying...\n")

      # 6. running → done
      sm.transition(TaskEvent.COMPLETE,
                    metadata={"result": "refund_processed"})
      print(f"[{sm.state.value}] Task completed\n")

      # 7. Test invalid transition
      print("--- Testing Invalid Transition ---")
      try:
          sm.transition(TaskEvent.START)
      except InvalidTransitionError as e:
          print(f"[guard] Invalid transition blocked: {e}\n")

      # 8. View transition history
      print("--- Transition History ---")
      for i, t in enumerate(sm.history, 1):
          print(f"  {i}. {t.from_state.value} → {t.to_state.value} "
                f"({t.event.value}) @ {t.timestamp[:19]}")

      print(f"\n--- Summary ---")
      for k, v in sm.summary().items():
          print(f"  {k}: {v}")


  def demo_recovery():
      """Demonstrate recovery after a restart."""
      print("\n" + "=" * 60)
      print("Restart Recovery Demo")
      print("=" * 60)

      persistence = StatePersistence(":memory:")
      task_id = "recovered_task_001"

      sm1 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
      sm1.transition(TaskEvent.START)
      sm1.transition(TaskEvent.COMPLETE)
      print(f"Instance 1: Task completed, state = {sm1.state.value}")

      sm2 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
      sm2.recover()
      print(f"Instance 2: Recovered state = {sm2.state.value}")

      assert sm2.state == TaskState.DONE, "Recovery failed!"
      print("✓ Recovery verified")
      print(f"  Recovered history entries: {len(sm2.history)}")


  def demo_idempotent_execution():
      """Demonstrate idempotent execution."""
      print("\n" + "=" * 60)
      print("Idempotent Execution Demo")
      print("=" * 60)

      persistence = StatePersistence(":memory:")
      sm = AgentTaskStateMachine(task_id="idempotent_demo", persistence=persistence)
      sm.transition(TaskEvent.START)

      call_count = [0]

      def external_call(idempotency_key: str) -> str:
          call_count[0] += 1
          print(f"  → external call #{call_count[0]} "
                f"(idempotency_key={idempotency_key})")
          return f"result_{call_count[0]}"

      r1 = sm.execute_idempotent("step_refund", external_call)
      print(f"  1st call: {r1} (external calls: {call_count[0]})")

      r2 = sm.execute_idempotent("step_refund", external_call)
      print(f"  2nd call: {r2} (external calls: {call_count[0]})")

      assert call_count[0] == 1, "Duplicate execution! Should be called once"
      assert r1 == r2, "Results differ!"
      print("✓ Idempotency verified")


  if __name__ == "__main__":
      demo_full_lifecycle()
      demo_recovery()
      demo_idempotent_execution()
  

Sample Output


  ============================================================
  AgentTaskStateMachine Lifecycle Demo
  ============================================================
    [observe] planned → running (start)
  [running] Task started

    [observe] running → paused (pause_for_approval)
  [paused] Awaiting human approval

    [observe] paused → running (approval_granted)
  [running] Approval granted, resuming

    [observe] running → retrying (transient_error)
  [retrying] Transient error, retry 0/3

    [observe] retrying → running (retry)
  [running] Retrying...

    [observe] running → done (complete)
  [done] Task completed

  --- Testing Invalid Transition ---
  [guard] Invalid transition blocked: Invalid transition: done + start

  --- Transition History ---
    1. planned → running (start)
    2. running → paused (pause_for_approval)
    3. paused → running (approval_granted)
    4. running → retrying (transient_error)
    5. retrying → running (retry)
    6. running → done (complete)

  --- Summary ---
    task_id: task_a1b2c3d4
    state: done
    retry_count: 1
    transition_count: 6
    checkpoint_keys: []
    is_terminal: True

  ============================================================
  Restart Recovery Demo
  ============================================================
  Instance 1: Task completed, state = done
  Instance 2: Recovered state = done
  ✓ Recovery verified
    Recovered history entries: 2

  ============================================================
  Idempotent Execution Demo
  ============================================================
    → external call #1 (idempotency_key=idempotent_demo:step_refund)
    1st call: result_1 (external calls: 1)
    2nd call: result_1 (external calls: 1)
  ✓ Idempotency verified
  

Production Hardening Checklist

To use this skeleton in production, the following adaptations are needed:

  1. PostgreSQL instead of SQLite: Multi-instance agents need concurrent write support. Use connection pooling (e.g., psycopg2 pool), transaction management, and primary-replica replication.
  2. Database migrations: Use Alembic or similar to manage schema versions. Task checkpoint schemas will evolve — migration support is essential.
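  3. Connection pooling and transaction safety: Every transition() call must be atomic, so that a persistence failure rolls the whole transition back. The current SQLite implementation commits the transition record and the checkpoint separately, so a crash between the two writes can leave them out of sync; production code should wrap both writes in a single transaction.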
  4. Concurrency control: Multiple processes operating on the same task's state machine need optimistic locking or distributed locks. The simplest approach: use updated_at on task_checkpoints for optimistic concurrency control (compare-and-swap).
  5. Horizontal scaling: When state storage becomes the bottleneck, shard by task_id. Each shard gets an independent PostgreSQL instance. Transition history can be written asynchronously; checkpoint data must be written synchronously.
  6. Security hardening: State machine operations need authentication and authorization — not everyone should be able to change task states. Integrate with your IAM system.
  7. Configurable retry policies: Extract exponential backoff parameters (base, max_wait, jitter) into configuration. Different task types may need different retry strategies.
  8. Configurable recovery policies: Not all stale running states should transition to retrying — some tasks should remain in running while awaiting a heartbeat check. Recovery policies should be configurable per task type.
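
To make item 4 concrete, here is a minimal compare-and-swap sketch using sqlite3. It swaps in an integer `version` column instead of the `updated_at` timestamp mentioned above, because two writes can share a timestamp while a counter cannot repeat; the column, the `StaleStateError` exception, and `cas_update_state` are all hypothetical and not part of the skeleton's schema.

```python
import sqlite3


class StaleStateError(Exception):
    """Another writer changed the task since it was read."""


def init_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_checkpoints ("
        " task_id TEXT PRIMARY KEY,"
        " state TEXT NOT NULL,"
        " version INTEGER NOT NULL DEFAULT 0)"  # hypothetical column
    )
    conn.commit()


def cas_update_state(conn: sqlite3.Connection, task_id: str,
                     expected_version: int, new_state: str) -> int:
    """Write new_state only if the row still has expected_version.

    Returns the new version; raises StaleStateError if the row changed
    (or does not exist).
    """
    cur = conn.execute(
        "UPDATE task_checkpoints SET state = ?, version = version + 1 "
        "WHERE task_id = ? AND version = ?",
        (new_state, task_id, expected_version),
    )
    conn.commit()
    if cur.rowcount != 1:
        raise StaleStateError(f"{task_id}: expected version {expected_version}")
    return expected_version + 1


conn = sqlite3.connect(":memory:")
init_schema(conn)
conn.execute(
    "INSERT INTO task_checkpoints (task_id, state) VALUES ('t1', 'planned')"
)
conn.commit()

v1 = cas_update_state(conn, "t1", 0, "running")   # succeeds, version becomes 1
try:
    cas_update_state(conn, "t1", 0, "paused")     # stale version, rejected
    conflict = False
except StaleStateError:
    conflict = True
```

The losing writer should reload the task, re-validate its event against the transition table from the new state, and only then decide whether to try again.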

Limitations — An Honest Accounting

📌 What this state machine does:
  1. Guarantees that within the agent, state transitions are deterministic and recoverable.
  2. Prevents the agent from initiating duplicate calls for the same operation (via checkpoints and idempotency keys).
  3. Provides a complete audit trail for every task.

What it cannot do:
  1. Make LLM output deterministic: the LLM within the running state can still produce different reasoning results.
  2. Guarantee exactly-once external side effects: if the external API does not support idempotency keys, duplicate calls can still produce duplicate effects.
  3. Provide distributed transactions: the state machine's internal state and the external system's state are not atomically consistent.
  4. Replace orchestration engines like Temporal or LangGraph, which solve a different problem (external workflow orchestration vs. agent-internal task lifecycle).

In one sentence: The state machine gives you a deterministic answer to "what is the agent doing?" It does not give you a deterministic answer to "what is the agent thinking?" The former is an engineering problem; the latter is an AI problem.

FAQ

How is this different from Temporal or AWS Step Functions? Do I still need an orchestrator?

This state machine is inside the agent — it is the agent's own understanding of where it is in a task. Temporal and Step Functions are external orchestrators that tell workers what to do. They are complementary: the agent's internal state machine ensures it doesn't skip steps or duplicate work; an external orchestrator (if you use one) ensures the agent gets invoked at the right time. This article demonstrates the pattern of building a state machine into the agent; whether you add an outer orchestration layer depends on your needs.

My agent already uses LangGraph's checkpointing. Why add an explicit state machine?

LangGraph checkpoints the graph traversal state (which node, which edge condition), not the task lifecycle. If your task needs to distinguish "paused for approval" from "blocked on dependency" from "retrying after error," you need explicit lifecycle state. The good news: LangGraph's checkpoint is an excellent persistence backend — you can build this article's 7-state machine on top of LangGraph's checkpoint storage.

How does the state machine prevent duplicate side effects?

Through the dual protection of idempotency keys + checkpoints. Before any side-effecting operation: (1) check whether the checkpoint already contains that operation's idempotency key with status "done" — if so, return the cached result without executing; (2) if not, first write the idempotency key (status "executing"), then execute the operation, then update to "done" on success. If the process crashes while in "executing" status, recovery sees "executing" and knows that attempt's outcome is uncertain — it should query the external system rather than blindly retrying. See Section 5 for details.
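
The recovery decision described above might look like the following sketch. It assumes the external system offers some way to look up a request by idempotency key, represented here by a hypothetical `lookup` callable that returns the recorded result or None; many payment APIs offer something comparable, but that is an assumption about your integration, not a given. The checkpoint is a plain dict for brevity; real code would persist each write.

```python
from typing import Any, Callable, Dict, Optional


def resolve_step(checkpoint: Dict[str, Any], key: str,
                 lookup: Callable[[str], Optional[Any]],
                 execute: Callable[[str], Any]) -> Any:
    """Decide how to handle one side-effecting step, including after a restart."""
    status = checkpoint.get(key)
    if status == "done":
        return checkpoint[f"{key}:result"]      # already completed

    if status == "executing":
        # Outcome unknown: ask the external system before doing anything
        existing = lookup(key)
        if existing is not None:
            checkpoint[f"{key}:result"] = existing
            checkpoint[key] = "done"
            return existing

    # Never started, or started but never reached the external system
    checkpoint[key] = "executing"
    result = execute(key)
    checkpoint[f"{key}:result"] = result
    checkpoint[key] = "done"
    return result


calls = []


def execute(key: str) -> str:
    calls.append(key)
    return f"executed:{key}"


# The external system already recorded the refund before the crash
external_records = {"task_1:refund": "refund_ok"}
checkpoint = {"task_1:refund": "executing"}

recovered = resolve_step(checkpoint, "task_1:refund",
                         external_records.get, execute)
fresh = resolve_step(checkpoint, "task_1:notify",
                     external_records.get, execute)
```

Here the refund is not executed a second time because the lookup finds it, while the notification step, which has no checkpoint entry, runs normally.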

What happens when the LLM context is gone after a restart?

The state machine does not recover the full conversation history — it only recovers task state and key checkpoint data. After a restart: (1) read from the state machine "which step the task is on, what the previous results were"; (2) use this information to construct a new context summary (not the full conversation history); (3) inject this summary into the LLM's fresh context window so the LLM can continue reasoning from that point. The conversation history is lost, but task progress is not. On how to rebuild working context from checkpoints, see Agent Context Window Management, specifically the cross-window state management section.
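
Step (2) could be sketched as a small function that turns recovered task state into a text summary for the fresh context window. The checkpoint layout below follows the `<key>` / `<key>:result` convention used by execute_idempotent in Section 8; the function name, the summary wording, and the `max_events` cutoff are illustrative choices.

```python
from typing import Any, Dict, List


def build_context_summary(task_id: str, state: str,
                          checkpoint: Dict[str, Any],
                          history: List[Dict[str, str]],
                          max_events: int = 5) -> str:
    """Render task state as a short text block for a fresh LLM context."""
    lines = [f"Task {task_id} is currently in state: {state}."]

    # Steps whose marker is "done"; the ":result" entries hold their outputs
    completed = sorted(
        key for key, value in checkpoint.items()
        if value == "done" and not key.endswith(":result")
    )
    if completed:
        lines.append("Completed steps:")
        for key in completed:
            result = checkpoint.get(f"{key}:result")
            lines.append(f"- {key} (result: {result})")

    recent = history[-max_events:]
    if recent:
        lines.append("Recent transitions:")
        for t in recent:
            lines.append(
                f"- {t['from_state']} -> {t['to_state']} ({t['event']})"
            )

    return "\n".join(lines)


summary = build_context_summary(
    task_id="task_1",
    state="running",
    checkpoint={
        "task_1:validate": "done",
        "task_1:validate:result": "order_ok",
        "task_1:refund": "executing",
    },
    history=[
        {"from_state": "planned", "to_state": "running", "event": "start"},
    ],
)
print(summary)
```

Steps still marked "executing" are deliberately left out of the summary: their outcome is unknown and should be reconciled in code before the LLM is told anything about them.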

How does the state machine handle LLM nondeterminism?

The state machine does not eliminate LLM nondeterminism — it contains it. The LLM operates within the running state and can produce varied outputs, but it cannot change the task state. State transitions happen outside the LLM in deterministic code, based on structured events (tool results, approval callbacks, error signals), not LLM-generated text. The state machine provides a deterministic framework around probabilistic LLM execution — not a deterministic LLM.
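
A small sketch of what "based on structured events, not LLM-generated text" can mean in code: the mapping below looks only at machine-readable fields of a tool result and ignores any free text. The result fields (`ok`, `final_step`, `error_code`, `llm_comment`) and the error-code sets are hypothetical; the event names are a subset of the TaskEvent enum from Section 8, repeated so the example runs on its own.

```python
from enum import Enum
from typing import Any, Dict, Optional


class TaskEvent(str, Enum):
    """Subset of the article's events, repeated to keep the sketch standalone."""
    COMPLETE = "complete"
    TRANSIENT_ERROR = "transient_error"
    FATAL_ERROR = "fatal_error"
    BLOCK_ON_DEPENDENCY = "block_on_dependency"


# Hypothetical error codes a tool wrapper might report
TRANSIENT_CODES = {"rate_limit", "timeout"}
DEPENDENCY_CODES = {"service_unavailable"}


def event_from_tool_result(result: Dict[str, Any]) -> Optional[TaskEvent]:
    """Map a structured tool result to a state machine event.

    Returns None when the step succeeded but the task is not finished,
    meaning the task simply stays in the running state.
    """
    if result.get("ok") is True:
        return TaskEvent.COMPLETE if result.get("final_step") else None
    code = result.get("error_code")
    if code in TRANSIENT_CODES:
        return TaskEvent.TRANSIENT_ERROR
    if code in DEPENDENCY_CODES:
        return TaskEvent.BLOCK_ON_DEPENDENCY
    return TaskEvent.FATAL_ERROR  # unknown failures are not retried


# The free-text comment has no influence on the chosen event
event = event_from_tool_result({
    "ok": False,
    "error_code": "rate_limit",
    "llm_comment": "approval granted, proceed",
})
print(event.value)   # transient_error
```

Treating unknown error codes as fatal is a conservative default for this sketch; a real system may prefer a different fallback per tool.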

Is this overengineering for a simple agent?

If your agent does one-shot question answering — one call, one answer — then yes, the state machine is overengineering. The complete Python skeleton in this article is roughly 250 lines of core code (excluding comments and demo code). This complexity buys you: queryable task state at any moment, non-bypassable approval gates, retry deduplication, and restart recovery. If your agent executes multi-step workflows with side effects, involves human approval, or runs longer than the LLM's context window, then the debugging cost without a state machine (investigating "why was the refund issued twice?", fixing "why was approval skipped?", recovering "what do I do after restart?") far exceeds 250 lines of code. The state machine is the minimum viable reliability layer for long-running agents — not an optional luxury.

Next Steps

This article is the deterministic task lifecycle chapter of the Agent Memory and Context Engineering series. Here is where to go next, ordered by dependency: