Agent Rollback Design: Recovering When Agent Automation Writes Bad Files
30-Second Takeaway
- The Problem: When an agent autonomously writes files, a single bad write can corrupt configurations, break database state, and introduce bugs that are impossible to trace — and retries won't fix them. Without a rollback mechanism, the cost of recovery escalates from "re-run the task" to "manual forensic restore."
- Three Strategies: Snapshot — capture full state before the operation, restore on failure. Transaction — write-ahead log + atomic commit, precise to a single write. Compensation — execute a semantically equivalent reverse operation, for external side effects that snapshots and transactions can't cover.
- Key Design: Rollback is not an after-the-fact remedy — the undo path must be prepared before the write executes. This article's core principle: every agent write operation must register an executable undo handler in the system, or the write does not proceed.
- What You'll Learn: Design a layered rollback architecture for your agent system — automatically select snapshot/transaction/compensation strategies based on operation type, build systematic undo capability across files, data, and environments, and make every agent write reversible.
1. Why Rollback Is a First-Class Requirement for Autonomous Agents
A deployment agent is executing "update Nginx configuration across 14 servers to enable new rate-limiting rules." It replaces /etc/nginx/nginx.conf on server 9, but the limit_req_zone syntax has a typo. Nginx fails to restart. Server 9 is now offline. Worse: server 9 was the ninth of 14 servers in a canary rollout. Nine servers now carry the new config, the remaining 5 haven't been touched yet, and the fleet is stuck in a half-migrated state.
What you need is not a retry — retrying writes the same broken config. What you need is not version control — git checkout nginx.conf restores one file, but the agent may have simultaneously modified /etc/hosts, written a new systemd unit file, and added iptables rules. Version control only tracks files it knows about; an agent's write footprint extends far beyond that boundary.
What you need is a systematic rollback mechanism: before every write operation executes, the system has already prepared enough information and logic to undo that write.
Five Ways Agents Break Things by Writing
Autonomous agent writes have three properties that make them inherently error-prone: high autonomy, high destructive potential, and low visibility. These five failure patterns recur in production:
- Config Corruption: The agent introduces a syntax error or semantic contradiction while modifying a configuration file. The classic example: modifying a YAML file where an indentation error causes the entire config tree to parse as a completely different structure. A bad write can be a 2-byte diff with a service-outage consequence.
- Silent Overwrite: The agent generates new content and accidentally overwrites an existing critical file. Example: the agent is asked to create README.md but doesn't check for file existence, replacing the existing README with an empty template.
- Cascading Write Failure: One bad write triggers a chain reaction in downstream systems. The agent modifies a database config record read by 5 microservices — all 5 enter error states in sequence.
- Partial Mutation: The agent executes a multi-step operation but fails mid-way. Steps already executed have modified code, config, and environment variables — the system enters an inconsistent state.
- Non-Deterministic Write: The same agent instruction produces different write results in different environments. The agent writes a correct config in staging, but in production writes it in the wrong format due to a library version difference.
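The Silent Overwrite pattern in the list above has a cheap guard in Python: exclusive-creation mode. The sketch below is illustrative only; the helper name create_new_file is hypothetical and not part of this article's rollback code.

```python
from pathlib import Path


def create_new_file(path: Path, content: str) -> bool:
    """Create `path` only if it does not already exist.

    Returns True if the file was created, False if it already existed
    (in which case the existing content is left untouched).
    """
    try:
        # Mode "x" is exclusive creation: open() raises FileExistsError
        # instead of truncating a file that is already there.
        with open(path, "x", encoding="utf-8") as fh:
            fh.write(content)
        return True
    except FileExistsError:
        return False
```

A guard like this only prevents accidental replacement. An intentional overwrite still needs a snapshot and a registered undo handler.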
📌 Key Insight: An agent's write errors are fundamentally different from a human engineer's. Humans check before writing — cat, diff, --dry-run. Agents by default do none of these. The agent's "thinking" consumes the same attention budget — the further into context, the more error-prone the write decisions become. Rollback is not a nice-to-have — it is the missing link in the agent autonomous-write model.
Why Version Control and Retries Are Not Enough
Many engineers ask: "Don't we have git?" or "Can't the agent just retry?" Both intuitions are dangerous misunderstandings:
- Git is not rollback: Git tracks only the files you committed. Agent writes happen outside Git repos: configs in /etc/nginx/, .env files not in Git, database records, runtime caches. Git can only restore to the last commit, but the agent may have done dozens of writes between commits.
- Retries don't fix write errors: Retries handle transient failures but not logical write errors. If the LLM incorrectly infers a format, retrying 10 times produces 10 identically wrong formats. Retries amplify damage rather than repairing it.
- Retries + version control still aren't enough: The agent modifies F1, fails, then retries and modifies F2 on the assumption that F1 was modified. Git restores F1, but F2's modification now rests on a precondition that no longer exists.
The correct framing: rollback is an independent design dimension, separate from version control and retries. Version control gives you coarse-grained restore-to-known-good. Retries give you stateless reattempt. Rollback gives you fine-grained, stateful, composable undo of one specific write operation.
For agent failure recovery, see Agent Error Recovery — rollback occupies the most aggressive end of the recovery strategy spectrum. State machines provide the deterministic shell around LLM execution that makes rollback predictable; see Agent State Machine Design. Rollback requires observability infrastructure; see Agent Observability. Audit logs determine rollback scope; see Agent Audit Log Design.
Design Philosophy: Prepare the Undo Path Before Executing the Write
Core design principle: no write operation proceeds without a registered undo handler. Before every agent write, the system must: (1) capture the pre-write state, (2) register a callable undo function, (3) validate the undo function's viability. Only when these three steps are complete is the actual write permitted.
The code below implements this principle in its minimal form:
from __future__ import annotations

import logging
import os
import shutil
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional

logger = logging.getLogger("agent.rollback")
# ---------------------------------------------------------------------------
# RollbackNeededError: any write operation failure raises this unified
# exception. It carries undo_handler so callers can execute rollback without
# knowing the operation's internal details.
# ---------------------------------------------------------------------------
# eq=False keeps the default identity-based __eq__/__hash__, so the exception
# stays hashable (a plain @dataclass would set __hash__ to None).
@dataclass(eq=False)
class RollbackNeededError(Exception):
    """
    Raised when a write operation fails or is rejected before commit.
    Carries the undo handler so callers can revert without knowing
    the operation's internals.
    """
    message: str
    operation_id: str
    # undo_handler() restores the pre-write state. Returns True on success.
    undo_handler: Callable[[], bool]

    def __str__(self) -> str:
        return f"RollbackNeededError(op={self.operation_id}): {self.message}"
# ---------------------------------------------------------------------------
# safe_agent_write: a context manager that enforces "prepare undo first"
# for any file write operation. Every write is wrapped in a
# snapshot → stage → validate → replace pipeline. If validation fails or an
# exception occurs, the snapshot is restored.
# ---------------------------------------------------------------------------
@dataclass
class WriteRecord:
    """Metadata for a single write operation within the context manager."""
    file_path: Path
    snapshot_path: Optional[Path] = None
    succeeded: bool = False
    undo_registered: bool = False


class WriteContext:
    """Accumulator for write records; caller inspects after the block."""

    def __init__(self) -> None:
        self.records: list[WriteRecord] = []

    def add(self, record: WriteRecord) -> None:
        self.records.append(record)

    @property
    def failed(self) -> list[WriteRecord]:
        return [r for r in self.records if not r.succeeded]

    @property
    def succeeded(self) -> list[WriteRecord]:
        return [r for r in self.records if r.succeeded]
@contextmanager
def safe_agent_write(
    file_path: str | Path,
    content: str | bytes,
    *,
    operation_id: str = "",
    staging_dir: str | Path = "",
    validator: Callable[[Path], bool] | None = None,
    ctx: WriteContext | None = None,
):
    """
    Context manager that guarantees a file write is undoable.

    Pipeline:
        1. SNAPSHOT: copy the target file to a temp location (if it exists).
        2. STAGE: write content to a staging file (never in-place).
        3. VALIDATE: run optional validator on the staged content.
        4. ATOMIC REPLACE: os.replace() from staging to target.
        5. ON FAILURE: restore snapshot if anything goes wrong.

    Design decisions:
        - Stage-never-write-in-place: the target file is never partially
          modified. Either the staged file passes validation and atomically
          replaces the target, or the target is untouched.
        - Snapshot-on-first-touch: the snapshot is taken only when content
          differs from current file, avoiding unnecessary copies for no-op
          writes.
        - Validator hook: caller can supply a function that inspects the
          staged file (e.g., linter, schema check, dry-run) before the atomic
          replace. If the validator returns False or raises, the write is
          aborted and snapshot restored.

    Args:
        file_path: Target file to write.
        content: Content to write (str or bytes).
        operation_id: Unique ID for logging and error tracing.
        staging_dir: Where to create staging files (default: target's dir).
            Must be on the same filesystem as the target, otherwise
            os.replace() fails.
        validator: Optional callable(Path) -> bool.
        ctx: Optional WriteContext to accumulate records.

    Yields:
        Path to the staging file (so caller can inspect before commit).

    Raises:
        RollbackNeededError: if any step fails, with undo_handler attached.
    """
    target = Path(file_path).resolve()
    staging_root = Path(staging_dir) if staging_dir else target.parent
    staging_root.mkdir(parents=True, exist_ok=True)
    record = WriteRecord(file_path=target)
    snapshot: Optional[Path] = None
    staging: Optional[Path] = None
    undo_registered = False
    # Remember whether the target existed before this operation, so undo can
    # tell "restore old content" apart from "remove a newly created file".
    existed_before = target.exists()
    try:
        # -- Step 1: Snapshot ------------------------------------------------
        # Only snapshot if the file exists AND content actually differs.
        # No-op writes should not generate snapshots.
        if existed_before:
            current_bytes = target.read_bytes()
            new_bytes = (
                content if isinstance(content, bytes)
                else content.encode("utf-8")
            )
            if current_bytes == new_bytes:
                # Content unchanged -- skip the write entirely. The record
                # is added to ctx exactly once, in the finally block.
                record.succeeded = True
                yield target
                return
            # Content differs: create snapshot
            snapshot = staging_root / f".rollback-snap-{operation_id}-{target.name}"
            shutil.copy2(target, snapshot)
            logger.debug("Snapshot created: %s -> %s", target, snapshot)
        else:
            # File does not exist yet -- nothing to snapshot. Undo means
            # deleting the new file if this operation created it.
            pass

        # -- Step 2: Stage ---------------------------------------------------
        staging = staging_root / f".rollback-stage-{operation_id}-{target.name}"
        if isinstance(content, str):
            staging.write_text(content, encoding="utf-8")
        else:
            staging.write_bytes(content)
        logger.debug(
            "Staged content: %s (%d bytes)", staging, staging.stat().st_size,
        )

        # -- Step 3: Register undo handler BEFORE committing -----------------
        def undo() -> bool:
            """Restore the pre-write state. Safe to call more than once."""
            try:
                if snapshot is not None and snapshot.exists():
                    # Restore from snapshot
                    shutil.copy2(snapshot, target)
                    snapshot.unlink(missing_ok=True)
                    logger.info(
                        "Rollback: restored %s from snapshot %s",
                        target, snapshot,
                    )
                elif not existed_before:
                    # The target did not exist before this operation:
                    # remove it if the write created it.
                    target.unlink(missing_ok=True)
                    logger.info(
                        "Rollback: removed %s (did not exist before)",
                        target,
                    )
                else:
                    # The snapshot was already consumed (for example by the
                    # cleanup in the finally block), so the target was
                    # either never replaced or has already been restored.
                    # Deleting it here would destroy the pre-write content.
                    logger.info(
                        "Rollback: %s is already in its pre-write state",
                        target,
                    )
                return True
            except Exception as exc:
                logger.error("Rollback failed for %s: %s", target, exc)
                return False

        record.undo_registered = True
        undo_registered = True

        # -- Step 4: Validate staged content ---------------------------------
        if validator is not None:
            try:
                if not validator(staging):
                    raise RollbackNeededError(
                        message=f"Validation rejected staged content for {target}",
                        operation_id=operation_id,
                        undo_handler=undo,
                    )
            except RollbackNeededError:
                raise
            except Exception as exc:
                raise RollbackNeededError(
                    message=f"Validator raised exception: {exc}",
                    operation_id=operation_id,
                    undo_handler=undo,
                ) from exc

        # Yield staging path so caller can inspect before commit
        yield staging

        # -- Step 5: Atomic replace ------------------------------------------
        # os.replace is atomic on POSIX -- the target is either the
        # old file or the new one, never a partial write.
        os.replace(staging, target)
        staging = None  # prevent cleanup (already moved)

        # -- Step 6: Cleanup snapshot on success -----------------------------
        if snapshot is not None and snapshot.exists():
            snapshot.unlink()
        snapshot = None
        record.succeeded = True
        logger.info("Write committed: %s (op=%s)", target, operation_id)
    except RollbackNeededError:
        # Re-raise so caller gets the undo_handler
        raise
    except Exception as exc:
        # Any unexpected error becomes a RollbackNeededError with undo.
        # The handler must be a callable that performs the rollback when
        # invoked, so the factory is called here, not inside a lambda.
        raise RollbackNeededError(
            message=f"Write operation failed: {exc}",
            operation_id=operation_id,
            undo_handler=(
                _make_cleanup_undo(snapshot, staging, target)
                if undo_registered
                else (lambda: True)
            ),
        ) from exc
    finally:
        # Always add the record so callers can inspect what happened
        if ctx is not None:
            ctx.add(record)
        # Cleanup orphaned temp files (fail-safe, not-already-cleaned)
        if staging is not None:
            staging.unlink(missing_ok=True)
        if snapshot is not None and snapshot.exists():
            # Snapshot still exists -> write didn't succeed, restore it
            shutil.copy2(snapshot, target)
            snapshot.unlink(missing_ok=True)
            logger.warning(
                "Rollback: emergency restore of %s from snapshot", target,
            )
def _make_cleanup_undo(
    snapshot: Optional[Path],
    staging: Optional[Path],
    target: Path,
) -> Callable[[], bool]:
    """Factory for undo handler on unexpected errors."""
    def undo() -> bool:
        try:
            if staging is not None:
                staging.unlink(missing_ok=True)
            if snapshot is not None and snapshot.exists():
                shutil.copy2(snapshot, target)
                snapshot.unlink(missing_ok=True)
                logger.info(
                    "Cleanup rollback: restored %s from snapshot", target,
                )
            return True
        except Exception as exc:
            logger.error("Cleanup rollback failed: %s", exc)
            return False
    return undo
# ---------------------------------------------------------------------------
# Usage example: demonstrating the full lifecycle
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Setup: create a test file to write into
    test_file = Path("/tmp/agent-rollback-demo.conf")
    test_file.write_text('server { listen 80; }\n')
    ctx = WriteContext()

    # -- Successful write ----------------------------------------------------
    try:
        with safe_agent_write(
            test_file,
            'server { listen 443 ssl; }\n',
            operation_id="nginx-tls-upgrade",
            ctx=ctx,
            # Validator: staged file must contain "listen"
            validator=lambda p: "listen" in p.read_text(),
        ) as staged:
            print(f"Staged at: {staged}")
            # If we reach here without exception, atomic replace happens
    except RollbackNeededError as e:
        print(f"ROLLBACK TRIGGERED: {e}")
        success = e.undo_handler()
        print(f"Undo result: {success}")
    print(f"File content after write: {test_file.read_text()!r}")
    print(
        f"Write records: {len(ctx.records)} total, "
        f"{len(ctx.succeeded)} ok, {len(ctx.failed)} failed"
    )

    # -- Failed write (validator rejects) ------------------------------------
    ctx2 = WriteContext()
    try:
        with safe_agent_write(
            test_file,
            'bad config content\n',
            operation_id="nginx-bad-config",
            ctx=ctx2,
            # Validator: reject content that doesn't contain "server"
            validator=lambda p: "server" in p.read_text(),
        ):
            pass  # This should never be reached
    except RollbackNeededError as e:
        print(f"\nEXPECTED ROLLBACK: {e}")
        success = e.undo_handler()
        print(f"Undo result: {success}")
    # File should still contain the content from the successful write
    print(f"File content after failed write: {test_file.read_text()!r}")
    print(
        f"Write records: {len(ctx2.records)} total, "
        f"{len(ctx2.succeeded)} ok, {len(ctx2.failed)} failed"
    )

    # Cleanup
    test_file.unlink(missing_ok=True)
This context manager embodies the article's core philosophy: the undo path is fully constructed before the actual write occurs. Because RollbackNeededError carries undo_handler, callers can execute rollback without knowing the internal details of the write operation: whether it was a file write or a database mutation, you just call e.undo_handler(). This "rollback interface decoupled from operation implementation" pattern carries through every strategy discussed in the sections ahead.
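Because every failure exposes the same zero-argument undo callable, handlers from several writes can be collected and unwound in reverse order. The sketch below illustrates that idea under stated assumptions: UndoStack is a hypothetical helper, not part of the code above, and it treats a handler that raises as a failed undo.

```python
from __future__ import annotations

from typing import Callable


class UndoStack:
    """Collects zero-argument undo callables and unwinds them newest-first.

    Hypothetical helper for illustration only.
    """

    def __init__(self) -> None:
        self._handlers: list[tuple[str, Callable[[], bool]]] = []

    def register(self, label: str, undo: Callable[[], bool]) -> None:
        # Register BEFORE performing the write the handler reverts.
        self._handlers.append((label, undo))

    def unwind(self) -> list[str]:
        """Run every handler in LIFO order; return labels of failed undos."""
        failed: list[str] = []
        while self._handlers:
            label, undo = self._handlers.pop()
            try:
                ok = undo()
            except Exception:
                ok = False  # a raising handler counts as a failed undo
            if not ok:
                failed.append(label)
        return failed
```

Unwinding newest-first matters when a later write depends on an earlier one. The returned list of failures gives the caller something concrete to escalate instead of a silent partial rollback.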
But real-world scenarios are far more complex than a single file write. Agents may simultaneously operate on multiple files, modify database records, and call external APIs. The next section covers the three fundamental rollback strategies — Snapshot, Transaction, and Compensation — and how to automatically select the right one based on operation type.
2. Rollback Strategies: Snapshot, Transaction, and Compensation
Not all operations can be rolled back the same way. A local file modification, a transactional database write, and a Stripe API subscription creation have fundamentally different rollback mechanisms. Forcing one strategy onto all operations yields one of two outcomes: too expensive or impossible.
This section defines the three foundational rollback strategies and provides an orchestrator — RollbackOrchestrator — that automatically selects the strategy based on operation type.
Strategy 1: Snapshot — Full State Before the Operation
Core idea: Before making any modification, save a complete copy of the modified object's current state into a snapshot. If the operation fails or needs to be undone, overwrite the current state with the snapshot.
When to use:
- File writes: cp file file.snap before, cp file.snap file on failure
- Directory operations: tar snapshot, or leverage copy-on-write filesystem features (btrfs/zfs)
- Configuration changes: parse the current config tree and serialize it
- Container/VM state: use checkpoint capabilities (CRIU, Docker checkpoint)
Advantages: Simple to implement; fast rollback (one copy operation); no dependency on understanding operation semantics.
Costs: High storage overhead: every write saves a full copy. A 2GB file written 100 times produces 200GB of snapshots if none are pruned.
When to choose: (1) Target object < 100MB; (2) Write frequency < 5/min; (3) Operation is an indivisible whole requiring synchronized restore.
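For directory targets, the same snapshot idea can be sketched with the standard library. This is a minimal illustration, not the article's implementation: the function names snapshot_directory and restore_directory are hypothetical, and it uses a plain recursive copy rather than tar or copy-on-write features.

```python
from __future__ import annotations

import shutil
from pathlib import Path


def snapshot_directory(source: Path, snapshot_root: Path, operation_id: str) -> Path:
    """Copy the directory `source` into a fresh snapshot directory."""
    snapshot = snapshot_root / f"snap-{operation_id}-{source.name}"
    # copytree refuses to overwrite an existing destination, so a reused
    # operation_id fails loudly instead of clobbering an older snapshot.
    shutil.copytree(source, snapshot, symlinks=True)
    return snapshot


def restore_directory(snapshot: Path, target: Path) -> None:
    """Replace `target` with the snapshot's contents.

    Not atomic: the target is briefly missing between rmtree and copytree.
    """
    if target.exists():
        shutil.rmtree(target)
    shutil.copytree(snapshot, target, symlinks=True)
```

Restoring by delete-and-copy also removes files the failed operation added, which a file-by-file overwrite would leave behind.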
Strategy 2: Transaction — Write-Ahead Log + Atomic Commit
Core idea: Record only intent and commit logs. Write an intent entry before the operation; mark it committed after success. On rollback, reverse committed entries; discard uncommitted ones.
When to use:
- Database operations: intent record contains old value; rollback writes it back
- Structured file writes (JSON/YAML/TOML): record key-value diffs; rollback restores key by key
- Batch file operations: multiple writes as one transaction unit — all-or-nothing
- Multi-step operations requiring sequencing: WAL guarantees traceable operation order
Advantages: Minimal storage overhead (deltas only); fine-grained rollback (per-key); transactional all-or-nothing semantics. The WAL itself is a queryable operation log — see Agent Audit Log Design.
Costs: High implementation complexity; cross-system transactions are a distributed systems problem; cannot cover external side effects.
When to choose: (1) Large files or high-frequency writes; (2) Atomicity required; (3) Clear before/after state differences describable as deltas.
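The "record key-value diffs, restore key by key" idea for structured configs can be sketched as follows. This is an in-memory illustration with hypothetical names (apply_changes, rollback); it handles top-level keys only and stores old values by reference, so nested mutable values would need a deep copy.

```python
from __future__ import annotations

from typing import Any

_MISSING = object()  # sentinel: the key did not exist before the change


def apply_changes(
    config: dict[str, Any], changes: dict[str, Any],
) -> list[tuple[str, Any]]:
    """Apply `changes` in place; return an undo log of (key, old value)."""
    undo_log: list[tuple[str, Any]] = []
    for key, new_value in changes.items():
        # Record the pre-image BEFORE mutating (the write-ahead step).
        undo_log.append((key, config.get(key, _MISSING)))
        config[key] = new_value
    return undo_log


def rollback(config: dict[str, Any], undo_log: list[tuple[str, Any]]) -> None:
    """Restore key by key, newest entry first."""
    for key, old_value in reversed(undo_log):
        if old_value is _MISSING:
            config.pop(key, None)  # key was added by the change: remove it
        else:
            config[key] = old_value
```

The log holds only the touched keys, which is why this approach stays cheap for large configs where a full snapshot would not.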
Strategy 3: Compensation — A Semantically Equivalent Reverse Operation
Core idea: For operations with external side effects, write a dedicated compensation function that semantically offsets the original operation's effect.
When to use:
- External API calls: created Stripe subscription → cancel that subscription
- Message publishing: sent email → send correction email
- DNS changes: created A record → delete it
- Schema changes: added NOT NULL column → drop the column
Advantages: Covers scenarios snapshot and transaction can't reach — external side effects, irreversible state changes, business-semantic undo.
Costs: Compensation is not true rollback — the system does not return to the exact same state. The compensation function itself can fail. Reliability is bounded by external systems.
When to choose: (1) Irreversible external side effects; (2) Business-semantic undo required; (3) As a supplement after snapshot/transaction.
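The point that compensation is not true rollback is easy to see in a small sketch. FakeBilling below is a hypothetical in-memory stand-in for an external billing API (it is not the Stripe SDK), and create_with_compensation is an illustrative helper name.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class FakeBilling:
    """In-memory stand-in for an external billing API (hypothetical)."""
    subscriptions: dict[str, str] = field(default_factory=dict)

    def create_subscription(self, customer: str) -> str:
        sub_id = f"sub_{len(self.subscriptions) + 1}"
        self.subscriptions[sub_id] = "active"
        return sub_id

    def cancel_subscription(self, sub_id: str) -> bool:
        if self.subscriptions.get(sub_id) != "active":
            return False  # nothing active to cancel: compensation fails
        self.subscriptions[sub_id] = "canceled"
        return True


def create_with_compensation(
    billing: FakeBilling, customer: str,
) -> tuple[str, Callable[[], bool]]:
    """Create a subscription and return (id, compensating action)."""
    sub_id = billing.create_subscription(customer)

    # The compensation can only be built after the call returns, because
    # it needs the ID that the external system assigned.
    def compensate() -> bool:
        return billing.cancel_subscription(sub_id)

    return sub_id, compensate
```

After compensate() runs, the subscription record still exists with status "canceled". The external system has been offset, not restored to its earlier state, and a second call returns False, so callers should treat the result as something that can fail.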
📌 Practical Rule: All agent write operations fall into three rollback categories — local file operations (snapshot first), database/state operations (transaction first), external API operations (compensation as fallback). When one operation spans multiple types, combine strategies. This is the core responsibility of RollbackOrchestrator.
Strategy Decision Matrix
The matrix below summarizes the standard decision path for choosing a strategy:
Operation Characteristics                     │ Recommended   │ Fallback
──────────────────────────────────────────────┼───────────────┼───────────────────────────────
Pure local file write, file < 100MB           │ SNAPSHOT      │ TRANSACTION
Pure local file write, file ≥ 100MB           │ TRANSACTION   │ SNAPSHOT
Multi-file atomic write (> 1 file)            │ TRANSACTION   │ per-file SNAPSHOT
Includes database write                       │ TRANSACTION   │ COMPENSATION
Includes external API call                    │ COMPENSATION  │ Irreversible (log+alert)
High-frequency write (>5/min/file)            │ TRANSACTION   │ SNAPSHOT+merge
Requires semantic-level undo (e.g., refund)   │ COMPENSATION  │ Irreversible
Target is irreversibly destructive (e.g., rm) │ Irreversible  │ Audit log + human intervention
Cross-agent distributed write                 │ COMP/COORD    │ Human intervention
RollbackOrchestrator: Strategy Selection and Execution Orchestration
The code below implements a complete rollback orchestrator — it analyzes the operation type before execution, automatically selects the snapshot/transaction/compensation strategy, and manages the entire undo handler registration and execution lifecycle:
from __future__ import annotations
import enum
import hashlib
import json
import os
import shutil
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Any, Protocol
from collections.abc import Sequence
import logging
logger = logging.getLogger("agent.rollback.orchestrator")
# ========================================================================
# Strategy enum: the three fundamental rollback approaches
# ========================================================================
class RollbackStrategy(enum.Enum):
    """
    Three rollback strategies. Each maps to a different recovery
    mechanism with distinct cost/speed/reliability tradeoffs.

    SNAPSHOT:     Pre-operation full-state capture. Fast restore, high
                  storage cost.
    TRANSACTION:  Write-ahead log + atomic commit. Low storage, complex
                  implementation.
    COMPENSATION: Semantic reverse operation. For external side effects
                  that cannot be "undone" at the storage level.
    """
    SNAPSHOT = "snapshot"
    TRANSACTION = "transaction"
    COMPENSATION = "compensation"
# ========================================================================
# Operation descriptor: what the Agent intends to do
# ========================================================================
class OperationKind(enum.Enum):
    """Categorization of what kind of mutation the Agent is performing."""
    FILE_WRITE = "file_write"        # Single file content replacement
    FILE_MULTI = "file_multi"        # Multiple file writes, one logical step
    FILE_DELETE = "file_delete"      # Removing a file entirely
    DB_WRITE = "db_write"            # Database INSERT/UPDATE/DELETE
    CONFIG_CHANGE = "config_change"  # Structured config modification
    API_CALL = "api_call"            # External API with side effects
    MIXED = "mixed"                  # Combination of the above


@dataclass
class OperationDescriptor:
    """
    Describes one atomic (from the Agent's perspective) operation.
    The orchestrator uses this to select the rollback strategy.
    """
    kind: OperationKind
    target: str        # Human-readable target (path, table, URL)
    operation_id: str  # Unique ID for tracing
    # Optional hints for strategy selection
    estimated_size_bytes: int = 0   # For SNAPSHOT cost estimation
    is_reversible_api: bool = True  # For COMPENSATION: API supports undo?
    affected_files: list[str] = field(default_factory=list)
    # Custom strategy override: if set, orchestrator skips auto-selection
    preferred_strategy: RollbackStrategy | None = None
# ========================================================================
# Undo handler protocol: the universal rollback interface
# ========================================================================
class UndoHandler(Protocol):
    """
    Protocol for any undo operation. Every strategy must produce one.
    The orchestrator calls execute(), and the handler returns True if
    the rollback succeeded. If False, the orchestrator may escalate.
    """
    def execute(self) -> bool: ...
    def describe(self) -> str: ...


@dataclass
class SimpleUndo:
    """A concrete undo handler wrapping a callable + description."""
    _fn: Callable[[], bool]
    _desc: str

    def execute(self) -> bool:
        try:
            return self._fn()
        except Exception as exc:
            logger.error("Undo handler failed: %s: %s", self._desc, exc)
            return False

    def describe(self) -> str:
        return self._desc
# ========================================================================
# Strategy selection logic: the decision matrix as code
# ========================================================================
@dataclass
class StrategyDecision:
    """
    Output of the strategy selector. Contains the chosen strategy and
    a reason string for auditability.
    """
    strategy: RollbackStrategy
    reason: str
    # If True, orchestrator also registers a fallback undo handler
    needs_fallback: bool = False
    fallback_strategy: RollbackStrategy | None = None


def select_strategy(op: OperationDescriptor) -> StrategyDecision:
    """
    Decision matrix that maps OperationDescriptor → RollbackStrategy.

    Decision logic (in priority order):
        1. If caller explicitly set preferred_strategy, use it.
        2. FILE_WRITE: SNAPSHOT if file < 100MB, else TRANSACTION.
           Rationale: snapshot cost grows linearly with file size; beyond
           ~100MB, the IO overhead of copying dominates and WAL is cheaper.
        3. FILE_MULTI: always TRANSACTION.
           Rationale: multiple files need atomicity; snapshot would require
           per-file copies and the restore order matters.
        4. FILE_DELETE: SNAPSHOT (copy before delete) if small, else
           COMPENSATION (recreate from backup source).
        5. DB_WRITE: TRANSACTION (WAL-based undo via pre-image logging).
        6. API_CALL: COMPENSATION if is_reversible_api=True, else
           best-effort COMPENSATION (escalate to human if irrecoverable).
        7. CONFIG_CHANGE: TRANSACTION (structured configs have clear
           key/value deltas, making WAL the natural fit).
        8. MIXED: TRANSACTION as primary, COMPENSATION as fallback for
           API-call sub-operations within the mix.
    """
    if op.preferred_strategy is not None:
        return StrategyDecision(
            strategy=op.preferred_strategy,
            reason=f"Explicit caller preference for {op.target}",
        )

    kind = op.kind
    if kind == OperationKind.FILE_WRITE:
        if op.estimated_size_bytes < 100 * 1024 * 1024:  # < 100 MB
            return StrategyDecision(
                strategy=RollbackStrategy.SNAPSHOT,
                reason=(
                    f"File {op.target} is small "
                    f"(~{op.estimated_size_bytes}B), SNAPSHOT is cheap"
                ),
            )
        else:
            return StrategyDecision(
                strategy=RollbackStrategy.TRANSACTION,
                reason=(
                    f"File {op.target} is large "
                    f"(~{op.estimated_size_bytes}B), SNAPSHOT too expensive, "
                    f"using TRANSACTION"
                ),
                needs_fallback=True,
                fallback_strategy=RollbackStrategy.SNAPSHOT,
            )
    elif kind == OperationKind.FILE_MULTI:
        return StrategyDecision(
            strategy=RollbackStrategy.TRANSACTION,
            reason=(
                f"Multi-file operation ({len(op.affected_files)} files) "
                f"requires atomicity: TRANSACTION"
            ),
            needs_fallback=True,
            fallback_strategy=RollbackStrategy.SNAPSHOT,
        )
    elif kind == OperationKind.FILE_DELETE:
        if op.estimated_size_bytes < 100 * 1024 * 1024:
            return StrategyDecision(
                strategy=RollbackStrategy.SNAPSHOT,
                reason=f"Delete of {op.target}: SNAPSHOT before deletion",
            )
        else:
            return StrategyDecision(
                strategy=RollbackStrategy.COMPENSATION,
                reason=(
                    f"Delete of large file {op.target}: cannot snapshot, "
                    f"COMPENSATION (recreate from backup)"
                ),
            )
    elif kind == OperationKind.DB_WRITE:
        return StrategyDecision(
            strategy=RollbackStrategy.TRANSACTION,
            reason="Database writes: TRANSACTION via pre-image WAL",
        )
    elif kind == OperationKind.API_CALL:
        if op.is_reversible_api:
            return StrategyDecision(
                strategy=RollbackStrategy.COMPENSATION,
                reason=f"API call to {op.target}: COMPENSATION (reversible)",
                needs_fallback=True,
                fallback_strategy=RollbackStrategy.COMPENSATION,
            )
        else:
            # Per the decision matrix, the fallback for an irreversible
            # call is log + alert, so no fallback strategy is set here.
            return StrategyDecision(
                strategy=RollbackStrategy.COMPENSATION,
                reason=(
                    f"API call to {op.target}: NOT reversible, "
                    f"COMPENSATION (best-effort, partial recovery expected)"
                ),
                needs_fallback=True,
                fallback_strategy=None,
            )
    elif kind == OperationKind.CONFIG_CHANGE:
        return StrategyDecision(
            strategy=RollbackStrategy.TRANSACTION,
            reason="Config changes: TRANSACTION via key-level delta logging",
        )
    elif kind == OperationKind.MIXED:
        return StrategyDecision(
            strategy=RollbackStrategy.TRANSACTION,
            reason=(
                "Mixed operation: TRANSACTION primary, "
                "COMPENSATION fallback for API sub-operations"
            ),
            needs_fallback=True,
            fallback_strategy=RollbackStrategy.COMPENSATION,
        )

    # Fallthrough: should never happen, but be defensive
    return StrategyDecision(
        strategy=RollbackStrategy.SNAPSHOT,
        reason=f"Unknown operation kind {kind}: defaulting to SNAPSHOT",
    )
# ========================================================================
# Snapshot strategy implementation
# ========================================================================
@dataclass
class SnapshotUndo:
    """Undo handler that restores a file from a saved snapshot."""
    snapshot_path: Path
    target_path: Path

    def execute(self) -> bool:
        try:
            if not self.snapshot_path.exists():
                logger.error(
                    "Snapshot missing: %s, cannot restore %s",
                    self.snapshot_path, self.target_path,
                )
                return False
            shutil.copy2(self.snapshot_path, self.target_path)
            self.snapshot_path.unlink(missing_ok=True)
            logger.info(
                "Snapshot rollback: restored %s → %s",
                self.snapshot_path, self.target_path,
            )
            return True
        except Exception as exc:
            logger.error("Snapshot rollback failed: %s", exc)
            return False

    def describe(self) -> str:
        return f"Snapshot restore: {self.snapshot_path} → {self.target_path}"


def prepare_snapshot_undo(
    target: str,
    snapshot_dir: str | Path,
    operation_id: str,
) -> tuple[bool, Optional[SimpleUndo], Optional[str]]:
    """
    Creates a snapshot of `target` and returns an undo handler.
    Returns (success, undo_handler, error_message).
    """
    target_path = Path(target).resolve()
    snap_dir = Path(snapshot_dir)
    snap_dir.mkdir(parents=True, exist_ok=True)

    if not target_path.exists():
        # Nothing to snapshot: undo means deleting any created file
        def delete_if_created() -> bool:
            target_path.unlink(missing_ok=True)
            return True

        return (
            True,
            SimpleUndo(delete_if_created, f"Delete {target_path} if created"),
            None,
        )

    snapshot_path = snap_dir / f"snap-{operation_id}-{target_path.name}"
    try:
        shutil.copy2(target_path, snapshot_path)
        handler = SnapshotUndo(
            snapshot_path=snapshot_path, target_path=target_path,
        )
        return True, SimpleUndo(handler.execute, handler.describe()), None
    except Exception as exc:
        return False, None, str(exc)
# ========================================================================
# Transaction (WAL) strategy — simplified intent log
# ========================================================================
@dataclass
class WALEntry:
"""A single write-ahead log entry for file operations."""
operation_id: str
target: str
pre_image_hash: str # SHA-256 of file before write
pre_image_path: str # Path to saved pre-image
timestamp: float = field(default_factory=time.time)
class WriteAheadLog:
"""
Minimal write-ahead log for file operations.
Stores a full pre-image of each file before it is written (kept simple
here; a production WAL would store diffs to keep storage low).
In production, use a proper WAL with checksums, rotation, and crash
recovery. This is the pattern, not the production code.
"""
def __init__(self, wal_dir: str | Path) -> None:
self.wal_dir = Path(wal_dir)
self.wal_dir.mkdir(parents=True, exist_ok=True)
self._log_path = self.wal_dir / "wal.jsonl"
def record_intent(
self, operation_id: str, target: str,
) -> Optional[SimpleUndo]:
"""
Record "I intend to write to `target`".
Saves a pre-image and returns an undo handler.
Returns None if the pre-image capture fails.
"""
target_path = Path(target).resolve()
if not target_path.exists():
# New file: intent is "will create". Undo = delete.
entry = WALEntry(
operation_id=operation_id,
target=target,
pre_image_hash="new_file",
pre_image_path="",
)
self._append_entry(entry)
def undo_new_file() -> bool:
target_path.unlink(missing_ok=True)
self._append_entry(WALEntry(
operation_id=f"{operation_id}-undo",
target=target,
pre_image_hash="undo_delete",
pre_image_path="",
))
return True
return SimpleUndo(
undo_new_file, f"WAL undo: delete new file {target}",
)
# Existing file: save pre-image hash and content
try:
content = target_path.read_bytes()
file_hash = hashlib.sha256(content).hexdigest()
# Save pre-image (full content for simplicity; production
# code would compute and store a diff)
pre_image_path = (
self.wal_dir / f"pre-{operation_id}-{target_path.name}"
)
pre_image_path.write_bytes(content)
entry = WALEntry(
operation_id=operation_id,
target=target,
pre_image_hash=file_hash,
pre_image_path=str(pre_image_path),
)
self._append_entry(entry)
pre_img = pre_image_path # capture for closure
def undo_wal() -> bool:
try:
if pre_img.exists():
shutil.copy2(pre_img, target_path)
pre_img.unlink(missing_ok=True)
self._append_entry(WALEntry(
operation_id=f"{operation_id}-undo",
target=target,
pre_image_hash="undo_restore",
pre_image_path="",
))
logger.info(
"WAL rollback: restored %s from pre-image",
target,
)
return True
return False
except Exception as exc:
logger.error("WAL undo failed: %s", exc)
return False
return SimpleUndo(
undo_wal,
f"WAL undo: restore {target} from pre-image",
)
except Exception as exc:
logger.error(
"WAL intent recording failed for %s: %s", target, exc,
)
return None
def _append_entry(self, entry: WALEntry) -> None:
"""Append a JSON line to the WAL file."""
with open(self._log_path, "a") as f:
f.write(json.dumps({
"operation_id": entry.operation_id,
"target": entry.target,
"pre_image_hash": entry.pre_image_hash,
"pre_image_path": entry.pre_image_path,
"timestamp": entry.timestamp,
}) + "\n")
def get_entries(self, operation_id: str) -> list[dict]:
"""Retrieve all WAL entries for a given operation_id."""
if not self._log_path.exists():
return []
entries = []
with open(self._log_path) as f:
for line in f:
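line = line.strip()
if not line:
continue # tolerate blank lines in the log
entry = json.loads(line)
entry_id = entry.get("operation_id", "")
# Match the operation and its "<id>-undo" record only; a bare
# prefix match would also return "write-v20" for "write-v2".
if entry_id in (operation_id, f"{operation_id}-undo"):
entries.append(entry)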
return entries
# ========================================================================
# RollbackOrchestrator — the central coordinator
# ========================================================================
@dataclass
class RollbackRecord:
"""Tracks one rollback registration for observability and debugging."""
operation_id: str
strategy: RollbackStrategy
target: str
handler: SimpleUndo
registered_at: float = field(default_factory=time.time)
executed: bool = False
@dataclass
class RollbackOrchestrator:
"""
Central coordinator for rollback across an Agent's entire operation.
Responsibilities:
1. Analyze each write operation → select strategy via select_strategy()
2. Prepare undo handler BEFORE the write executes
3. Register the handler in a stack (LIFO for composable rollback)
4. On failure, pop handlers from stack and execute in reverse order
5. Track rollback records for audit/observability
Usage:
orch = RollbackOrchestrator(
snapshot_dir="/var/agent/snapshots",
wal_dir="/var/agent/wal",
)
# Before Agent writes:
undo = orch.prepare_undo(op_descriptor)
if undo is None:
raise RuntimeError(
"Could not prepare rollback — aborting write"
)
# Agent performs the write...
# If the write succeeds:
orch.commit(operation_id)
# If anything fails:
orch.rollback_all() # LIFO undo execution
"""
snapshot_dir: Path
wal_dir: Path
_wal: WriteAheadLog | None = None
_undo_stack: list[tuple[str, SimpleUndo]] = field(default_factory=list)
_records: list[RollbackRecord] = field(default_factory=list)
_committed: set[str] = field(default_factory=set)
def __post_init__(self) -> None:
self.snapshot_dir = Path(self.snapshot_dir)
self.wal_dir = Path(self.wal_dir)
self.snapshot_dir.mkdir(parents=True, exist_ok=True)
self.wal_dir.mkdir(parents=True, exist_ok=True)
self._wal = WriteAheadLog(self.wal_dir)
@property
def wal(self) -> WriteAheadLog:
assert self._wal is not None
return self._wal
# -- Strategy selection + undo preparation -----------------------------
def prepare_undo(self, op: OperationDescriptor) -> Optional[SimpleUndo]:
"""
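Analyze the operation, select the best strategy, and prepare the
undo handler. For SNAPSHOT and TRANSACTION, a None return means
preparation failed and the caller MUST NOT proceed with the write.
For COMPENSATION, None is expected: the caller performs the
operation and then supplies the handler via register_compensation().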
This is the single entry point for "register undo before write".
"""
decision = select_strategy(op)
logger.info(
"Preparing undo for %s (kind=%s, strategy=%s): %s",
op.operation_id, op.kind.value,
decision.strategy.value, decision.reason,
)
handler: Optional[SimpleUndo] = None
if decision.strategy == RollbackStrategy.SNAPSHOT:
ok, undo, err = prepare_snapshot_undo(
target=op.target,
snapshot_dir=self.snapshot_dir,
operation_id=op.operation_id,
)
if not ok:
logger.error("Snapshot preparation failed: %s", err)
# Try fallback if available
if decision.fallback_strategy == RollbackStrategy.TRANSACTION:
logger.info(
"Falling back to TRANSACTION for %s",
op.operation_id,
)
handler = self.wal.record_intent(
op.operation_id, op.target,
)
else:
handler = undo
elif decision.strategy == RollbackStrategy.TRANSACTION:
handler = self.wal.record_intent(op.operation_id, op.target)
if handler is None and decision.fallback_strategy is not None:
logger.info(
"TRANSACTION prep failed for %s, falling back to %s",
op.operation_id, decision.fallback_strategy.value,
)
ok, undo, err = prepare_snapshot_undo(
target=op.target,
snapshot_dir=self.snapshot_dir,
operation_id=op.operation_id,
)
if ok:
handler = undo
elif decision.strategy == RollbackStrategy.COMPENSATION:
# Compensation handlers are supplied by the caller because they
# require business semantics. The caller should call
# register_compensation() with the actual undo function after
# prepare_undo returns.
handler = None # Caller provides via register_compensation()
else:
logger.error("Unknown strategy: %s", decision.strategy)
return None
# Register only when a handler exists; COMPENSATION handlers are
# registered later by the caller via register_compensation()
if handler is not None:
self._register(
op.operation_id, handler, decision.strategy, op.target,
)
return handler
def register_compensation(
self,
operation_id: str,
compensation_fn: Callable[[], bool],
description: str,
op: OperationDescriptor,
) -> None:
"""
Register a compensation (semantic undo) handler for an operation
that was previously analyzed as needing COMPENSATION strategy.
Call AFTER prepare_undo() returned None for an API_CALL operation.
"""
decision = select_strategy(op)
if decision.strategy != RollbackStrategy.COMPENSATION:
logger.warning(
"register_compensation called for op %s "
"but strategy is %s — ignoring",
operation_id, decision.strategy.value,
)
return
handler = SimpleUndo(compensation_fn, description)
self._register(
operation_id, handler, RollbackStrategy.COMPENSATION, op.target,
)
logger.info(
"Compensation registered for %s: %s", operation_id, description,
)
def _register(
self,
operation_id: str,
handler: SimpleUndo,
strategy: RollbackStrategy,
target: str,
) -> None:
"""Push undo handler onto the LIFO stack and record for audit."""
self._undo_stack.append((operation_id, handler))
self._records.append(RollbackRecord(
operation_id=operation_id,
strategy=strategy,
target=target,
handler=handler,
))
logger.debug(
"Undo registered: op=%s strategy=%s target=%s (depth=%d)",
operation_id, strategy.value, target, len(self._undo_stack),
)
# -- Commit / rollback lifecycle ---------------------------------------
def commit(self, operation_id: str) -> None:
"""
Mark an operation as committed. Its undo handler will NOT be
executed during rollback_all() — only uncommitted operations
are rolled back.
"""
self._committed.add(operation_id)
# Remove from stack top if present (common case)
if self._undo_stack and self._undo_stack[-1][0] == operation_id:
self._undo_stack.pop()
logger.debug("Commit: removed %s from undo stack", operation_id)
else:
logger.debug(
"Commit marker set for %s (handler not at stack top)",
operation_id,
)
def rollback_all(self) -> list[str]:
"""
Execute all uncommitted undo handlers in LIFO order.
Returns list of operation_ids that failed to roll back.
LIFO ordering is critical: if op3 depends on op2 which depends
on op1, the undo must be op3 → op2 → op1 to avoid leaving
inconsistent state.
"""
failed: list[str] = []
executed_count = 0
# Iterate in reverse (LIFO), skipping committed operations
for operation_id, handler in reversed(self._undo_stack):
if operation_id in self._committed:
continue
logger.info(
"Executing rollback for %s: %s",
operation_id, handler.describe(),
)
try:
success = handler.execute()
if success:
executed_count += 1
for rec in self._records:
if rec.operation_id == operation_id:
rec.executed = True
else:
failed.append(operation_id)
logger.error("Rollback FAILED for %s", operation_id)
except Exception as exc:
failed.append(operation_id)
logger.exception(
"Rollback exception for %s: %s", operation_id, exc,
)
self._undo_stack.clear()
logger.info(
"Rollback complete: %d executed, %d failed, %d skipped",
executed_count, len(failed), len(self._committed),
)
return failed
def rollback_one(self, operation_id: str) -> bool:
"""Roll back a single operation by ID."""
for i, (op_id, handler) in enumerate(self._undo_stack):
if op_id == operation_id:
logger.info(
"Rolling back single operation: %s", operation_id,
)
success = handler.execute()
self._undo_stack.pop(i)
for rec in self._records:
if rec.operation_id == operation_id:
rec.executed = success
return success
logger.warning("No undo handler found for %s", operation_id)
return False
# -- Observability -----------------------------------------------------
@property
def pending_undos(self) -> int:
"""Number of uncommitted undo handlers on the stack."""
return sum(
1 for op_id, _ in self._undo_stack
if op_id not in self._committed
)
def summary(self) -> dict[str, Any]:
"""Return a summary of the orchestrator's state for monitoring."""
return {
"total_registered": len(self._records),
"pending_undos": self.pending_undos,
"committed": len(self._committed),
"strategies_used": {
s.value: sum(
1 for r in self._records if r.strategy == s
)
for s in RollbackStrategy
},
"records": [
{
"operation_id": r.operation_id,
"strategy": r.strategy.value,
"target": r.target,
"executed": r.executed,
}
for r in self._records
],
}
# ========================================================================
# Usage example
# ========================================================================
if __name__ == "__main__":
# Setup
test_file = Path("/tmp/agent-rollback-orch-demo.txt")
test_file.write_text("original content v1\n")
orch = RollbackOrchestrator(
snapshot_dir="/tmp/agent-rollback-snaps",
wal_dir="/tmp/agent-rollback-wal",
)
# -- Example 1: Small file write → SNAPSHOT ---------------------------
op1 = OperationDescriptor(
kind=OperationKind.FILE_WRITE,
target=str(test_file),
operation_id="write-v2",
estimated_size_bytes=50,
)
undo1 = orch.prepare_undo(op1)
assert undo1 is not None, "Undo prep failed!"
# Simulate Agent writing the file
test_file.write_text("modified content v2\n")
# Agent decides the write was wrong — rollback!
orch.rollback_one("write-v2")
assert test_file.read_text() == "original content v1\n", (
"Rollback failed!"
)
print("Example 1 passed: snapshot rollback")
# -- Example 2: Multi-file → TRANSACTION ------------------------------
test_file2 = Path("/tmp/agent-rollback-orch-demo2.txt")
test_file2.write_text("file2 original\n")
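batch_files = [test_file, test_file2]
for i, path in enumerate(batch_files, start=1):
# prepare_undo() captures a pre-image of op.target only, so the
# batch registers one undo per file instead of a symbolic target
op2 = OperationDescriptor(
kind=OperationKind.FILE_MULTI,
target=str(path),
operation_id=f"batch-v2-{i}",
affected_files=[str(p) for p in batch_files],
)
assert orch.prepare_undo(op2) is not None, "Undo prep failed!"
# Simulate writes (after the loop: every undo is registered first)
test_file.write_text("file1 v3\n")
test_file2.write_text("file2 v3\n")
failed = orch.rollback_all()
assert not failed, f"Rollback failed for: {failed}"
assert test_file.read_text() == "original content v1\n"
assert test_file2.read_text() == "file2 original\n"
print("Example 2 passed: transaction rollback")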
# -- Example 3: API call → COMPENSATION -------------------------------
op3 = OperationDescriptor(
kind=OperationKind.API_CALL,
target="https://api.stripe.com/v1/subscriptions",
operation_id="create-sub",
is_reversible_api=True,
)
undo3 = orch.prepare_undo(op3)
# undo3 is None for compensation — caller must register
if undo3 is None:
fake_subscription_id = "sub_abc123"
def cancel_subscription() -> bool:
# In reality: stripe.Subscription.delete(fake_subscription_id)
logger.info(
"Compensation: cancelled subscription %s",
fake_subscription_id,
)
return True
orch.register_compensation(
operation_id="create-sub",
compensation_fn=cancel_subscription,
description=f"Cancel Stripe subscription {fake_subscription_id}",
op=op3,
)
# Simulate failure after API call → rollback
failed = orch.rollback_all()
print(f"Example 3 passed: compensation rollback (failed={failed})")
# Summary
import json as _json
print("\nOrchestrator summary:")
print(_json.dumps(orch.summary(), indent=2, default=str))
# Cleanup
test_file.unlink(missing_ok=True)
test_file2.unlink(missing_ok=True)
import shutil as _shutil
_shutil.rmtree("/tmp/agent-rollback-snaps", ignore_errors=True)
_shutil.rmtree("/tmp/agent-rollback-wal", ignore_errors=True)
Orchestrator Design Decisions
Several architectural decisions in the RollbackOrchestrator are worth expanding on:
- LIFO Rollback Order: rollback_all() executes undo handlers in stack order (last-in-first-out). This is not optional: if the agent modified a file at step 3 that depends on a file written at step 2, rollback must execute from step 3 backward to step 1. Forward-order rollback would restore step 2's file first, leaving step 3's file referencing content that no longer exists.
- Strategy Degradation and Fallback: When strategy preparation fails, the orchestrator doesn't immediately error; it tries the fallback_strategy. For example, for a borderline-sized file (just over 100MB), snapshot preparation might fail due to insufficient disk space; the orchestrator automatically degrades to WAL transaction mode. This degradation chain is encoded in select_strategy(): every strategy decision carries an optional fallback.
- Deferred Compensation Registration: For API calls, prepare_undo() returns None, because the compensation function needs the result of the API call (e.g., the subscription_id) to be constructed. The orchestrator supports a two-phase flow: prepare_undo() confirms the strategy → agent executes the API call and captures the result → register_compensation() uses that result to construct the compensation function. This two-phase design is inherent to the compensation strategy: you cannot construct a "cancel subscription" function before knowing what subscription_id Stripe returned.
- Semantics of commit(): When an operation executes successfully and the agent confirms no rollback is needed, calling commit() removes it from the rollback stack. Note that this is not the same as deleting the snapshot or WAL entries; that data is retained for auditing. commit() only affects the rollback stack: committed operations are skipped during rollback_all(). See Agent Audit Log Design for audit retention policies.
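The LIFO point can be illustrated with a minimal, self-contained sketch. It is not the RollbackOrchestrator itself: the in-memory state dict stands in for the filesystem, and the names apply_step, rollback, and undo_order are hypothetical helpers introduced only for this illustration.

```python
from typing import Callable

# Hypothetical in-memory "filesystem" so the sketch runs without touching disk.
state: dict[str, str] = {}
undo_stack: list[tuple[str, Callable[[], None]]] = []
undo_order: list[str] = []


def apply_step(step: str, key: str, value: str) -> None:
    """Register the undo for `key` first, then perform the write."""
    previous = state.get(key)  # None means the key did not exist before

    def undo() -> None:
        # Restore the exact pre-write state of this one key.
        if previous is None:
            state.pop(key, None)
        else:
            state[key] = previous
        undo_order.append(step)

    undo_stack.append((step, undo))
    state[key] = value


def rollback() -> None:
    """Pop and run handlers newest-first (LIFO)."""
    while undo_stack:
        _, undo = undo_stack.pop()
        undo()


apply_step("step1", "base.conf", "listen 80")
apply_step("step2", "upstream.conf", "server app:8000")
apply_step("step3", "base.conf", "listen 80; include upstream.conf")
rollback()
print(undo_order)  # ['step3', 'step2', 'step1']
print(state)       # {}
```

Because step 1 and step 3 both write base.conf, order matters here: running the undo handlers oldest-first would delete base.conf and then have step 3's handler resurrect the "listen 80" version, leaving a file that did not exist before the task started.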
For safety constraints that limit an agent's blast radius during execution, see Agent Command Execution Safety. For how runtime isolation expands the coverage scope of rollback (container-level snapshots), see Agent Runtime Isolation. For how release gates use rollback as a safety net, see Agent Release Gate Design.
3. File-Level Rollback — Versioned Snapshots and Diff-Based Restoration
The RollbackOrchestrator from Section 2 selects strategies and manages undo handler registration. But strategy selection is only half the problem; the actual mechanics of file-level rollback demand a dedicated subsystem. Agents modify files with three properties that make naive snapshot-then-restore insufficient: high write frequency (an agent might touch 20 files in a single task), inter-file dependencies (config A references a path written in config B), and non-text content or metadata (binary files, symlinks, permissions, extended attributes). A file-level rollback system must handle all three without ballooning storage costs.
This section presents CopyOnWriteAgent — a file-level rollback implementation built on a copy-on-write pipeline: stage the new content in a temporary location → validate it against schema, syntax, and semantic rules → atomically rename it over the target file. The original file is never modified in-place. Instead of storing full snapshots for every write, the system stores a checksum baseline plus reverse diffs — making frequent writes storage-efficient while retaining full rollback capability. The staging directory is managed with configurable retention policies that align with audit log retention windows; see Agent Audit Log Design for how those windows are determined.
Why Copy-on-Write Instead of In-Place Mutation
In-place file mutation (opening the target file with "w" and writing bytes) has a fatal property for agent automation: if the agent process is killed mid-write, the target file is left truncated or partially written. There is no "before" state to restore, because opening with "w" truncates the original and the new bytes arrive incrementally. POSIX rename() solves this: the new content is staged in a temp file, and os.replace() (atomic on POSIX) swaps the temp file in. The target is either the complete old file or the complete new file, never a partial write. The guarantee holds only when the staged file and the target live on the same filesystem; across filesystems os.replace() can fail with an OSError instead of renaming. This is the foundational invariant of CopyOnWriteAgent.
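The stage-then-swap idea can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the CopyOnWriteAgent code: atomic_write is a hypothetical helper, and it stages the temp file next to the target (rather than in a separate staging directory) so that both paths are on the same filesystem.

```python
import os
import tempfile
from pathlib import Path


def atomic_write(target: Path, data: bytes) -> None:
    """Write `data` so readers see the old file or the new file, never a mix."""
    # Assumption: staging in the target's own directory keeps source and
    # destination on one filesystem, which os.replace() needs to be atomic.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=f".{target.name}.")
    tmp_path = Path(tmp_name)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_path, target)  # atomic rename over the target
    except BaseException:
        tmp_path.unlink(missing_ok=True)  # never leave a stray staged file
        raise


demo_dir = Path(tempfile.mkdtemp())
demo_target = demo_dir / "nginx.conf"
demo_target.write_bytes(b"worker_processes 1;\n")
atomic_write(demo_target, b"worker_processes 4;\n")
print(demo_target.read_bytes())
print(sorted(p.name for p in demo_dir.iterdir()))
```

One design consequence worth noting: tempfile.mkstemp() creates the file with owner-only permissions, so a real implementation would likely need to copy the original file's mode and ownership onto the staged file before the swap.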
Diff-based rollback further reduces storage: instead of storing a full snapshot of the file before every write (which costs O(n × filesize) storage for n writes), the system computes a reverse unified diff from the staged content back to the baseline. For a 10KB config file modified 50 times with single-line changes, each reverse diff holds only the changed line plus a few lines of context and headers, typically a few hundred bytes, so the 50 diffs together take a small fraction of the 500KB that 50 full snapshots would need. For binary files, or files where the diff is at least as large as a full copy, the system transparently falls back to full snapshots.
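The size comparison can be reproduced with the standard library. The sketch below is illustrative only: the 200-line config is made up, and the variable names are hypothetical. It computes a reverse unified diff (new content back to old content) with difflib.unified_diff and applies the same "smaller representation wins" rule described above.

```python
import difflib

# Hypothetical 200-line config in which only one line changes.
pre_lines = [f"setting_{i} = value_{i}\n" for i in range(200)]
post_lines = list(pre_lines)
post_lines[100] = "setting_100 = changed\n"

# Reverse diff: FROM the new (staged) content TO the old (baseline) content,
# so applying it to the new file reconstructs the pre-image.
reverse_diff = "".join(
    difflib.unified_diff(
        post_lines, pre_lines, fromfile="staged", tofile="baseline",
    )
)

snapshot_bytes = len("".join(pre_lines).encode("utf-8"))
diff_bytes = len(reverse_diff.encode("utf-8"))

# Store whichever representation is smaller.
use_diff = diff_bytes < snapshot_bytes
print(f"snapshot={snapshot_bytes}B diff={diff_bytes}B use_diff={use_diff}")
print(reverse_diff)
```

In the reverse diff, the line prefixed with "-" is the agent's new content and the line prefixed with "+" is the original, which is the opposite of what a forward diff would show.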
Staging Directory Management and Retention
The staging directory accumulates artifacts: snapshot files, reverse diffs, checksum baselines, and orphaned staging files from aborted operations. Without retention policies, a long-running agent session can exhaust disk space. CopyOnWriteAgent implements a three-tier retention model:
- Active retention (uncommitted writes): Snapshots and diffs for operations that have not yet been committed are kept indefinitely; these are the active rollback safety net. If the agent's task fails and triggers rollback_all(), every uncommitted write must be reversible.
- Committed retention (successful writes): After the agent calls commit(), the snapshot is eligible for cleanup after a configurable TTL (default: 1 hour). This window exists so that if a later operation reveals that an earlier "successful" write was actually wrong, you can still roll it back within the window.
- Audit retention (historical writes): Checksum records and operation metadata (file path, timestamp, hash) are kept for audit purposes even after snapshot cleanup. These records align with the audit retention policy; see Agent Audit Log Design.
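The tiers above reduce to a small eligibility check. The following is a sketch under assumptions, not the retention code of CopyOnWriteAgent: ArtifactInfo and is_expired are hypothetical names, and the TTL conventions (0 means delete immediately, -1 means keep indefinitely) follow the RetentionPolicy description. Audit metadata is assumed to be stored separately and is not covered here.

```python
from dataclasses import dataclass


@dataclass
class ArtifactInfo:
    """Hypothetical view of one snapshot or reverse diff in staging."""
    committed: bool
    rolled_back: bool
    finished_at: float  # epoch seconds when it was committed or rolled back


def is_expired(artifact: ArtifactInfo, now: float, committed_ttl: int = 3600) -> bool:
    """Return True when the snapshot/diff may be deleted from staging."""
    if not artifact.committed and not artifact.rolled_back:
        return False  # active tier: this is the live rollback safety net
    if artifact.rolled_back:
        return True   # already restored, nothing left to undo
    if committed_ttl < 0:
        return False  # -1: keep committed artifacts indefinitely
    return now - artifact.finished_at >= committed_ttl


now = 10_000.0
print(is_expired(ArtifactInfo(False, False, 0.0), now))        # False
print(is_expired(ArtifactInfo(True, False, now - 60), now))    # False
print(is_expired(ArtifactInfo(True, False, now - 7200), now))  # True
```

Keeping the decision in one pure function makes the policy easy to unit-test separately from the code that actually deletes files.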
For file-level observability — tracking which files an agent touched, when, and whether the writes were committed or rolled back — see Agent Observability. The CopyOnWriteAgent emits structured log events at every pipeline stage (stage, validate, commit, rollback) that feed into the observability pipeline.
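As a rough illustration of what one such structured event might look like, the sketch below serializes a pipeline event as a single JSON line. The schema is an assumption: build_event, the "cow." prefix, and the field names are hypothetical and are not taken from the CopyOnWriteAgent implementation.

```python
import json
import time

# Pipeline stages named in the text; the event schema itself is an assumption.
STAGES = ("stage", "validate", "commit", "rollback")


def build_event(stage: str, operation_id: str, target: str, **fields: object) -> str:
    """Serialize one pipeline event as a single JSON line."""
    if stage not in STAGES:
        raise ValueError(f"unknown pipeline stage: {stage!r}")
    event = {
        "event": f"cow.{stage}",
        "operation_id": operation_id,
        "target": target,
        "ts": time.time(),
        **fields,  # stage-specific extras, e.g. method or success
    }
    return json.dumps(event, sort_keys=True)


line = build_event(
    "rollback", "nginx-rate-limit-v2", "/etc/nginx/nginx.conf",
    method="diff", success=True,
)
print(line)
```

One JSON object per line keeps the events easy to ship through a standard logging handler and to filter by operation_id downstream.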
CopyOnWriteAgent: Implementation
The code below implements the complete copy-on-write pipeline with versioned staging, diff-based rollback, checksum baselines, retention enforcement, and full write/commit/rollback lifecycle management:
from __future__ import annotations
import difflib
import hashlib
import json
import os
import shutil
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Any
from collections.abc import Iterable
import logging
logger = logging.getLogger("agent.rollback.cow")
# ========================================================================
# VersionedFileRecord — metadata for one versioned file operation
# ========================================================================
@dataclass
class VersionedFileRecord:
"""
Tracks one version of a file managed by CopyOnWriteAgent.
Each write operation produces one record. The record stores either
a full snapshot path or a reverse diff, depending on which is smaller.
"""
operation_id: str
target_path: Path
version_number: int
# Checksum of the file BEFORE this write (the baseline we can restore to)
pre_image_sha256: str
# Checksum of the file AFTER this write (for verification)
post_image_sha256: str
# Restoration method: either a full snapshot or a reverse unified diff
snapshot_path: Optional[Path] = None
reverse_diff_path: Optional[Path] = None
# True when restoration applies the stored reverse diff (post-image → pre-image)
is_diff_based: bool = False
# Lifecycle tracking
committed: bool = False
rolled_back: bool = False
timestamp: float = field(default_factory=time.time)
# ========================================================================
# Retention policy configuration
# ========================================================================
@dataclass
class RetentionPolicy:
"""
Controls how long staging artifacts are kept.
All durations are in seconds. A value of 0 means "delete immediately."
A value of -1 means "keep indefinitely."
"""
committed_snapshot_ttl: int = 3600 # 1 hour
uncommitted_snapshot_ttl: int = -1 # Keep until committed/rolled back
max_versions_per_file: int = 50 # Cap on version count per file
max_staging_size_bytes: int = 500 * 1024 * 1024 # 500 MB total staging cap
audit_record_ttl: int = 7 * 24 * 3600 # 7 days for audit metadata
# ========================================================================
# CopyOnWriteAgent — the file-level rollback engine
# ========================================================================
class CopyOnWriteAgent:
"""
File-level rollback using copy-on-write semantics with diff-based storage.
Architecture:
write() → stage content, compute diff vs baseline, register undo
commit() → mark version as committed, schedule snapshot for cleanup
rollback() → restore target from snapshot or apply reverse diff
Design invariants:
1. The target file is NEVER mutated in-place. Content is always staged
to a temp file first, validated, then atomically renamed.
2. Every write() call saves its restoration data (snapshot or reverse
diff) before the atomic rename. If saving fails, the write does not
proceed. The version record is registered right after the rename.
3. Storage is minimized by preferring reverse unified diffs over full
snapshots. The system transparently chooses whichever is smaller.
4. Retention is enforced on every write and commit call. Staging
artifacts exceeding policy limits are cleaned up eagerly.
Usage:
cow = CopyOnWriteAgent(
staging_dir="/var/agent/cow-staging",
retention=RetentionPolicy(committed_snapshot_ttl=7200),
)
# Prepare a write
version = cow.write(
target="/etc/nginx/nginx.conf",
content=new_nginx_config,
operation_id="nginx-rate-limit-v2",
validator=lambda staged_path: nginx_validate(staged_path),
)
# If the overall task succeeds:
cow.commit(version.operation_id)
# If something goes wrong and we need to undo this specific write:
cow.rollback(version.operation_id)
# Or undo everything uncommitted (LIFO order):
cow.rollback_all()
"""
def __init__(
self,
staging_dir: str | Path,
retention: RetentionPolicy | None = None,
) -> None:
self.staging_dir = Path(staging_dir).resolve()
self.staging_dir.mkdir(parents=True, exist_ok=True)
self.retention = retention or RetentionPolicy()
# Per-file version counters
self._version_counters: dict[str, int] = {}
# All version records indexed by operation_id
self._versions: dict[str, VersionedFileRecord] = {}
# Undo stack (LIFO) — operation_ids in write order
self._undo_stack: list[str] = []
# Metadata store for audit trail
self._metadata_path = self.staging_dir / "cow-metadata.json"
self._load_metadata()
# ------------------------------------------------------------------
# write() — the main entry point for file mutation
# ------------------------------------------------------------------
def write(
self,
target: str | Path,
content: str | bytes,
*,
operation_id: str,
validator: Callable[[Path], bool] | None = None,
force_snapshot: bool = False,
) -> VersionedFileRecord:
"""
Stage new content for `target` and register an undo handler.
Pipeline:
1. Compute checksum of current target (pre-image baseline).
2. Stage new content to a temp file in the staging directory.
3. Compute reverse diff (staged → current) if diff is smaller
than a full snapshot; otherwise snapshot the current file.
4. Optionally validate the staged file.
5. Atomically rename staged file over target.
6. Register the undo handler (restore from snapshot or apply
reverse diff to reconstruct the pre-image).
Args:
target: Path to the file to modify.
content: New file content (str or bytes).
operation_id: Unique identifier for this write operation.
validator: Optional callable(Path) → bool. If it returns
False or raises, the write is aborted and any
snapshot is cleaned up.
force_snapshot: If True, always use full snapshot instead of
diff-based storage (useful for binary files).
Returns:
VersionedFileRecord describing the registered version.
Raises:
ValueError: If operation_id has already been used.
OSError: If staging or atomic rename fails.
RuntimeError: If validation fails.
"""
if operation_id in self._versions:
raise ValueError(
f"Operation ID {operation_id!r} already exists — "
f"each write must have a unique ID"
)
target_path = Path(target).resolve()
target_key = str(target_path)
# --- Step 1: Pre-image checksum ----------------------------------
pre_image_hash: str
pre_image_content: bytes | None = None
if target_path.exists() and target_path.is_file():
pre_image_content = target_path.read_bytes()
pre_image_hash = hashlib.sha256(pre_image_content).hexdigest()
else:
# File does not exist — pre-image is "empty"
pre_image_content = None
pre_image_hash = "new_file"
# --- Step 2: Stage new content -----------------------------------
staged = self._stage_file(target_path, content, operation_id)
# --- Step 3: Compute post-image checksum -------------------------
staged_content = staged.read_bytes()
post_image_hash = hashlib.sha256(staged_content).hexdigest()
# --- Step 4: Determine storage method (snapshot vs diff) ---------
snapshot_path: Optional[Path] = None
reverse_diff_path: Optional[Path] = None
is_diff_based = False
if pre_image_content is None:
# New file: no pre-image to snapshot. Undo = delete.
pass
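elif (
force_snapshot
or b"\x00" in pre_image_content
or b"\x00" in staged_content
):
# Forced by the caller, or NUL bytes indicate binary content that a
# text diff cannot round-trip: keep a full snapshot.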
snapshot_path = self._save_snapshot(
pre_image_content, target_path, operation_id,
)
else:
# Compute reverse diff (staged → pre-image) and compare sizes
pre_lines = pre_image_content.decode("utf-8", errors="replace").splitlines(keepends=True)
staged_lines = staged_content.decode("utf-8", errors="replace").splitlines(keepends=True)
# Reverse diff: from staged back to pre-image
diff_lines = list(
difflib.unified_diff(
staged_lines, pre_lines,
fromfile=str(staged),
tofile=str(target_path),
)
)
diff_bytes = "".join(diff_lines).encode("utf-8")
# Choose the smaller storage representation
if len(diff_bytes) < len(pre_image_content):
# Diff is smaller — store the reverse diff
reverse_diff_path = self._save_reverse_diff(
diff_bytes, target_path, operation_id,
)
is_diff_based = True
else:
# Full snapshot is smaller (or diff didn't help)
snapshot_path = self._save_snapshot(
pre_image_content, target_path, operation_id,
)
# --- Step 5: Validate staged content -----------------------------
if validator is not None:
try:
accepted = validator(staged)
except Exception as exc:
# Any validator exception (including RuntimeError) aborts the
# write and cleans up the staged file and restoration data.
self._cleanup_staging(staged, snapshot_path, reverse_diff_path)
raise RuntimeError(
f"Validator raised exception for {target_path}: {exc}"
) from exc
if not accepted:
self._cleanup_staging(staged, snapshot_path, reverse_diff_path)
raise RuntimeError(
f"Validation rejected staged content for {target_path} "
f"(op={operation_id})"
)
# --- Step 6: Atomic rename (stage → target) ----------------------
try:
os.replace(staged, target_path)
# staged is now the live target file; snapshot/rev_diff remain in staging
except OSError:
self._cleanup_staging(staged, snapshot_path, reverse_diff_path)
raise
# --- Step 7: Register version and undo handler -------------------
version_number = self._next_version(target_key)
record = VersionedFileRecord(
operation_id=operation_id,
target_path=target_path,
version_number=version_number,
pre_image_sha256=pre_image_hash,
post_image_sha256=post_image_hash,
snapshot_path=snapshot_path,
reverse_diff_path=reverse_diff_path,
is_diff_based=is_diff_based,
)
self._versions[operation_id] = record
self._undo_stack.append(operation_id)
self._save_metadata()
logger.info(
"CopyOnWrite write: op=%s target=%s v%d method=%s "
"pre=%s post=%s",
operation_id, target_path, version_number,
"diff" if is_diff_based else "snapshot",
pre_image_hash[:12], post_image_hash[:12],
)
# Enforce retention after every write
self._enforce_retention()
return record
# ------------------------------------------------------------------
# commit() — mark a version as successful, eligible for cleanup
# ------------------------------------------------------------------
def commit(self, operation_id: str) -> bool:
"""
Mark a write operation as committed.
After commit:
- The undo handler is removed from the active undo stack.
- The snapshot/diff becomes eligible for cleanup after the
committed_snapshot_ttl retention window.
- The operation is NOT reverted by rollback_all().
Returns True if the operation was found and committed, False
if operation_id is unknown.
"""
record = self._versions.get(operation_id)
if record is None:
logger.warning(
"commit() called for unknown operation_id: %s", operation_id,
)
return False
if record.rolled_back:
logger.warning(
"commit() called for already-rolled-back operation: %s",
operation_id,
)
return False
record.committed = True
# Remove from undo stack
try:
self._undo_stack.remove(operation_id)
except ValueError:
pass
self._save_metadata()
logger.info(
"CopyOnWrite commit: op=%s target=%s v%d",
operation_id, record.target_path, record.version_number,
)
self._enforce_retention()
return True
# ------------------------------------------------------------------
# rollback() — undo a single write operation
# ------------------------------------------------------------------
def rollback(self, operation_id: str) -> bool:
"""
Undo a single write operation by restoring the pre-image.
Restoration logic:
- If the write created a new file: delete the target file.
- If a full snapshot was stored: copy snapshot back over target.
- If a reverse diff was stored: apply the reverse diff to the
current target content to reconstruct the pre-image.
Returns True on success, False if rollback fails.
"""
record = self._versions.get(operation_id)
if record is None:
logger.error(
"rollback() called for unknown operation_id: %s",
operation_id,
)
return False
if record.rolled_back:
logger.warning(
"rollback() called for already-rolled-back operation: %s",
operation_id,
)
return True # Idempotent — already rolled back
target = record.target_path
logger.info(
"CopyOnWrite rollback: op=%s target=%s method=%s",
operation_id, target,
"diff" if record.is_diff_based else "snapshot",
)
try:
if record.pre_image_sha256 == "new_file":
# The write created this file — undo means delete it
target.unlink(missing_ok=True)
logger.debug("Rollback: deleted new file %s", target)
elif record.snapshot_path is not None and record.snapshot_path.exists():
# Full snapshot restore
shutil.copy2(record.snapshot_path, target)
logger.debug(
"Rollback: restored %s from snapshot %s",
target, record.snapshot_path,
)
elif (
record.reverse_diff_path is not None
and record.reverse_diff_path.exists()
):
# Reverse diff restore: read current content, apply reverse diff
if not target.exists():
logger.error(
"Rollback: target %s missing, cannot apply reverse diff",
target,
)
return False
current_lines = (
target.read_text("utf-8")
.splitlines(keepends=True)
)
diff_text = record.reverse_diff_path.read_text("utf-8")
# The reverse diff transforms staged → pre-image.
# We apply it to the current (staged) content to get pre-image.
restored_lines = list(
self._apply_reverse_patch(current_lines, diff_text)
)
target.write_text("".join(restored_lines), encoding="utf-8")
logger.debug(
"Rollback: applied reverse diff to %s", target,
)
else:
logger.error(
"Rollback: no restoration data for %s (op=%s)",
target, operation_id,
)
return False
# Verify restoration by comparing checksums
if target.exists():
restored_hash = hashlib.sha256(
target.read_bytes()
).hexdigest()
if restored_hash != record.pre_image_sha256:
logger.error(
"Rollback checksum mismatch for %s: "
"expected %s, got %s",
target, record.pre_image_sha256[:12],
restored_hash[:12],
)
# Best-effort: the restore happened but doesn't match.
# Don't return False — the file is closer to pre-image
# than the bad write was.
record.rolled_back = True
# Remove from undo stack
try:
self._undo_stack.remove(operation_id)
except ValueError:
pass
# Clean up staging artifacts for this version
if record.snapshot_path is not None:
record.snapshot_path.unlink(missing_ok=True)
if record.reverse_diff_path is not None:
record.reverse_diff_path.unlink(missing_ok=True)
self._save_metadata()
return True
except Exception as exc:
logger.exception(
"Rollback failed for op=%s: %s", operation_id, exc,
)
return False
# ------------------------------------------------------------------
# rollback_all() — undo all uncommitted writes in LIFO order
# ------------------------------------------------------------------
def rollback_all(self) -> list[str]:
"""
Roll back all uncommitted write operations in LIFO (reverse) order.
Returns a list of operation_ids that failed to roll back.
"""
failed: list[str] = []
rolled_back = 0
# Process in reverse order (LIFO: most recent write first).
# Iterate over a copy because rollback() removes entries from the stack.
for operation_id in reversed(list(self._undo_stack)):
record = self._versions.get(operation_id)
if record is not None and record.committed:
continue
if self.rollback(operation_id):
rolled_back += 1
else:
failed.append(operation_id)
# Count successes directly: the undo stack has already shrunk by now,
# so deriving the count from its length would under-report.
logger.info(
"CopyOnWrite rollback_all: %d rolled back, %d failed",
rolled_back, len(failed),
)
return failed
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _stage_file(
self,
target: Path,
content: str | bytes,
operation_id: str,
) -> Path:
"""Write content to a staging file and return its path."""
staging_path = (
self.staging_dir / f".cow-stage-{operation_id}-{target.name}"
)
if isinstance(content, str):
staging_path.write_text(content, encoding="utf-8")
else:
staging_path.write_bytes(content)
# Report the on-disk size: len(content) counts characters, not bytes, for str
logger.debug("Staged: %s (%d bytes)", staging_path, staging_path.stat().st_size)
return staging_path
def _save_snapshot(
self,
content: bytes,
target: Path,
operation_id: str,
) -> Path:
"""Save a pre-image snapshot to the staging directory."""
snap_path = (
self.staging_dir / f".cow-snap-{operation_id}-{target.name}"
)
snap_path.write_bytes(content)
logger.debug("Snapshot saved: %s (%d bytes)", snap_path, len(content))
return snap_path
def _save_reverse_diff(
self,
diff_bytes: bytes,
target: Path,
operation_id: str,
) -> Path:
"""Save a reverse unified diff to the staging directory."""
diff_path = (
self.staging_dir / f".cow-diff-{operation_id}-{target.name}.diff"
)
diff_path.write_bytes(diff_bytes)
logger.debug(
"Reverse diff saved: %s (%d bytes)", diff_path, len(diff_bytes),
)
return diff_path
def _next_version(self, target_key: str) -> int:
"""Increment and return the version counter for a file."""
current = self._version_counters.get(target_key, 0)
next_ver = current + 1
self._version_counters[target_key] = next_ver
return next_ver
@staticmethod
def _apply_reverse_patch(
current_lines: list[str],
diff_text: str,
) -> Iterable[str]:
"""
Apply a stored reverse diff to reconstruct the pre-image.
The diff must have the staged (post-write) content on its "-" side
and the pre-image on its "+" side, so applying it forward to the
current file yields the pre-image. This is a minimal applicator for
the common cases in agent file modifications; it raises ValueError
when a hunk does not match the current content. Production code
should use the `patch` command or a library like `whatthepatch`.
"""
import re
hunk_header = re.compile(
r'^@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@'
)
result: list[str] = list(current_lines)
offset = 0 # cumulative line-count shift from hunks applied so far
diff_lines = diff_text.splitlines(keepends=True)
i = 0
while i < len(diff_lines):
line = diff_lines[i]
m = hunk_header.match(line)
if not m:
# File headers ("---"/"+++") and other preamble are skipped here
i += 1
continue
# Only the "-" range is needed: it locates the staged span in the
# current file. The "+" range describes the pre-image we rebuild.
old_start = int(m.group(1))
old_count = int(m.group(2)) if m.group(2) else 1
i += 1
hunk_lines: list[str] = []
# Collect the hunk body without filtering on "---"/"+++": inside a
# hunk, such a line is a removed or added line whose text starts
# with "--" or "++", not a file header.
while i < len(diff_lines) and not diff_lines[i].startswith("@@"):
hunk_lines.append(diff_lines[i])
i += 1
# A zero-length "-" range names the line before the insertion point,
# so its 1-based start is already the 0-based insert index.
start = (old_start - 1 if old_count > 0 else old_start) + offset
pos = start
replaced: list[str] = []
for hl in hunk_lines:
if hl.startswith(" "):
# Context line: must match the current content; keep it
if pos >= len(result) or result[pos] != hl[1:]:
raise ValueError(f"context mismatch at line {pos + 1}")
replaced.append(result[pos])
pos += 1
elif hl.startswith("-"):
# Staged line: must match the current content; drop it
if pos >= len(result) or result[pos] != hl[1:]:
raise ValueError(f"hunk does not apply at line {pos + 1}")
pos += 1
elif hl.startswith("+"):
# Pre-image line: emit it
replaced.append(hl[1:])
# Swap the staged span for the reconstructed pre-image span
result[start:start + old_count] = replaced
offset += len(replaced) - old_count
return result
@staticmethod
def _cleanup_staging(*paths: Optional[Path]) -> None:
"""Remove staging artifacts on error."""
for p in paths:
if p is not None:
p.unlink(missing_ok=True)
# ------------------------------------------------------------------
# Retention enforcement
# ------------------------------------------------------------------
def _enforce_retention(self) -> None:
"""
Enforce retention policies on staging artifacts.
Called after every write() and commit() to prevent unbounded
disk usage.
"""
now = time.time()
policy = self.retention
removed_count = 0
for op_id, record in list(self._versions.items()):
age = now - record.timestamp
# Uncommitted records: purged only when a non-negative TTL is set and
# exceeded (the default keeps them indefinitely)
if not record.committed and policy.uncommitted_snapshot_ttl >= 0:
if age > policy.uncommitted_snapshot_ttl:
self._purge_record(op_id)
removed_count += 1
continue
# Committed records: clean up snapshots after TTL
if record.committed and policy.committed_snapshot_ttl >= 0:
if age > policy.committed_snapshot_ttl:
# Remove snapshot/diff artifacts but keep the metadata
if record.snapshot_path is not None:
record.snapshot_path.unlink(missing_ok=True)
record.snapshot_path = None
if record.reverse_diff_path is not None:
record.reverse_diff_path.unlink(missing_ok=True)
record.reverse_diff_path = None
removed_count += 1
# Enforce per-file version cap
per_file: dict[str, list[str]] = {}
for op_id, record in self._versions.items():
key = str(record.target_path)
per_file.setdefault(key, []).append(op_id)
for key, op_ids in per_file.items():
if len(op_ids) > policy.max_versions_per_file:
# Keep the most recent versions; purge the oldest
sorted_ids = sorted(
op_ids,
key=lambda oid: self._versions[oid].timestamp,
reverse=True,
)
for old_id in sorted_ids[policy.max_versions_per_file:]:
self._purge_record(old_id)
removed_count += 1
# Enforce total staging size cap
total_size = self._compute_staging_size()
if total_size > policy.max_staging_size_bytes:
# Purge oldest committed records first
committed = [
(op_id, self._versions[op_id].timestamp)
for op_id in self._versions
if self._versions[op_id].committed
]
committed.sort(key=lambda x: x[1]) # oldest first
for op_id, _ in committed:
if self._compute_staging_size() <= policy.max_staging_size_bytes:
break
self._purge_record(op_id)
removed_count += 1
if removed_count > 0:
logger.info(
"Retention enforced: removed %d staging artifacts", removed_count,
)
def _purge_record(self, operation_id: str) -> None:
"""Remove all artifacts for a version record."""
record = self._versions.get(operation_id)
if record is None:
return
if record.snapshot_path is not None:
record.snapshot_path.unlink(missing_ok=True)
if record.reverse_diff_path is not None:
record.reverse_diff_path.unlink(missing_ok=True)
# Keep audit metadata if within audit_record_ttl
age = time.time() - record.timestamp
if age > self.retention.audit_record_ttl:
del self._versions[operation_id]
self._save_metadata()
def _compute_staging_size(self) -> int:
"""Compute total size of all staging artifacts."""
total = 0
for record in self._versions.values():
if record.snapshot_path is not None and record.snapshot_path.exists():
total += record.snapshot_path.stat().st_size
if record.reverse_diff_path is not None and record.reverse_diff_path.exists():
total += record.reverse_diff_path.stat().st_size
return total
# ------------------------------------------------------------------
# Metadata persistence (survives process restarts)
# ------------------------------------------------------------------
def _save_metadata(self) -> None:
"""Persist version records so rollback survives restarts."""
data = {
"version_counters": self._version_counters,
"versions": {
op_id: {
"operation_id": r.operation_id,
"target_path": str(r.target_path),
"version_number": r.version_number,
"pre_image_sha256": r.pre_image_sha256,
"post_image_hash": r.post_image_sha256,
"snapshot_path": str(r.snapshot_path) if r.snapshot_path else None,
"reverse_diff_path": str(r.reverse_diff_path) if r.reverse_diff_path else None,
"is_diff_based": r.is_diff_based,
"committed": r.committed,
"rolled_back": r.rolled_back,
"timestamp": r.timestamp,
}
for op_id, r in self._versions.items()
},
"undo_stack": self._undo_stack,
}
self._metadata_path.write_text(json.dumps(data, indent=2))
def _load_metadata(self) -> None:
"""Load persisted version records from a previous session."""
if not self._metadata_path.exists():
return
try:
data = json.loads(self._metadata_path.read_text())
self._version_counters = data.get("version_counters", {})
self._undo_stack = data.get("undo_stack", [])
for op_id, rdata in data.get("versions", {}).items():
record = VersionedFileRecord(
operation_id=rdata["operation_id"],
target_path=Path(rdata["target_path"]),
version_number=rdata["version_number"],
pre_image_sha256=rdata["pre_image_sha256"],
post_image_sha256=rdata["post_image_hash"],
snapshot_path=(
Path(rdata["snapshot_path"])
if rdata.get("snapshot_path") else None
),
reverse_diff_path=(
Path(rdata["reverse_diff_path"])
if rdata.get("reverse_diff_path") else None
),
is_diff_based=rdata.get("is_diff_based", False),
committed=rdata.get("committed", False),
rolled_back=rdata.get("rolled_back", False),
timestamp=rdata.get("timestamp", 0.0),
)
self._versions[op_id] = record
logger.info(
"Loaded %d version records from %s",
len(self._versions), self._metadata_path,
)
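except Exception as exc:
logger.warning("Failed to load metadata: %s; starting fresh", exc)
# Discard any partially loaded state so "starting fresh" is actually true
self._version_counters = {}
self._undo_stack = []
self._versions = {}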
# ------------------------------------------------------------------
# Query / observability
# ------------------------------------------------------------------
@property
def pending_count(self) -> int:
"""Number of uncommitted write operations on the undo stack."""
# A stack entry with no matching record counts as pending, which matches
# the earlier behavior of substituting a default uncommitted record.
return sum(
1 for op_id in self._undo_stack
if (record := self._versions.get(op_id)) is None
or not record.committed
)
def summary(self) -> dict[str, Any]:
"""Return a summary of the CopyOnWriteAgent state for monitoring."""
return {
"total_versions": len(self._versions),
"pending_undos": self.pending_count,
"committed": sum(1 for r in self._versions.values() if r.committed),
"rolled_back": sum(1 for r in self._versions.values() if r.rolled_back),
"diff_based": sum(1 for r in self._versions.values() if r.is_diff_based),
"snapshot_based": sum(
1 for r in self._versions.values()
if not r.is_diff_based and r.pre_image_sha256 != "new_file"
),
"staging_size_bytes": self._compute_staging_size(),
"undo_stack_depth": len(self._undo_stack),
}
# ========================================================================
# Usage example
# ========================================================================
if __name__ == "__main__":
import tempfile as _tempfile
# Setup: create a test file and a CopyOnWriteAgent
with _tempfile.TemporaryDirectory() as tmpdir:
staging = Path(tmpdir) / "cow-staging"
test_file = Path(tmpdir) / "app.conf"
test_file.write_text("db_host = localhost\ndb_port = 5432\n")
cow = CopyOnWriteAgent(staging_dir=staging)
# --- Write 1: modify config ---
v1 = cow.write(
target=test_file,
content="db_host = db.internal\ndb_port = 5432\nmax_conn = 100\n",
operation_id="update-db-host",
validator=lambda p: "db_host" in p.read_text(),
)
print(f"Write 1: v{v1.version_number} method={'diff' if v1.is_diff_based else 'snapshot'}")
print(f" Content: {test_file.read_text()!r}")
# --- Write 2: another modification ---
v2 = cow.write(
target=test_file,
content="db_host = db.internal\ndb_port = 5433\nmax_conn = 200\n",
operation_id="update-db-port",
)
print(f"Write 2: v{v2.version_number} method={'diff' if v2.is_diff_based else 'snapshot'}")
print(f" Content: {test_file.read_text()!r}")
# --- Rollback write 2 ---
success = cow.rollback("update-db-port")
print(f"Rollback write 2: {'OK' if success else 'FAIL'}")
print(f" Content after rollback: {test_file.read_text()!r}")
# --- Commit write 1 ---
cow.commit("update-db-host")
print(f"Commit write 1: pending={cow.pending_count}")
# --- Write 3 then rollback_all ---
cow.write(
target=test_file,
content="bad content\n",
operation_id="bad-write",
)
failed = cow.rollback_all()
print(f"Rollback all: failed={failed}")
print(f" Content after rollback_all: {test_file.read_text()!r}")
# Summary
import json as _json
print(f"\nSummary:\n{_json.dumps(cow.summary(), indent=2)}")
The CopyOnWriteAgent class integrates with the agent tool system at two points. First, each agent tool that performs file writes (e.g., Edit, Write, Bash with redirection) calls cow.write() instead of opening files directly. Second, the agent's task lifecycle hooks call cow.commit() on task success and cow.rollback_all() on task failure. This two-point integration ensures that every file touched by the agent is covered by the copy-on-write pipeline without requiring each tool to implement its own rollback logic.
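The lifecycle half of that integration can be sketched as a small wrapper. This is a minimal illustration, not the article's implementation: `run_task` and `InMemoryStore` are hypothetical names, the stand-in store tracks operation IDs only (no files), and committing each operation ID returned by the task is an assumption about how a task reports its writes.

```python
from typing import Callable


class InMemoryStore:
    """Hypothetical stand-in exposing commit() and rollback_all() only."""

    def __init__(self) -> None:
        self.pending: list[str] = []
        self.committed: list[str] = []

    def write(self, operation_id: str) -> None:
        # A real store would stage the file and save a pre-image here.
        self.pending.append(operation_id)

    def commit(self, operation_id: str) -> bool:
        if operation_id not in self.pending:
            return False
        self.pending.remove(operation_id)
        self.committed.append(operation_id)
        return True

    def rollback_all(self) -> list[str]:
        # Returns the IDs that failed to roll back (none in this stand-in).
        self.pending.clear()
        return []


def run_task(store: InMemoryStore, task: Callable[[], list[str]]) -> bool:
    """Run a task; commit its writes on success, roll back on failure."""
    try:
        op_ids = task()
    except Exception:
        failed = store.rollback_all()
        if failed:
            raise RuntimeError(f"rollback incomplete: {failed}")
        return False
    for op_id in op_ids:
        store.commit(op_id)
    return True


store = InMemoryStore()


def good_task() -> list[str]:
    store.write("op-1")
    store.write("op-2")
    return ["op-1", "op-2"]


def bad_task() -> list[str]:
    store.write("op-3")
    raise ValueError("validation failed")


good_result = run_task(store, good_task)
bad_result = run_task(store, bad_task)
```

Keeping the commit-or-rollback decision in one wrapper means individual tools never decide when a write becomes permanent; they only report which operations they performed.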
The diff-based storage strategy introduces a subtle trade-off: applying a reverse diff during rollback is slower than copying a snapshot. Both are linear in file size, but patch application walks the file line by line in Python, while a snapshot restore is a bulk byte copy. For files under ~10MB, the difference is negligible. For larger files, CopyOnWriteAgent falls back to a full snapshot whenever the diff would exceed the snapshot size, a self-tuning behavior that avoids pathological cases where a 50MB binary file produces a 51MB diff. For runtime environments that support filesystem-level copy-on-write (e.g., btrfs snapshots, ZFS clones), the agent can delegate to the filesystem for near-zero-cost snapshots; see Agent Runtime Isolation for filesystem isolation strategies.
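The size comparison behind that self-tuning behavior might look like the sketch below. It is an illustration under stated assumptions: `choose_pre_image_storage` is a hypothetical helper, the diff is built with the standard library's `difflib.unified_diff`, and the rule "store the diff only if it is strictly smaller" is one plausible reading of the policy rather than the exact rule used by CopyOnWriteAgent.

```python
import difflib


def choose_pre_image_storage(pre_image: str, staged: str) -> tuple[str, bytes]:
    """Return ("diff", payload) or ("snapshot", payload), whichever is smaller."""
    snapshot = pre_image.encode("utf-8")
    diff_text = "".join(
        difflib.unified_diff(
            pre_image.splitlines(keepends=True),
            staged.splitlines(keepends=True),
            fromfile="pre-image",
            tofile="staged",
        )
    )
    diff = diff_text.encode("utf-8")
    # Store the diff only when it actually saves space.
    if len(diff) < len(snapshot):
        return "diff", diff
    return "snapshot", snapshot


# A one-line change in a large file: the diff is far smaller than the file.
large = "".join(f"line {i}\n" for i in range(1000))
large_kind, _ = choose_pre_image_storage(
    large, large.replace("line 500\n", "line five hundred\n")
)

# A tiny file rewritten entirely: diff headers alone outweigh the content.
small_kind, _ = choose_pre_image_storage("a\n", "b\n")
```

The second case shows why the fallback matters even for text: the diff headers and hunk markers are pure overhead when most of the file changes.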
4. Data-Level Rollback — Transactional Wrappers and State Checkpoints
File-level rollback covers the most common agent write surface — but agents increasingly operate on structured data: database records, key-value stores, configuration databases like etcd, and in-memory state. A file snapshot cannot roll back a database row that was INSERTed into a PostgreSQL table; a reverse diff cannot undo a DNS record published to Route 53. Data-level rollback requires a different set of primitives: write-ahead logging with a two-phase intent/commit protocol, and state checkpoints that serialize the agent's full decision context at critical boundaries.
This section presents two components that together form the data-level rollback subsystem: WriteAheadLog — a two-phase commit log that records intents before mutations and commits after success, enabling crash-consistent rollback — and CheckpointManager — which serializes the agent's full state (open operations, undo stack, decision tree, environment) at configurable decision boundaries so that recovery can restart from the most recent checkpoint rather than from zero.
WriteAheadLog: Two-Phase Intent/Commit Protocol
The WriteAheadLog in this section differs from the WriteAheadLog in Section 2 in one critical dimension: it implements a full two-phase protocol with explicit INTENT and COMMIT markers. Section 2's WAL recorded pre-images on demand. The WAL here enforces a strict ordering: no data mutation occurs without a persisted INTENT record; no mutation is considered complete without a corresponding COMMIT record; and on crash recovery, any INTENT without a COMMIT or ABORT is treated as uncommitted and is either reversed from its stored pre-image or, if the mutation never ran, discarded.
This protocol is especially important for database operations where partial writes leave the database in an inconsistent state. If the agent process crashes after executing UPDATE users SET role = 'admin' WHERE id = 42 but before logging the change, the database has been mutated with no record of what the previous value was. The two-phase protocol eliminates this gap: the INTENT record (containing the pre-image) is flushed to disk before the UPDATE executes; the COMMIT record is flushed after the UPDATE succeeds. In crash recovery, the system replays the log: committed entries are skipped, and uncommitted entries are reversed using their stored pre-images. Because a crash can also occur after the INTENT is flushed but before the UPDATE runs, restoring a pre-image must be idempotent: writing the old value back over a row that never changed has to be harmless.
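The ordering described above can be sketched against an in-memory SQLite database. This is a simplified illustration, not the WriteAheadLog class itself: `append_record` and `set_role` are hypothetical helpers, the log is a JSON-lines file rather than a length-prefixed binary log, and the `users` table is invented for the example.

```python
import json
import os
import sqlite3
import tempfile
from pathlib import Path


def append_record(log_path: Path, record: dict) -> None:
    """Append one JSON line and fsync so the record survives a crash."""
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, sort_keys=True) + "\n")
        f.flush()
        os.fsync(f.fileno())


def set_role(conn: sqlite3.Connection, log_path: Path,
             op_id: str, user_id: int, new_role: str) -> None:
    row = conn.execute(
        "SELECT role FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if row is None:
        raise LookupError(f"user {user_id} not found")
    # Phase 1: persist the INTENT (with pre-image) before touching the row.
    append_record(log_path, {
        "kind": "intent", "op": op_id, "table": "users",
        "pk": user_id, "old_values": {"role": row[0]},
    })
    # The mutation itself.
    conn.execute("UPDATE users SET role = ? WHERE id = ?", (new_role, user_id))
    conn.commit()
    # Phase 2: persist the COMMIT only after the mutation succeeded.
    append_record(log_path, {"kind": "commit", "op": op_id})


with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "wal.jsonl"
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, role TEXT)")
    conn.execute("INSERT INTO users VALUES (42, 'viewer')")
    conn.commit()
    set_role(conn, log_path, "promote-42", 42, "admin")
    records = [
        json.loads(line)
        for line in log_path.read_text(encoding="utf-8").splitlines()
    ]
    final_role = conn.execute(
        "SELECT role FROM users WHERE id = 42"
    ).fetchone()[0]
    conn.close()
```

If the process died between the two `append_record` calls, the log would hold an INTENT with `old_values` and no COMMIT, which is exactly the state recovery looks for.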
CheckpointManager: Serialized Agent State at Decision Boundaries
Even with a write-ahead log, recovering from a crash by replaying every entry since the agent started is impractical — an agent might generate thousands of log entries per hour. CheckpointManager solves this by serializing the agent's full state at decision boundaries: points where the agent has completed one logical step and is about to begin the next. A checkpoint captures:
- Operations state: All registered undo handlers, the undo stack order, committed vs. uncommitted status for every operation the agent has touched. This allows rollback_all() to resume correctly after a crash.
- Environment snapshot: Current working directory, environment variables that the agent has modified, open file handles and their positions. This is not a full process snapshot, just the state the agent needs to reason about its own context.
- Decision tree: Which branches the agent took in its task graph. The serialized decision tree enables the recovery orchestrator to determine "the agent was at step 4 of 7 when it crashed — resume from step 4, not from step 1."
- WAL position marker: The log sequence number (LSN) of the last committed WAL entry at checkpoint time. On recovery, only entries after this LSN need to be replayed.
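The four parts of a checkpoint listed above could be modeled as a single serializable record. The sketch below is illustrative only: the `Checkpoint` dataclass and its field names are hypothetical, the decision tree is flattened to a list of completed steps plus the next step, and JSON is used for readability even though a real CheckpointManager may choose a different serialization.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Checkpoint:
    """Hypothetical checkpoint record covering the four captured areas."""
    wal_lsn: int                                              # WAL position marker
    undo_stack: list[str] = field(default_factory=list)       # operations state
    committed_ops: list[str] = field(default_factory=list)    # operations state
    cwd: str = ""                                              # environment snapshot
    modified_env: dict[str, str] = field(default_factory=dict)
    completed_steps: list[str] = field(default_factory=list)  # decision tree
    next_step: str = ""

    def to_json(self) -> str:
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, text: str) -> "Checkpoint":
        return cls(**json.loads(text))


cp = Checkpoint(
    wal_lsn=4096,
    undo_stack=["op-7", "op-8"],
    committed_ops=["op-1"],
    cwd="/srv/app",
    modified_env={"APP_ENV": "staging"},
    completed_steps=["plan", "fetch", "edit"],
    next_step="validate",
)
restored = Checkpoint.from_json(cp.to_json())
```

A text format keeps checkpoints inspectable during an incident; the cost is that every field must be JSON-representable, which is why file handles appear here only as data the agent can re-open, not as live objects.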
Checkpoints are taken at three trigger points: (1) pre-decision — before the agent's LLM call produces a write decision, capturing the "known-good" state; (2) post-commit — after a batch of writes has been committed, establishing a new baseline; (3) on-signal — when the agent receives an external signal (SIGTERM, shutdown request, memory pressure warning). The combination of pre-decision and post-commit checkpoints creates a "recovery bracket" — if a crash occurs mid-write, the system falls back to the pre-decision checkpoint and replays the WAL from that point.
Checkpoint Restore and Journal Replay
Recovery from a checkpoint follows a three-phase sequence: load the serialized state → replay the WAL journal from the checkpoint's LSN marker → reconstruct the agent's decision context. The replay phase is the critical path: for each WAL entry between the checkpoint LSN and the crash point, the recovery system identifies committed entries (which represent durable mutations — skip them) and uncommitted entries (which represent in-flight or failed mutations — reverse them using pre-images or execute their undo handlers).
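The replay phase can be sketched over a list of already-parsed log entries. This is a simplified model with assumptions stated plainly: `replay` is a hypothetical function, entries are plain dicts with `lsn`, `kind`, `op`, `target`, and `pre_image` keys, only entries with an LSN strictly greater than the checkpoint marker are considered, and in-doubt entries are reversed newest-first so that stacked writes to the same target unwind in order.

```python
from typing import Any, Callable


def replay(
    entries: list[dict[str, Any]],
    checkpoint_lsn: int,
    restore: Callable[[str, Any], None],
) -> list[str]:
    """Reverse in-doubt intents recorded after the checkpoint.

    Returns the operation IDs that were reversed, in the order applied.
    """
    open_intents: dict[str, dict[str, Any]] = {}
    for entry in sorted(entries, key=lambda e: e["lsn"]):
        if entry["lsn"] <= checkpoint_lsn:
            continue  # already reflected in the checkpoint
        if entry["kind"] == "intent":
            open_intents[entry["op"]] = entry
        elif entry["kind"] in ("commit", "abort"):
            open_intents.pop(entry["op"], None)  # resolved, skip it

    in_doubt = sorted(open_intents.values(), key=lambda e: e["lsn"])
    reversed_ops: list[str] = []
    for entry in reversed(in_doubt):  # newest first
        restore(entry["target"], entry["pre_image"])
        reversed_ops.append(entry["op"])
    return reversed_ops


state = {"x": 4}
log = [
    {"lsn": 10, "kind": "intent", "op": "a", "target": "x", "pre_image": 0},
    {"lsn": 20, "kind": "commit", "op": "a"},
    {"lsn": 30, "kind": "intent", "op": "b", "target": "x", "pre_image": 1},
    {"lsn": 40, "kind": "commit", "op": "b"},
    {"lsn": 50, "kind": "intent", "op": "c", "target": "x", "pre_image": 2},
    {"lsn": 60, "kind": "intent", "op": "d", "target": "x", "pre_image": 3},
]
undone = replay(log, checkpoint_lsn=20, restore=state.__setitem__)
```

In this run, operations `c` and `d` never committed, so `x` is walked back from 4 to 3 to 2, the value left by the last committed operation `b`. Reversing oldest-first would leave `x` at 3, which is why the order matters when intents overlap.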
This checkpoint-plus-journal model is adapted from database recovery (the ARIES algorithm) but specialized for agent workloads. Unlike database transactions, an agent's "transactions" can span minutes, involve external API calls, and include LLM inference steps. The checkpoint frequency must balance recovery speed against checkpoint overhead: checkpointing too often causes excessive serialization I/O, while checkpointing too rarely means a long WAL replay on recovery. The default frequency of "every N committed operations" (N=10) is a reasonable starting point for most agent workloads.
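The "every N committed operations" rule is simple enough to show directly. The sketch below is a hypothetical helper, not part of CheckpointManager: `CheckpointScheduler` only counts commits and reports when a checkpoint is due, leaving the actual serialization to the caller.

```python
class CheckpointScheduler:
    """Signals that a checkpoint is due after every N committed operations."""

    def __init__(self, every_n_commits: int = 10) -> None:
        if every_n_commits < 1:
            raise ValueError("every_n_commits must be at least 1")
        self.every_n_commits = every_n_commits
        self._since_last = 0

    def on_commit(self) -> bool:
        """Record one commit; return True when a checkpoint should be taken."""
        self._since_last += 1
        if self._since_last >= self.every_n_commits:
            self._since_last = 0
            return True
        return False


scheduler = CheckpointScheduler(every_n_commits=3)
due = [scheduler.on_commit() for _ in range(7)]
```

With N=3, the third and sixth commits trigger a checkpoint. Raising N trades fewer serialization passes for a longer worst-case replay, since up to N-1 committed operations may sit after the latest checkpoint.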
For observability into checkpoint health — how large checkpoints are, how frequently they're taken, and whether replay succeeds — see Agent Observability. The CheckpointManager exposes metrics that feed directly into the agent's observability dashboards. For audit compliance, every checkpoint and its corresponding WAL segment must be retained for the same duration as the agent's audit log; see Agent Audit Log Design for retention policy alignment.
WriteAheadLog and CheckpointManager: Implementation
The code below implements both the two-phase WriteAheadLog and the CheckpointManager, with full intent/commit protocol, crash recovery support, checkpoint serialization, and journal replay:
from __future__ import annotations
import enum
import hashlib
import json
import os
import pickle
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Any
import logging
logger = logging.getLogger("agent.rollback.data")
# ========================================================================
# WriteAheadLog — two-phase intent/commit protocol for data mutations
# ========================================================================
class WALEntryKind(enum.Enum):
"""Types of write-ahead log entries."""
INTENT = "intent" # "I plan to mutate this data"
COMMIT = "commit" # "The mutation succeeded"
ABORT = "abort" # "The mutation was explicitly cancelled"
CHECKPOINT = "checkpoint" # Marker: checkpoint was taken here
@dataclass
class WALEntry:
"""
A single entry in the write-ahead log.
Each entry has a monotonically increasing log sequence number (LSN)
for ordering and crash recovery. The LSN is the byte offset of the
entry in the WAL file, ensuring crash-consistent ordering.
"""
lsn: int # Log sequence number (byte offset)
kind: WALEntryKind
operation_id: str
target: str # Human-readable target (table, key, URL)
# For INTENT entries: serialized pre-image data
pre_image: Optional[dict[str, Any]] = None
pre_image_checksum: str = "" # SHA-256 of serialized pre_image
# For COMMIT/ABORT: reference to the INTENT LSN
intent_lsn: int = 0
# Metadata
timestamp: float = field(default_factory=time.time)
# Post-commit verification
post_image_checksum: str = ""
class WriteAheadLog:
"""
Two-phase write-ahead log for data-level agent operations.
Protocol:
Phase 1 — INTENT:
1. Agent declares what it intends to mutate (target + operation_id).
2. System captures pre-image (current state of the target data).
3. INTENT entry is written to WAL and fsync'd to disk.
4. Only after fsync does the agent proceed to mutate the data.
Phase 2 — COMMIT:
5. Agent executes the mutation.
6. System captures post-image checksum for verification.
7. COMMIT entry is written to WAL and fsync'd.
Crash recovery:
- On startup, scan the WAL from the last checkpoint.
- Any INTENT without a matching COMMIT or ABORT is "in-doubt".
- In-doubt INTENTs are reversed using their stored pre-images.
- COMMITted entries represent durable state and are skipped.
Design invariants:
- No data mutation without a persisted INTENT record.
- WAL entries use monotonic LSNs (byte offsets) for ordering.
- Every write to the WAL file is followed by fsync — the WAL
is the source of truth for crash recovery.
"""
def __init__(self, wal_dir: str | Path) -> None:
self.wal_dir = Path(wal_dir)
self.wal_dir.mkdir(parents=True, exist_ok=True)
self._wal_path = self.wal_dir / "wal.dat"
self._index_path = self.wal_dir / "wal-index.json"
# In-memory index: operation_id → latest WAL entry
self._index: dict[str, WALEntry] = {}
# Next LSN: current file size (append-only semantics)
self._next_lsn = (
self._wal_path.stat().st_size if self._wal_path.exists() else 0
)
# Load existing index on startup
self._load_index()
# ------------------------------------------------------------------
# Phase 1: record_intent — declare what you plan to mutate
# ------------------------------------------------------------------
def record_intent(
self,
operation_id: str,
target: str,
pre_image: dict[str, Any],
) -> Optional[int]:
"""
Record an INTENT to mutate `target`.
The pre_image must contain enough information to reverse the
mutation. For a database row: {"table": "users", "pk": 42,
"old_values": {"role": "viewer"}}. For a KV store:
{"key": "/config/timeout", "old_value": "30"}.
Returns the LSN of the INTENT entry, or None if recording fails.
After this call returns successfully, the caller may proceed
with the mutation. If the process crashes before commit(),
crash recovery will reverse this intent.
"""
if operation_id in self._index:
existing = self._index[operation_id]
if existing.kind == WALEntryKind.INTENT:
logger.warning(
"INTENT already recorded for %s at LSN %d — "
"reusing existing intent",
operation_id, existing.lsn,
)
return existing.lsn
# Serialize and checksum the pre-image
pre_image_json = json.dumps(pre_image, sort_keys=True)
pre_image_checksum = hashlib.sha256(
pre_image_json.encode("utf-8")
).hexdigest()
entry = WALEntry(
lsn=self._next_lsn,
kind=WALEntryKind.INTENT,
operation_id=operation_id,
target=target,
pre_image=pre_image,
pre_image_checksum=pre_image_checksum,
)
try:
lsn = self._append_entry(entry)
self._index[operation_id] = entry
logger.info(
"WAL INTENT: op=%s target=%s lsn=%d checksum=%s",
operation_id, target, lsn, pre_image_checksum[:12],
)
return lsn
except OSError as exc:
logger.error(
"WAL INTENT recording failed for %s: %s", operation_id, exc,
)
return None
# ------------------------------------------------------------------
# Phase 2: commit / abort — finalize the mutation
# ------------------------------------------------------------------
def commit(
self,
operation_id: str,
post_image: Optional[dict[str, Any]] = None,
) -> bool:
"""
Mark an INTENT as committed — the mutation succeeded.
Args:
operation_id: Must match a previously recorded INTENT.
post_image: Optional post-mutation state for verification.
Returns True if the COMMIT was recorded, False on failure.
"""
intent_entry = self._index.get(operation_id)
if intent_entry is None:
logger.error(
"commit() called for unknown operation_id: %s", operation_id,
)
return False
if intent_entry.kind != WALEntryKind.INTENT:
logger.warning(
"commit() called but entry is %s (not INTENT) for %s",
intent_entry.kind.value, operation_id,
)
return False
post_checksum = ""
if post_image is not None:
post_checksum = hashlib.sha256(
json.dumps(post_image, sort_keys=True).encode("utf-8")
).hexdigest()
entry = WALEntry(
lsn=self._next_lsn,
kind=WALEntryKind.COMMIT,
operation_id=operation_id,
target=intent_entry.target,
intent_lsn=intent_entry.lsn,
post_image_checksum=post_checksum,
)
try:
lsn = self._append_entry(entry)
# Update index: replace INTENT with COMMIT
self._index[operation_id] = entry
logger.info(
"WAL COMMIT: op=%s target=%s lsn=%d intent_lsn=%d",
operation_id, intent_entry.target, lsn, intent_entry.lsn,
)
return True
except OSError as exc:
logger.error(
"WAL COMMIT recording failed for %s: %s", operation_id, exc,
)
return False
def abort(self, operation_id: str) -> bool:
"""
Mark an INTENT as aborted — the mutation was cancelled.
Aborted intents are NOT reversed during crash recovery
(the mutation never happened).
"""
intent_entry = self._index.get(operation_id)
if intent_entry is None or intent_entry.kind != WALEntryKind.INTENT:
logger.warning("abort() called with no INTENT for %s", operation_id)
return False
entry = WALEntry(
lsn=self._next_lsn,
kind=WALEntryKind.ABORT,
operation_id=operation_id,
target=intent_entry.target,
intent_lsn=intent_entry.lsn,
)
try:
self._append_entry(entry)
self._index[operation_id] = entry
logger.info("WAL ABORT: op=%s lsn=%d", operation_id, entry.lsn)
return True
except OSError as exc:
logger.error("WAL ABORT recording failed: %s", exc)
return False
# ------------------------------------------------------------------
# Crash recovery — find and reverse in-doubt transactions
# ------------------------------------------------------------------
def recover(self, from_lsn: int = 0) -> list[dict[str, Any]]:
"""
Scan the WAL from `from_lsn` and identify in-doubt transactions.
An in-doubt transaction has an INTENT entry but no matching
COMMIT or ABORT entry. These must be reversed during recovery.
Returns a list of in-doubt pre-images that need to be restored,
ordered by LSN (oldest first, for correct dependency ordering).
"""
if not self._wal_path.exists():
return []
in_doubt: list[dict[str, Any]] = []
intents: dict[str, WALEntry] = {}
with open(self._wal_path, "rb") as f:
f.seek(from_lsn)
while True:
entry, _ = self._read_entry_at(f.tell(), f)
if entry is None:
break
if entry.kind == WALEntryKind.INTENT:
intents[entry.operation_id] = entry
elif entry.kind in (WALEntryKind.COMMIT, WALEntryKind.ABORT):
intents.pop(entry.operation_id, None)
# CHECKPOINT entries are informational — skip
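# Remaining intents are in-doubt. Sort by LSN rather than by
# operation_id so the result is oldest-first, as documented.
for entry in sorted(intents.values(), key=lambda e: e.lsn):
op_id = entry.operation_id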
if entry.pre_image is not None:
in_doubt.append({
"operation_id": op_id,
"target": entry.target,
"lsn": entry.lsn,
"pre_image": entry.pre_image,
"pre_image_checksum": entry.pre_image_checksum,
"timestamp": entry.timestamp,
})
logger.warning(
"In-doubt transaction: op=%s target=%s lsn=%d",
op_id, entry.target, entry.lsn,
)
logger.info(
"WAL recovery: %d in-doubt transactions found (from LSN %d)",
len(in_doubt), from_lsn,
)
return in_doubt
# ------------------------------------------------------------------
# Internal WAL I/O
# ------------------------------------------------------------------
def _append_entry(self, entry: WALEntry) -> int:
"""
Append a serialized entry to the WAL file, fsync, and return
the LSN (byte offset) where the entry was written.
"""
data = json.dumps({
"lsn": entry.lsn,
"kind": entry.kind.value,
"operation_id": entry.operation_id,
"target": entry.target,
"pre_image": entry.pre_image,
"pre_image_checksum": entry.pre_image_checksum,
"intent_lsn": entry.intent_lsn,
"timestamp": entry.timestamp,
"post_image_checksum": entry.post_image_checksum,
}).encode("utf-8")
# Prepend length prefix for framed reading
length_prefix = len(data).to_bytes(4, byteorder="big")
with open(self._wal_path, "ab") as f:
offset = f.tell()
f.write(length_prefix)
f.write(data)
f.flush()
os.fsync(f.fileno())
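self._next_lsn = offset + 4 + len(data)
# Update the index before persisting it. Callers assign the entry only
# after this method returns, so saving first would leave the on-disk
# index one entry behind the WAL.
self._index[entry.operation_id] = entry
self._save_index()
return offset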
def _read_entry_at(
self, offset: int, f=None,
) -> tuple[Optional[WALEntry], int]:
"""
Read a single WAL entry starting at `offset`.
Returns (entry, next_offset) or (None, offset) on EOF/error.
"""
close_f = False
if f is None:
f = open(self._wal_path, "rb")
close_f = True
try:
f.seek(offset)
length_bytes = f.read(4)
if len(length_bytes) < 4:
return None, offset
data_length = int.from_bytes(length_bytes, byteorder="big")
data_bytes = f.read(data_length)
if len(data_bytes) < data_length:
return None, offset
data = json.loads(data_bytes.decode("utf-8"))
entry = WALEntry(
lsn=data["lsn"],
kind=WALEntryKind(data["kind"]),
operation_id=data["operation_id"],
target=data["target"],
pre_image=data.get("pre_image"),
pre_image_checksum=data.get("pre_image_checksum", ""),
intent_lsn=data.get("intent_lsn", 0),
timestamp=data.get("timestamp", 0.0),
post_image_checksum=data.get("post_image_checksum", ""),
)
next_offset = offset + 4 + data_length
return entry, next_offset
except Exception as exc:
logger.error("WAL read error at offset %d: %s", offset, exc)
return None, offset
finally:
if close_f:
f.close()
def _save_index(self) -> None:
"""Persist the in-memory index for fast startup."""
index_data = {
op_id: {
"lsn": e.lsn,
"kind": e.kind.value,
"intent_lsn": e.intent_lsn,
"timestamp": e.timestamp,
}
for op_id, e in self._index.items()
}
self._index_path.write_text(json.dumps(index_data, indent=2))
def _load_index(self) -> None:
"""Load the WAL index from disk on startup."""
if not self._index_path.exists():
return
try:
index_data = json.loads(self._index_path.read_text())
# Rebuild lightweight index (no pre_images loaded into memory;
# those are read on-demand from the WAL file via recover())
for op_id, data in index_data.items():
self._index[op_id] = WALEntry(
lsn=data["lsn"],
kind=WALEntryKind(data["kind"]),
operation_id=op_id,
target="", # Full data is in WAL file
intent_lsn=data.get("intent_lsn", 0),
timestamp=data.get("timestamp", 0.0),
)
logger.info(
"WAL index loaded: %d entries", len(self._index),
)
except Exception as exc:
logger.warning("WAL index load failed: %s — rebuilding", exc)
self._rebuild_index()
def _rebuild_index(self) -> None:
"""Full scan of WAL file to rebuild the index."""
self._index.clear()
if not self._wal_path.exists():
return
offset = 0
with open(self._wal_path, "rb") as f:
while True:
entry, next_offset = self._read_entry_at(offset, f)
if entry is None:
break
self._index[entry.operation_id] = entry
offset = next_offset
logger.info(
"WAL index rebuilt: %d entries from full scan", len(self._index),
)
self._save_index()
def get_checkpoint_lsn(self) -> int:
"""Return the LSN of the most recent CHECKPOINT marker, or 0."""
latest = 0
for entry in self._index.values():
if entry.kind == WALEntryKind.CHECKPOINT:
latest = max(latest, entry.lsn)
return latest
def record_checkpoint(self, checkpoint_id: str) -> int:
"""Record a CHECKPOINT marker in the WAL."""
entry = WALEntry(
lsn=self._next_lsn,
kind=WALEntryKind.CHECKPOINT,
operation_id=checkpoint_id,
target=f"checkpoint:{checkpoint_id}",
)
lsn = self._append_entry(entry)
self._index[checkpoint_id] = entry
logger.info("WAL CHECKPOINT: id=%s lsn=%d", checkpoint_id, lsn)
return lsn
# ========================================================================
# CheckpointManager — serialize agent state at decision boundaries
# ========================================================================
@dataclass
class AgentStateSnapshot:
"""
Serialized representation of the agent's state at a decision boundary.
This is what gets written to disk as a checkpoint. It captures
enough information to resume the agent from this exact point
after a crash.
"""
checkpoint_id: str
timestamp: float
# Task progress
task_id: str
current_step: int
total_steps: int
# Decision tree (simplified: list of (decision_point, chosen_branch))
decision_path: list[dict[str, Any]]
# Registered undo handlers (operation_id → strategy + target description)
undo_registry: dict[str, dict[str, Any]]
# Undo stack order (list of operation_ids in LIFO order)
undo_stack: list[str]
# Committed operations (won't be rolled back)
committed_operations: list[str]
# Environment snapshot
working_directory: str
environment_vars: dict[str, str]
# WAL position at checkpoint time
wal_checkpoint_lsn: int
# Agent context window summary (for LLM context reconstruction)
context_summary: str
# Custom agent-specific state
custom_state: dict[str, Any] = field(default_factory=dict)
class CheckpointManager:
"""
Manages agent state checkpoints for crash recovery.
Checkpoints are serialized snapshots of the agent's full state
taken at decision boundaries. In combination with the WAL,
checkpoints enable fast recovery: load the most recent checkpoint,
replay WAL entries from the checkpoint LSN forward.
Trigger points:
- pre_decision: Before the agent makes a write decision.
- post_commit: After a batch of writes is committed.
- on_signal: On external signal (SIGTERM, shutdown request).
Storage model:
- Checkpoints are stored as files named {checkpoint_id}.pkl
(checkpoint IDs already start with "ckpt-")
- A manifest file tracks all checkpoints with metadata
- Old checkpoints are pruned based on retention policy
"""
def __init__(
self,
checkpoint_dir: str | Path,
wal: WriteAheadLog,
*,
max_checkpoints: int = 20,
checkpoint_interval_operations: int = 10,
) -> None:
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.wal = wal
self.max_checkpoints = max_checkpoints
self.checkpoint_interval = checkpoint_interval_operations
# Counter for automatic checkpoint triggering
self._operations_since_checkpoint = 0
# Manifest: maps checkpoint_id → metadata
self._manifest_path = self.checkpoint_dir / "manifest.json"
self._manifest: dict[str, dict[str, Any]] = {}
self._load_manifest()
# ------------------------------------------------------------------
# create_checkpoint — serialize current agent state
# ------------------------------------------------------------------
def create_checkpoint(
self,
task_id: str,
current_step: int,
total_steps: int,
decision_path: list[dict[str, Any]],
undo_registry: dict[str, dict[str, Any]],
undo_stack: list[str],
committed_operations: list[str],
*,
context_summary: str = "",
custom_state: dict[str, Any] | None = None,
) -> Optional[str]:
"""
Create a checkpoint of the agent's current state.
This is a synchronous operation — the caller should ensure
that no writes are in flight when creating a checkpoint.
Returns the checkpoint_id on success, None on failure.
"""
checkpoint_id = f"ckpt-{task_id}-step{current_step}-{int(time.time())}"
# Record a CHECKPOINT marker in the WAL first
wal_lsn = self.wal.record_checkpoint(checkpoint_id)
# Build the snapshot
snapshot = AgentStateSnapshot(
checkpoint_id=checkpoint_id,
timestamp=time.time(),
task_id=task_id,
current_step=current_step,
total_steps=total_steps,
decision_path=decision_path,
undo_registry=undo_registry,
undo_stack=list(undo_stack),
committed_operations=list(committed_operations),
working_directory=os.getcwd(),
environment_vars=self._capture_relevant_env(),
wal_checkpoint_lsn=wal_lsn,
context_summary=context_summary,
custom_state=custom_state or {},
)
# Serialize and write to disk
checkpoint_path = self.checkpoint_dir / f"{checkpoint_id}.pkl"
try:
with open(checkpoint_path, "wb") as f:
pickle.dump(snapshot, f, protocol=pickle.HIGHEST_PROTOCOL)
f.flush()  # push Python's userspace buffer to the OS before fsync
os.fsync(f.fileno())
# Update manifest
self._manifest[checkpoint_id] = {
"checkpoint_id": checkpoint_id,
"timestamp": snapshot.timestamp,
"task_id": task_id,
"current_step": current_step,
"total_steps": total_steps,
"wal_lsn": wal_lsn,
"file_path": str(checkpoint_path),
"size_bytes": checkpoint_path.stat().st_size,
}
self._save_manifest()
self._operations_since_checkpoint = 0
logger.info(
"Checkpoint created: %s (step %d/%d, WAL LSN=%d, %d bytes)",
checkpoint_id, current_step, total_steps,
wal_lsn, checkpoint_path.stat().st_size,
)
# Prune old checkpoints
self._prune_old_checkpoints()
return checkpoint_id
except Exception as exc:
logger.exception(
"Failed to create checkpoint %s: %s", checkpoint_id, exc,
)
# Clean up partial checkpoint file
checkpoint_path.unlink(missing_ok=True)
return None
# ------------------------------------------------------------------
# restore_from_checkpoint — load state and prepare for replay
# ------------------------------------------------------------------
def restore_from_checkpoint(
self, checkpoint_id: Optional[str] = None,
) -> Optional[tuple[AgentStateSnapshot, int]]:
"""
Restore the agent state from a checkpoint.
If checkpoint_id is None, restores from the most recent
checkpoint (highest step number, then most recent timestamp).
Returns (snapshot, wal_start_lsn) where wal_start_lsn is the
WAL position to start replaying from, or None if no checkpoint
is available.
The caller should then:
1. Restore agent state from the snapshot.
2. Call wal.recover(from_lsn=wal_start_lsn) to get in-doubt
transactions.
3. Reverse in-doubt transactions using their pre-images.
4. Resume the agent from snapshot.current_step + 1.
"""
if not self._manifest:
logger.warning("No checkpoints available for restore")
return None
if checkpoint_id is not None:
meta = self._manifest.get(checkpoint_id)
if meta is None:
logger.error(
"Checkpoint %s not found in manifest", checkpoint_id,
)
return None
else:
# Find the most recent checkpoint: highest step, then most recent
sorted_ckpts = sorted(
self._manifest.values(),
key=lambda m: (m["current_step"], m["timestamp"]),
reverse=True,
)
meta = sorted_ckpts[0]
checkpoint_id = meta["checkpoint_id"]
checkpoint_path = Path(meta["file_path"])
if not checkpoint_path.exists():
logger.error(
"Checkpoint file missing: %s", checkpoint_path,
)
return None
try:
with open(checkpoint_path, "rb") as f:
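# pickle.load can execute arbitrary code: only load checkpoint files
# from a directory that the agent runtime itself controls
snapshot: AgentStateSnapshot = pickle.load(f)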
# Verify checkpoint integrity
if snapshot.checkpoint_id != checkpoint_id:
logger.error(
"Checkpoint ID mismatch: expected %s, got %s",
checkpoint_id, snapshot.checkpoint_id,
)
return None
logger.info(
"Checkpoint restored: %s (step %d/%d, WAL LSN=%d)",
checkpoint_id, snapshot.current_step,
snapshot.total_steps, snapshot.wal_checkpoint_lsn,
)
# The WAL replay should start from the checkpoint's WAL LSN
# (entries at and after this LSN need to be examined)
return snapshot, snapshot.wal_checkpoint_lsn
except Exception as exc:
logger.exception(
"Failed to restore checkpoint %s: %s", checkpoint_id, exc,
)
return None
# ------------------------------------------------------------------
# Auto-checkpoint trigger
# ------------------------------------------------------------------
def maybe_checkpoint(
self,
task_id: str,
current_step: int,
total_steps: int,
decision_path: list[dict[str, Any]],
undo_registry: dict[str, dict[str, Any]],
undo_stack: list[str],
committed_operations: list[str],
*,
context_summary: str = "",
custom_state: dict[str, Any] | None = None,
force: bool = False,
) -> Optional[str]:
"""
Create a checkpoint if the operation threshold has been reached.
Call this after each operation commit. If the number of
operations since the last checkpoint reaches
checkpoint_interval_operations, a checkpoint is created.
Set force=True to create a checkpoint regardless of the counter.
"""
self._operations_since_checkpoint += 1
if force or self._operations_since_checkpoint >= self.checkpoint_interval:
return self.create_checkpoint(
task_id=task_id,
current_step=current_step,
total_steps=total_steps,
decision_path=decision_path,
undo_registry=undo_registry,
undo_stack=undo_stack,
committed_operations=committed_operations,
context_summary=context_summary,
custom_state=custom_state,
)
return None
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _capture_relevant_env() -> dict[str, str]:
"""Capture environment variables relevant to agent operation."""
relevant_prefixes = (
"AGENT_", "PATH", "HOME", "USER", "LANG",
"PYTHONPATH", "VIRTUAL_ENV", "CONDA_",
)
result: dict[str, str] = {}
for key, value in os.environ.items():
if any(key.startswith(p) for p in relevant_prefixes):
result[key] = value
return result
def _save_manifest(self) -> None:
"""Persist the checkpoint manifest."""
self._manifest_path.write_text(
json.dumps(self._manifest, indent=2, default=str),
)
def _load_manifest(self) -> None:
"""Load checkpoint manifest from disk."""
if not self._manifest_path.exists():
return
try:
self._manifest = json.loads(self._manifest_path.read_text())
logger.info(
"Checkpoint manifest loaded: %d checkpoints",
len(self._manifest),
)
except Exception as exc:
logger.warning("Failed to load checkpoint manifest: %s", exc)
def _prune_old_checkpoints(self) -> None:
"""Remove checkpoints exceeding max_checkpoints."""
if len(self._manifest) <= self.max_checkpoints:
return
# Keep the most recent checkpoints; remove the oldest
sorted_ckpts = sorted(
self._manifest.values(),
key=lambda m: m["timestamp"],
)
to_remove = sorted_ckpts[:len(sorted_ckpts) - self.max_checkpoints]
for meta in to_remove:
ckpt_id = meta["checkpoint_id"]
ckpt_path = Path(meta["file_path"])
ckpt_path.unlink(missing_ok=True)
del self._manifest[ckpt_id]
logger.debug("Pruned old checkpoint: %s", ckpt_id)
self._save_manifest()
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
@property
def checkpoint_count(self) -> int:
return len(self._manifest)
def latest_checkpoint_id(self) -> Optional[str]:
if not self._manifest:
return None
latest = max(
self._manifest.values(),
key=lambda m: m["timestamp"],
)
return latest["checkpoint_id"]
def summary(self) -> dict[str, Any]:
return {
"total_checkpoints": len(self._manifest),
"latest_checkpoint": self.latest_checkpoint_id(),
"max_checkpoints": self.max_checkpoints,
"operations_since_last_checkpoint": self._operations_since_checkpoint,
"checkpoint_interval": self.checkpoint_interval,
"checkpoint_dir_size_bytes": sum(
p.stat().st_size
for p in self.checkpoint_dir.glob("*.pkl")
if p.is_file()
),
}
# ========================================================================
# Usage example: combined WAL + CheckpointManager recovery flow
# ========================================================================
if __name__ == "__main__":
import tempfile as _tempfile
with _tempfile.TemporaryDirectory() as tmpdir:
wal_dir = Path(tmpdir) / "wal"
ckpt_dir = Path(tmpdir) / "checkpoints"
# --- Setup WAL and CheckpointManager ---
wal = WriteAheadLog(wal_dir)
ckpt = CheckpointManager(
checkpoint_dir=ckpt_dir,
wal=wal,
max_checkpoints=5,
checkpoint_interval_operations=3,
)
# --- Simulate agent operations ---
task_id = "deploy-nginx-config"
decision_path: list[dict[str, Any]] = []
undo_registry: dict[str, dict[str, Any]] = {}
undo_stack: list[str] = []
committed: list[str] = []
for step in range(1, 6):
op_id = f"{task_id}-step{step}"
# Phase 1: Record intent
pre_image = {
"table": "configs",
"key": f"/nginx/server{step}",
"old_value": f"server_{step}_v1",
}
intent_lsn = wal.record_intent(
operation_id=op_id,
target=f"/nginx/server{step}",
pre_image=pre_image,
)
assert intent_lsn is not None, "INTENT recording failed"
# Simulate mutation...
# (In reality: execute the actual database/config write here)
# Phase 2: Commit
post_image = {
"table": "configs",
"key": f"/nginx/server{step}",
"new_value": f"server_{step}_v2",
}
wal.commit(op_id, post_image=post_image)
# Track in undo registry
undo_registry[op_id] = {
"strategy": "transaction",
"target": f"/nginx/server{step}",
"pre_image": pre_image,
}
undo_stack.append(op_id)
committed.append(op_id)
decision_path.append({
"step": step,
"decision": f"update_server_{step}",
"chosen_branch": "canary_rollout",
})
# Auto-checkpoint every 3 operations
result = ckpt.maybe_checkpoint(
task_id=task_id,
current_step=step,
total_steps=5,
decision_path=decision_path,
undo_registry=undo_registry,
undo_stack=undo_stack,
committed_operations=committed,
context_summary=f"Updated {step}/5 servers",
)
if result:
print(f"Step {step}: Checkpoint created → {result}")
# --- Simulate crash + recovery ---
print("\n--- Simulating recovery from checkpoint ---")
restored = ckpt.restore_from_checkpoint()
if restored is not None:
snapshot, wal_start_lsn = restored
print(f"Restored from: {snapshot.checkpoint_id}")
print(f" Task: {snapshot.task_id} (step {snapshot.current_step}/{snapshot.total_steps})")
print(f" Decision path depth: {len(snapshot.decision_path)}")
print(f" Committed ops: {len(snapshot.committed_operations)}")
print(f" Undo stack depth: {len(snapshot.undo_stack)}")
print(f" WAL replay start LSN: {wal_start_lsn}")
# Replay WAL from checkpoint LSN to find in-doubt transactions
in_doubt = wal.recover(from_lsn=wal_start_lsn)
print(f" In-doubt transactions: {len(in_doubt)}")
for tx in in_doubt:
print(f" - {tx['operation_id']}: {tx['target']}")
else:
print("No checkpoint available for restore")
# Summary
print(f"\nWAL index entries: {len(wal._index)}")
print(f"Checkpoints: {ckpt.summary()}")
Combining WAL and Checkpoints for Production Recovery
The two components are designed to work together in a specific recovery sequence that mirrors database crash recovery but is adapted for agent workloads. On process restart: (1) CheckpointManager.restore_from_checkpoint() loads the most recent snapshot, restoring the agent's decision context, undo stack, and committed operation set. (2) WriteAheadLog.recover(from_lsn=wal_start_lsn) scans the WAL from the checkpoint's LSN forward, identifying any INTENT entries without matching COMMIT or ABORT markers — these are in-doubt transactions whose mutations may or may not have been applied. (3) For each in-doubt transaction, the recovery system executes the stored pre-image restore: for a database row, this means UPDATE ... SET ... WHERE with the old values; for a KV store, a PUT with the old value; for a file, applying the reverse diff. (4) The agent resumes from snapshot.current_step + 1.
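Step (3) of this sequence can be sketched as follows. This is a minimal illustration, not the article's recovery code: it assumes a plain dict stands in for the KV store, that each pre-image carries `key` and `old_value` fields as in the usage example, and the helper name `restore_in_doubt` is hypothetical. It undoes the newest write first, on the assumption that several in-doubt writes may have touched the same key, in which case the oldest pre-image must be applied last.

```python
from typing import Any


def restore_in_doubt(
    in_doubt: list[dict[str, Any]],
    store: dict[str, Any],
) -> list[str]:
    """Apply pre-images to `store`, newest LSN first.

    Returns the operation ids in the order they were restored.
    """
    restored: list[str] = []
    for tx in sorted(in_doubt, key=lambda t: t["lsn"], reverse=True):
        pre = tx["pre_image"]
        if pre.get("old_value") is None:
            # Assumption: no old value means the key did not exist before.
            store.pop(pre["key"], None)
        else:
            store[pre["key"]] = pre["old_value"]
        restored.append(tx["operation_id"])
    return restored


# Two in-doubt writes hit the same key; the store holds the last value written.
kv_store = {"/nginx/server4": "server_4_v3"}
pending = [
    {"operation_id": "op-a", "lsn": 100,
     "pre_image": {"key": "/nginx/server4", "old_value": "server_4_v1"}},
    {"operation_id": "op-b", "lsn": 220,
     "pre_image": {"key": "/nginx/server4", "old_value": "server_4_v2"}},
]
order = restore_in_doubt(pending, kv_store)
```

After the call, `kv_store` holds the value from before either write. Applying the same pre-images oldest first would leave `server_4_v2` in place, which is why the sketch reverses the order that `recover()` returns.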
This checkpoint-WAL combination gives agent systems a property that databases have had for decades but agent frameworks have largely lacked: crash-consistent state recovery without data loss. The storage cost is modest: checkpoints are typically a few hundred KB (serialized Python objects), and WAL entries are a few hundred bytes each. A session of 1,000 operations with a checkpoint every 10 operations creates 100 checkpoints, but with the default retention of 20 (max_checkpoints) only the newest 20 stay on disk, so the total storage overhead remains under 10MB, a fraction of the cost of full filesystem snapshots. Without pruning, the same session would keep all 100 checkpoints and use several times as much. For runtime environments where broader isolation is needed, container-level checkpointing (e.g., docker checkpoint, CRIU) can complement this approach; see Agent Runtime Isolation for integration patterns.
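The storage estimate can be checked with a few lines of arithmetic. The sketch below is a back-of-the-envelope model under stated assumptions: 300,000-byte checkpoints, 300-byte WAL entries, and two WAL entries (INTENT plus COMMIT) per operation. These figures are illustrative, not measurements, and CHECKPOINT markers in the WAL are ignored.

```python
from typing import Optional


def estimate_overhead_bytes(
    operations: int,
    checkpoint_every: int,
    checkpoint_bytes: int,
    wal_entry_bytes: int,
    *,
    max_checkpoints: Optional[int] = None,
    entries_per_operation: int = 2,
) -> int:
    """Estimate on-disk bytes for retained checkpoints plus the WAL."""
    created = operations // checkpoint_every
    # With a retention limit, only the newest checkpoints remain on disk.
    retained = created if max_checkpoints is None else min(created, max_checkpoints)
    wal_bytes = operations * entries_per_operation * wal_entry_bytes
    return retained * checkpoint_bytes + wal_bytes


# Assumed sizes: 300,000-byte checkpoints and 300-byte WAL entries.
pruned = estimate_overhead_bytes(1000, 10, 300_000, 300, max_checkpoints=20)
unpruned = estimate_overhead_bytes(1000, 10, 300_000, 300)
```

Under these assumptions the pruned total is 6.6 MB and the unpruned total is 30.6 MB, so the retention limit matters more to the estimate than the WAL does.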
5. Environment-Level Rollback — Container Snapshots and Immutable Infrastructure
File-level and data-level rollback undo specific writes. But many agent-induced failures transcend individual files — the agent corrupts a container's entire filesystem, installs conflicting packages across a cluster, or Terraforms infrastructure into an unrecoverable state. These failures demand environment-level rollback: the ability to revert an entire execution environment — filesystem, network configuration, running processes, installed packages — to a known-good state regardless of how many individual writes the agent performed. This section covers container snapshot primitives, immutable infrastructure patterns, Infrastructure-as-Code rollback strategies, and clean-slate recovery for "scorched earth" scenarios.
Why File-Level Rollback Is Insufficient at Environment Scale
Consider an agent tasked with "install Python 3.12, upgrade all pip packages, and reconfigure the systemd unit for the application." The agent executes 47 writes across 12 files: /usr/bin/python3 symlinks, /etc/systemd/system/app.service, /usr/local/lib/python3.12/site-packages/, /etc/ld.so.conf.d/, and half a dozen .conf files in /etc/. If the systemd unit has a typo and the application fails to start, rolling back file by file is not just tedious — it's dangerous. The agent may have created files it didn't explicitly declare (pip installs write to unpredictable paths), modified shared libraries depended on by other services, or changed permissions on directories. File-level rollback only knows about files it tracked. Environment-level rollback captures the entire state — every inode, every symlink, every extended attribute — and restores it atomically.
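The gap between what an agent declared and what it actually changed can be made concrete. The following sketch is illustrative only: it fingerprints a directory tree before and after a simulated agent run and reports changed files that were never registered for rollback. The helper names `fingerprint` and `untracked_changes` are hypothetical, and a temporary directory stands in for the real filesystem.

```python
import hashlib
import tempfile
from pathlib import Path


def fingerprint(root: Path) -> dict[str, str]:
    """Map each regular file under `root` to the SHA-256 of its contents."""
    return {
        p.relative_to(root).as_posix(): hashlib.sha256(p.read_bytes()).hexdigest()
        for p in sorted(root.rglob("*"))
        if p.is_file()
    }


def untracked_changes(
    before: dict[str, str],
    after: dict[str, str],
    tracked: set[str],
) -> set[str]:
    """Return paths that were created, modified, or deleted but never tracked."""
    changed = {
        path
        for path in before.keys() | after.keys()
        if before.get(path) != after.get(path)
    }
    return changed - tracked


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "app.service").write_text("ExecStart=/usr/bin/app\n")
    before = fingerprint(root)

    # The agent declares one write, but a side effect creates a second file.
    (root / "app.service").write_text("ExecStart=/usr/bin/app --v2\n")
    (root / "site-packages").mkdir()
    (root / "site-packages" / "dep.py").write_text("x = 1\n")

    missed = untracked_changes(before, fingerprint(root), tracked={"app.service"})
```

Here `missed` contains the file that a file-level rollback would leave behind. Hashing every file is itself costly on a real system, which is part of the argument for capturing the environment as a whole.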
The two primary primitives for environment-level rollback are container checkpoints (Docker's overlay snapshot mechanism, CRIU for live process migration) and immutable infrastructure (deploy a new clean environment and redirect traffic, rather than mutating an existing one). Both approaches share a philosophical foundation: treat the environment as an indivisible snapshot that can be captured, validated, and reverted, rather than a collection of mutable files that must be individually tracked. Runtime isolation is a prerequisite — the agent must operate in a sandbox where its write footprint is bounded; see Agent Runtime Isolation for sandboxing strategies.
Docker Overlay Snapshots and CRIU Checkpoints
Docker's overlay2 storage driver provides a natural snapshot primitive: each container's writable layer is a thin overlay on top of the image layers. Before an agent begins a risky operation, the system can docker commit the container to create a named snapshot, then docker run a recovery container from that snapshot if rollback is needed. Under the hood, docker commit creates a new image layer that captures all filesystem changes since the container started; this is a differential snapshot, not a full copy. For a container with a 200MB base image and 15MB of agent modifications, the snapshot costs roughly 15MB of storage, not 215MB. One caveat: docker commit does not include data in volumes mounted into the container, so anything the agent writes to a volume needs a separate snapshot.
For live-process rollback, where the agent is in the middle of a long-running operation and you need to revert without destroying the container, CRIU (Checkpoint/Restore In Userspace) provides process-level checkpointing. criu dump serializes the process tree's memory, file descriptors, network sockets, and CPU registers to a set of image files; criu restore reconstructs the process state from those images. Docker's docker checkpoint create, an experimental feature, wraps CRIU for container-level use. The critical limitation: CRIU checkpoints capture in-memory process state, not filesystem state. For agent rollback, CRIU must be combined with a filesystem snapshot (Docker commit) for a complete environment checkpoint. The ContainerEnvironment class presented at the end of this section implements the filesystem half through Docker commit and reserves a "criu" snapshot type for the process half.
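Before the full class, the pairing of the two mechanisms can be sketched at the command level. The helpers below only build argument lists and never invoke Docker; their names are hypothetical. They assume a Docker daemon with experimental checkpoint support and CRIU installed, and they do not cover restoring a process checkpoint into a different container than the one it was taken from.

```python
def commit_cmd(container: str, image_tag: str, docker: str = "docker") -> list[str]:
    """Filesystem half: `docker commit CONTAINER IMAGE:TAG`."""
    return [docker, "commit", container, image_tag]


def checkpoint_cmd(
    container: str,
    name: str,
    docker: str = "docker",
    leave_running: bool = True,
) -> list[str]:
    """Process half: `docker checkpoint create [--leave-running] CONTAINER NAME`."""
    cmd = [docker, "checkpoint", "create"]
    if leave_running:
        cmd.append("--leave-running")  # keep the agent running after the dump
    return cmd + [container, name]


def restore_cmd(container: str, name: str, docker: str = "docker") -> list[str]:
    """Resume a stopped container from a checkpoint: `docker start --checkpoint`."""
    return [docker, "start", "--checkpoint", name, container]


def environment_checkpoint(container: str, label: str) -> list[list[str]]:
    """Commands for one combined checkpoint. The ordering is an assumption."""
    return [
        checkpoint_cmd(container, f"proc-{label}"),
        commit_cmd(container, f"agent-rollback:{label}"),
    ]


plan = environment_checkpoint("agent-task-123", "step1")
```

Each entry in `plan` could be passed to `subprocess.run`. Taking both captures while the agent is quiescent keeps the memory image and the filesystem layer consistent with each other.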
Blue/Green, Canary, and Immutable Infrastructure Patterns
Immutable infrastructure inverts the rollback problem entirely: instead of trying to undo changes on a live environment, deploy a new clean environment alongside the existing one, validate it, and swap traffic. If validation fails, swap back — there is nothing to "undo" because the original environment was never modified. This is the blue/green deployment pattern applied to agent automation: the "green" environment is the agent's target where it performs all its writes; the "blue" environment is the untouched original. A load balancer or DNS switch routes traffic between them. The agent writes to green with zero risk to production until the switch is flipped.
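The swap logic itself is small. The sketch below models it with a hypothetical `BlueGreenRouter` class; in a real deployment the swap would be a load balancer or DNS update, and the `validate` callback would run health checks against the green environment.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class BlueGreenRouter:
    """Tracks which environment receives traffic (names are illustrative)."""

    active: str = "blue"
    standby: str = "green"

    def promote_if_valid(self, validate: Callable[[str], bool]) -> bool:
        """Route traffic to the standby environment only if it validates."""
        if not validate(self.standby):
            # The active environment was never modified: nothing to undo.
            return False
        self.active, self.standby = self.standby, self.active
        return True

    def swap_back(self) -> None:
        """Return traffic to the previous environment after a late failure."""
        self.active, self.standby = self.standby, self.active


router = BlueGreenRouter()
rejected = router.promote_if_valid(lambda env: False)  # validation fails
promoted = router.promote_if_valid(lambda env: env == "green")
```

After the failed validation traffic still points at blue; after the successful one it points at green, and `swap_back()` is the entire rollback.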
Canary deployment extends this further: the agent first writes to a single canary instance (e.g., 1 of 10 servers), validates its output, and only then proceeds to the remaining instances. If the canary fails validation, the blast radius is 1 server, not 10. For agent operations, the canary pattern translates to "test the agent's write output on one isolated target before applying it broadly." For state machine design that encodes these rollout gates into the agent's decision flow, see Agent State Machine Design. For error recovery patterns that complement environment rollback when validation fails early, see Agent Error Recovery.
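A canary gate can be expressed as a loop that refuses to continue past the first failed validation. The sketch below is one possible shape under simple assumptions: the `apply`, `validate`, and `rollback` callbacks are supplied by the caller, and `canary_rollout` is a hypothetical name rather than part of any framework.

```python
from typing import Callable, Sequence


def canary_rollout(
    targets: Sequence[str],
    apply: Callable[[str], None],
    validate: Callable[[str], bool],
    rollback: Callable[[str], None],
) -> list[str]:
    """Apply to the first target, validate it, then continue one by one.

    Stops at the first failed validation, rolls back only the failing
    target, and returns the targets that were left changed.
    """
    changed: list[str] = []
    for target in targets:
        apply(target)
        if not validate(target):
            rollback(target)
            break  # blast radius stays at the one failing target
        changed.append(target)
    return changed


# Simulated fleet: the write is invalid, so the canary catches it.
fleet: dict[str, str] = {}
result = canary_rollout(
    ["server-1", "server-2", "server-3"],
    apply=lambda t: fleet.__setitem__(t, "v2"),
    validate=lambda t: False,
    rollback=lambda t: fleet.pop(t),
)
```

With a failing validation, only the first server is ever touched and it is restored before the function returns. Whether earlier successful targets should also be reverted on a later failure is a policy choice that this sketch leaves to the caller.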
Infrastructure-as-Code Rollback: Terraform and Pulumi Patterns
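When agents operate on cloud infrastructure via IaC tools (Terraform, Pulumi, CloudFormation), the rollback strategy shifts from "restore files" to "revert infrastructure state." Terraform's terraform plan generates an execution plan showing exactly what resources will be created, modified, or destroyed. Before the agent applies that plan, the current state file (terraform.tfstate) is checkpointed; it is a JSON record of every managed resource and its last known configuration. Rollback means reverting the configuration to the version that matched the checkpoint and running terraform apply, which reconciles actual infrastructure back toward that definition. The saved state file serves as the record of what the environment looked like and as a recovery copy if the live state is lost or corrupted. Restoring an older state file does not by itself change any real resource, and resources created after the checkpoint are missing from it, so the old state should be put back only after those resources have been destroyed. For Pulumi, the equivalent state checkpoint is pulumi stack export / pulumi stack import.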
Three IaC rollback patterns are particularly relevant to agent automation: (1) state-file checkpoint: save terraform.tfstate before every agent-initiated apply so that a known-good record exists to recover from after a failure, then re-apply the previous configuration; (2) plan approval gate: the agent generates a terraform plan, but a human or automated policy engine must approve it before apply proceeds; (3) resource-level undo: for a failed apply where some resources were created before the failure, terraform destroy -target=<resource> removes only the partially created resources, cleaning up before the next attempt. State-file checkpointing is the cheapest and most universal; plan approval gates add a human-in-the-loop safety layer; resource-level undo is the surgical instrument for partial failures.
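The plan approval gate lends itself to a small automated check. The sketch below assumes the plan has been rendered to JSON (for example with terraform show -json on a saved plan file) and parsed into a dict; it relies on the `resource_changes` list and each entry's `change.actions` field from that format. The policy itself, the function names, and the resource addresses are illustrative.

```python
from typing import Any


def destructive_changes(plan: dict[str, Any]) -> list[str]:
    """Return addresses of resources the plan would delete or replace."""
    flagged: list[str] = []
    for rc in plan.get("resource_changes", []):
        actions = rc.get("change", {}).get("actions", [])
        # A replacement shows up as a delete paired with a create.
        if "delete" in actions:
            flagged.append(rc["address"])
    return flagged


def approve_plan(plan: dict[str, Any], max_destructive: int = 0) -> bool:
    """Approve automatically only when destructive changes stay within budget."""
    return len(destructive_changes(plan)) <= max_destructive


# Hypothetical plan: one in-place update and one replacement.
sample_plan = {
    "resource_changes": [
        {"address": "aws_instance.web", "change": {"actions": ["update"]}},
        {"address": "aws_db_instance.main",
         "change": {"actions": ["delete", "create"]}},
    ]
}
needs_review = not approve_plan(sample_plan)
```

A plan that fails the automated check would be escalated to a human rather than rejected outright, which keeps the agent useful for routine changes while gating the risky ones.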
Clean-Slate Recovery: When Everything Must Be Reset
In the worst case — the agent corrupts the container beyond recognition, or Terraform state drifts irreconcilably — the only option is clean-slate recovery: destroy the environment entirely and rebuild from the original image or IaC definition. This is the "scorched earth" rollback pattern. Clean-slate recovery is fast (a fresh docker run or terraform destroy && terraform apply) but destructive (any legitimate data written by the agent before the failure is lost). For stateless agent operations — config generation, code linting, static analysis — clean-slate is often the correct rollback strategy because there is nothing valuable to preserve. For stateful operations, clean-slate is the emergency last resort when all other rollback strategies have failed.
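The "last resort" ordering can be encoded as an escalation chain: try the least destructive strategy first and fall through to clean-slate only when everything else has failed. This is a minimal sketch with a hypothetical helper name; each strategy is a zero-argument callable that returns True on success.

```python
from typing import Callable, Optional, Sequence


def recover_with_escalation(
    strategies: Sequence[tuple[str, Callable[[], bool]]],
) -> Optional[str]:
    """Run strategies in order; return the name of the first that succeeds."""
    for name, attempt in strategies:
        try:
            if attempt():
                return name
        except Exception:
            # A strategy that raises is treated the same as one that fails.
            continue
    return None


def snapshot_rollback() -> bool:
    """Stand-in for a snapshot restore that fails."""
    raise RuntimeError("snapshot image missing")


def clean_slate() -> bool:
    """Stand-in for destroying and rebuilding the environment."""
    return True


used = recover_with_escalation([
    ("snapshot_rollback", snapshot_rollback),
    ("clean_slate", clean_slate),
])
```

In this run `used` is "clean_slate" because the snapshot restore raised. A return value of None means every strategy failed and the incident needs a human.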
ContainerEnvironment: Implementation
The code below implements a ContainerEnvironment class that provides snapshot-based rollback for Docker containers, with support for Docker commit snapshots, clean-slate recovery via image reset, and integration hooks for Terraform state checkpointing:
from __future__ import annotations
import hashlib
import json
import os
import shutil
import subprocess
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
import logging
logger = logging.getLogger("agent.rollback.environment")
# ========================================================================
# EnvironmentSnapshot — a point-in-time capture of a container environment
# ========================================================================
@dataclass
class EnvironmentSnapshot:
"""
Represents a captured environment state that can be restored.
Snapshot types (stored in the snapshot_type field):
- DOCKER_COMMIT: Docker image created via `docker commit`
- STATE_FILE: IaC state file checkpoint (Terraform, Pulumi, etc.)
- CRIU: process-state checkpoint (the "criu" value of snapshot_type)
"""
snapshot_id: str
snapshot_type: str # "docker_commit" | "state_file" | "criu"
container_id: str # Docker container ID (or "" for state_file)
image_tag: str # Docker image:tag for docker_commit snapshots
state_file_path: Optional[Path] = None # IaC state file backup path
metadata: dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
size_bytes: int = 0
# ========================================================================
# ContainerEnvironment — environment-level snapshot/rollback/destroy
# ========================================================================
class ContainerEnvironment:
"""
Manages environment-level rollback for agent execution environments.
Capabilities:
- Docker commit snapshots: capture container filesystem state
- IaC state file checkpoints: save/restore Terraform state
- Clean-slate recovery: reset container to original image
- Snapshot lifecycle: create, list, restore, prune
Architecture:
Every agent task runs inside a container (Docker or similar).
Before the agent performs any write, the environment is snapshotted.
If the task fails, the snapshot is restored — reverting the entire
filesystem to the pre-write state.
If snapshot restore fails, clean-slate recovery resets to the
original image.
Usage:
env = ContainerEnvironment(
container_id="agent-task-123",
original_image="agent-sandbox:latest",
snapshot_dir="/var/agent/env-snapshots",
iac_state_dir="/var/agent/iac-states",
)
# Before agent writes:
snap = env.snapshot(operation_id="nginx-upgrade-step1")
# Agent performs writes...
# On failure:
env.rollback(snap.snapshot_id)
# Emergency: reset everything to original image
env.destroy_and_reset()
"""
def __init__(
self,
container_id: str,
original_image: str,
snapshot_dir: str | Path,
iac_state_dir: str | Path | None = None,
*,
docker_binary: str = "docker",
terraform_binary: str = "terraform",
max_snapshots: int = 10,
) -> None:
self.container_id = container_id
self.original_image = original_image
self.snapshot_dir = Path(snapshot_dir)
self.snapshot_dir.mkdir(parents=True, exist_ok=True)
self.iac_state_dir = Path(iac_state_dir) if iac_state_dir else None
if self.iac_state_dir:
self.iac_state_dir.mkdir(parents=True, exist_ok=True)
self.docker_bin = docker_binary
self.terraform_bin = terraform_binary
self.max_snapshots = max_snapshots
# Snapshot registry
self._snapshots: dict[str, EnvironmentSnapshot] = {}
self._registry_path = self.snapshot_dir / "registry.json"
self._load_registry()
# ------------------------------------------------------------------
# snapshot() — capture the current environment state
# ------------------------------------------------------------------
def snapshot(
self,
operation_id: str,
*,
include_iac_state: bool = False,
iac_state_path: str | Path | None = None,
) -> Optional[EnvironmentSnapshot]:
"""
Capture the current environment state as a rollback checkpoint.
Creates:
1. A Docker commit of the container (filesystem snapshot).
2. Optionally, a backup of the IaC state file.
Returns the EnvironmentSnapshot on success, None on failure.
The caller MUST NOT proceed with the agent write if this
returns None — without a snapshot, rollback is impossible.
"""
snapshot_id = f"env-snap-{operation_id}-{int(time.time())}"
timestamp = time.time()
# --- Docker commit (filesystem snapshot) -------------------------
image_tag = f"agent-rollback:{snapshot_id}"
commit_cmd = [
self.docker_bin, "commit",
self.container_id,
image_tag,
]
try:
result = subprocess.run(
commit_cmd,
capture_output=True,
text=True,
timeout=120, # 2-minute timeout for large containers
)
if result.returncode != 0:
logger.error(
"Docker commit failed for %s: %s",
self.container_id, result.stderr.strip(),
)
return None
# Get image size
inspect_cmd = [
self.docker_bin, "image", "inspect",
image_tag,
"--format", "{{.Size}}",
]
inspect_result = subprocess.run(
inspect_cmd, capture_output=True, text=True, timeout=10,
)
size_bytes = int(inspect_result.stdout.strip()) if inspect_result.returncode == 0 else 0
logger.info(
"Docker commit snapshot: %s → %s (%d bytes)",
self.container_id, image_tag, size_bytes,
)
except subprocess.TimeoutExpired:
logger.error(
"Docker commit timed out for %s (container may be too large)",
self.container_id,
)
return None
except FileNotFoundError:
logger.error(
"Docker binary '%s' not found", self.docker_bin,
)
return None
except Exception as exc:
logger.exception(
"Docker commit failed with unexpected error: %s", exc,
)
return None
# --- IaC state file backup (optional) ----------------------------
state_file_path: Optional[Path] = None
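# Skip the backup when no iac_state_dir was configured (it defaults to None)
if include_iac_state and iac_state_path and self.iac_state_dir is not None: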
src = Path(iac_state_path)
if src.exists():
dst = self.iac_state_dir / f"{snapshot_id}-{src.name}"
try:
shutil.copy2(src, dst)
state_file_path = dst
logger.debug("IaC state backed up: %s → %s", src, dst)
except Exception as exc:
logger.error("IaC state backup failed: %s", exc)
# Non-fatal: Docker snapshot still succeeded
else:
logger.warning(
"IaC state file not found: %s — skipping backup", src,
)
# --- Register snapshot -------------------------------------------
snap = EnvironmentSnapshot(
snapshot_id=snapshot_id,
snapshot_type="docker_commit",
container_id=self.container_id,
image_tag=image_tag,
state_file_path=state_file_path,
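metadata={
"operation_id": operation_id,
"original_image": self.original_image,
# rollback_iac_state() reads this to know where to restore the file
"original_path": str(iac_state_path) if state_file_path else "",
},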
timestamp=timestamp,
size_bytes=size_bytes,
)
self._snapshots[snapshot_id] = snap
self._save_registry()
self._prune_snapshots()
logger.info(
"Environment snapshot created: %s (type=docker_commit, %d bytes)",
snapshot_id, size_bytes,
)
return snap
# ------------------------------------------------------------------
# rollback() — restore environment to a previous snapshot
# ------------------------------------------------------------------
def rollback(self, snapshot_id: str) -> bool:
"""
Restore the container environment to a previously captured snapshot.
Restoration strategy:
1. Stop the container (if running).
2. Remove the container.
3. Create a new container from the snapshot image.
4. Optionally restore IaC state file and re-apply.
Returns True if restoration succeeded, False otherwise.
"""
snap = self._snapshots.get(snapshot_id)
if snap is None:
logger.error(
"rollback() called for unknown snapshot: %s", snapshot_id,
)
return False
logger.info(
"Rolling back environment to snapshot: %s (image=%s)",
snapshot_id, snap.image_tag,
)
try:
# Step 1: Stop the container (graceful, then force)
self._docker_stop_container(self.container_id)
# Step 2: Remove the container (not the image)
rm_cmd = [self.docker_bin, "rm", "-f", self.container_id]
subprocess.run(rm_cmd, capture_output=True, timeout=30)
# Step 3: Recreate from snapshot image
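# NOTE: this minimal version starts the container with default
# settings. Published ports, volume mounts, and other `docker run`
# flags are not stored in the committed image and must be re-supplied
# by the caller; data in mounted volumes is not part of the snapshot.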
run_cmd = [
self.docker_bin, "run", "-d",
"--name", self.container_id,
snap.image_tag,
]
run_result = subprocess.run(
run_cmd, capture_output=True, text=True, timeout=60,
)
if run_result.returncode != 0:
logger.error(
"Container restore failed: %s", run_result.stderr.strip(),
)
# Fallback: try clean-slate recovery
return self.destroy_and_reset()
new_container_id = run_result.stdout.strip()[:12]
logger.info(
"Container restored from snapshot: %s → %s",
snapshot_id, new_container_id,
)
# Step 4: Restore IaC state if applicable
if snap.state_file_path and snap.state_file_path.exists():
self._restore_iac_state(snap)
return True
except subprocess.TimeoutExpired as exc:
logger.error("Container rollback timed out: %s", exc)
return self.destroy_and_reset()
except Exception as exc:
logger.exception("Container rollback failed: %s", exc)
return self.destroy_and_reset()
# ------------------------------------------------------------------
# destroy_and_reset() — clean-slate recovery to original image
# ------------------------------------------------------------------
def destroy_and_reset(self) -> bool:
"""
Emergency clean-slate recovery: destroy the container entirely
and recreate from the original image.
This is the last-resort rollback. It discards ALL state since
the container was originally created — not just the last write,
but everything. Use only when snapshot-based rollback has failed
or when the environment is irreparably corrupted.
Returns True if the reset succeeded, False if even this failed.
"""
logger.warning(
"CLEAN-SLATE RECOVERY: destroying container %s and resetting to %s",
self.container_id, self.original_image,
)
try:
# Force-stop and remove the corrupted container
self._docker_stop_container(self.container_id)
rm_cmd = [self.docker_bin, "rm", "-f", self.container_id]
subprocess.run(rm_cmd, capture_output=True, timeout=30)
# Clean up snapshot images (they reference corrupted state)
for snap in list(self._snapshots.values()):
if snap.image_tag != self.original_image:
try:
subprocess.run(
[self.docker_bin, "rmi", "-f", snap.image_tag],
capture_output=True, timeout=30,
)
except Exception:
pass
self._snapshots.clear()
self._save_registry()
# Recreate from the original pristine image
run_cmd = [
self.docker_bin, "run", "-d",
"--name", self.container_id,
self.original_image,
]
run_result = subprocess.run(
run_cmd, capture_output=True, text=True, timeout=60,
)
if run_result.returncode != 0:
logger.error(
"Clean-slate recovery FAILED: cannot create container "
"from original image %s — %s",
self.original_image, run_result.stderr.strip(),
)
return False
logger.info(
"Clean-slate recovery succeeded: container %s reset to %s",
self.container_id, self.original_image,
)
return True
except Exception as exc:
logger.exception(
"Clean-slate recovery failed catastrophically: %s", exc,
)
return False
# ------------------------------------------------------------------
# Terraform/Pulumi state checkpoint helpers
# ------------------------------------------------------------------
def checkpoint_iac_state(
self,
operation_id: str,
state_path: str | Path,
working_dir: str | Path,
) -> Optional[Path]:
"""
Create a checkpoint of the IaC state file before an agent applies
infrastructure changes.
For Terraform: copies terraform.tfstate (and terraform.tfstate.backup if present).
For Pulumi: export the stack first (e.g. `pulumi stack export --file <path>`)
and pass that file as state_path; this method only copies files.
Returns the path to the checkpointed state file, or None on failure.
"""
state_src = Path(state_path)
working = Path(working_dir)
if not state_src.exists():
logger.warning(
"IaC state file not found at %s — nothing to checkpoint",
state_src,
)
return None
if not self.iac_state_dir:
logger.warning("No IaC state directory configured")
return None
snapshot_id = f"iac-{operation_id}-{int(time.time())}"
dst = self.iac_state_dir / f"{snapshot_id}-{state_src.name}"
try:
shutil.copy2(state_src, dst)
# Also capture terraform.tfstate.backup if it exists
backup_src = Path(str(state_src) + ".backup")
if backup_src.exists():
backup_dst = self.iac_state_dir / f"{snapshot_id}-{backup_src.name}"
shutil.copy2(backup_src, backup_dst)
logger.info(
"IaC state checkpoint: %s → %s", state_src, dst,
)
# Record in registry
snap = EnvironmentSnapshot(
snapshot_id=snapshot_id,
snapshot_type="state_file",
container_id="",
image_tag="",
state_file_path=dst,
metadata={
"operation_id": operation_id,
"original_path": str(state_src),
"working_dir": str(working),
},
# Set explicitly, as snapshot() does, so pruning and listing sort correctly
timestamp=time.time(),
size_bytes=dst.stat().st_size,
)
self._snapshots[snapshot_id] = snap
self._save_registry()
return dst
except Exception as exc:
logger.error("IaC state checkpoint failed: %s", exc)
return None
def rollback_iac_state(
self,
snapshot_id: str,
auto_apply: bool = False,
) -> bool:
"""
Restore an IaC state file from a checkpoint and optionally re-apply
infrastructure to reconcile.
This is the IaC equivalent of environment rollback: restore the
state file to the pre-write version, then optionally run
`terraform apply` to reconcile actual infrastructure.
Args:
snapshot_id: The checkpoint to restore.
auto_apply: If True, automatically run `terraform apply` after
restoring state. If False, only restores the state
file; the caller decides whether to re-apply.
Returns True on success.
"""
snap = self._snapshots.get(snapshot_id)
if not snap or not snap.state_file_path:
logger.error("IaC state checkpoint not found: %s", snapshot_id)
return False
if not snap.state_file_path.exists():
logger.error(
"IaC state file missing: %s (orphaned checkpoint?)",
snap.state_file_path,
)
return False
try:
# Restore state file to original location
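original = snap.metadata.get("original_path", "")
working_dir = Path(snap.metadata.get("working_dir", "."))
if not original:
# Path("") is always truthy and resolves to ".", so test the raw string
logger.error(
"IaC checkpoint %s has no original_path; cannot restore",
snapshot_id,
)
return False
original_path = Path(original)
shutil.copy2(snap.state_file_path, original_path)
logger.info(
"IaC state restored: %s → %s",
snap.state_file_path, original_path,
)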
# Optionally re-apply
if auto_apply and working_dir.exists():
apply_cmd = [
self.terraform_bin, "apply",
"-auto-approve",
"-input=false",
]
result = subprocess.run(
apply_cmd,
cwd=str(working_dir),
capture_output=True,
text=True,
timeout=600, # 10-minute timeout for infra changes
)
if result.returncode != 0:
logger.error(
"Terraform apply after state restore failed: %s",
result.stderr[:500],
)
return False
logger.info("IaC state restored and re-applied")
return True
except subprocess.TimeoutExpired:
logger.error("IaC apply timed out after state restore")
return False
except Exception as exc:
logger.exception("IaC state rollback failed: %s", exc)
return False
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _docker_stop_container(self, container_id: str) -> None:
"""Stop a Docker container gracefully, then forcefully."""
# Graceful stop first (SIGTERM, 10s timeout)
try:
subprocess.run(
[self.docker_bin, "stop", "-t", "10", container_id],
capture_output=True, timeout=20,
)
except subprocess.TimeoutExpired:
logger.warning(
"Graceful stop timed out for %s, forcing kill", container_id,
)
# Force kill (SIGKILL)
try:
subprocess.run(
[self.docker_bin, "kill", container_id],
capture_output=True, timeout=10,
)
except Exception:
pass
except Exception:
pass
def _restore_iac_state(self, snap: EnvironmentSnapshot) -> None:
"""Restore the IaC state file only; infrastructure is not re-applied."""
if not snap.state_file_path or not snap.state_file_path.exists():
return
try:
self.rollback_iac_state(snap.snapshot_id, auto_apply=False)
except Exception as exc:
logger.error("IaC restore during rollback failed: %s", exc)
# ------------------------------------------------------------------
# Registry persistence
# ------------------------------------------------------------------
def _save_registry(self) -> None:
"""Persist snapshot registry to disk."""
data = {
sid: {
"snapshot_id": s.snapshot_id,
"snapshot_type": s.snapshot_type,
"container_id": s.container_id,
"image_tag": s.image_tag,
"state_file_path": str(s.state_file_path) if s.state_file_path else None,
"metadata": s.metadata,
"timestamp": s.timestamp,
"size_bytes": s.size_bytes,
}
for sid, s in self._snapshots.items()
}
self._registry_path.write_text(json.dumps(data, indent=2))
def _load_registry(self) -> None:
"""Load snapshot registry from disk."""
if not self._registry_path.exists():
return
try:
data = json.loads(self._registry_path.read_text())
for sid, sdata in data.items():
self._snapshots[sid] = EnvironmentSnapshot(
snapshot_id=sdata["snapshot_id"],
snapshot_type=sdata["snapshot_type"],
container_id=sdata.get("container_id", ""),
image_tag=sdata.get("image_tag", ""),
state_file_path=(
Path(sdata["state_file_path"])
if sdata.get("state_file_path") else None
),
metadata=sdata.get("metadata", {}),
timestamp=sdata.get("timestamp", 0.0),
size_bytes=sdata.get("size_bytes", 0),
)
logger.info(
"Loaded %d environment snapshots from registry",
len(self._snapshots),
)
except Exception as exc:
logger.warning("Failed to load snapshot registry: %s", exc)
def _prune_snapshots(self) -> None:
"""Remove oldest snapshots exceeding max_snapshots."""
if len(self._snapshots) <= self.max_snapshots:
return
sorted_snaps = sorted(
self._snapshots.items(),
key=lambda item: item[1].timestamp,
)
to_remove = sorted_snaps[:len(sorted_snaps) - self.max_snapshots]
for sid, snap in to_remove:
# Remove Docker image for docker_commit snapshots
if snap.snapshot_type == "docker_commit" and snap.image_tag:
try:
subprocess.run(
[self.docker_bin, "rmi", "-f", snap.image_tag],
capture_output=True, timeout=30,
)
except Exception:
pass
# Remove state file backup
if snap.state_file_path:
snap.state_file_path.unlink(missing_ok=True)
del self._snapshots[sid]
logger.debug("Pruned old snapshot: %s", sid)
self._save_registry()
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
@property
def snapshot_count(self) -> int:
return len(self._snapshots)
def list_snapshots(self) -> list[dict[str, Any]]:
"""Return a human-readable list of all snapshots."""
return [
{
"snapshot_id": s.snapshot_id,
"type": s.snapshot_type,
"image_tag": s.image_tag,
"timestamp": datetime.fromtimestamp(s.timestamp).isoformat(),
"size_bytes": s.size_bytes,
"operation_id": s.metadata.get("operation_id", ""),
}
for s in sorted(
self._snapshots.values(),
key=lambda s: s.timestamp,
reverse=True,
)
]
def summary(self) -> dict[str, Any]:
"""Return a summary of the environment rollback state."""
total_size = sum(s.size_bytes for s in self._snapshots.values())
return {
"container_id": self.container_id,
"original_image": self.original_image,
"snapshot_count": len(self._snapshots),
"total_snapshot_size_bytes": total_size,
"max_snapshots": self.max_snapshots,
"snapshots": self.list_snapshots(),
}
# ========================================================================
# Usage example
# ========================================================================
if __name__ == "__main__":
import tempfile as _tempfile
with _tempfile.TemporaryDirectory() as tmpdir:
snap_dir = Path(tmpdir) / "env-snaps"
iac_dir = Path(tmpdir) / "iac-states"
# Create a mock IaC state file
iac_dir.mkdir(parents=True)
state_file = iac_dir / "terraform.tfstate"
state_file.write_text(json.dumps({
"version": 4,
"resources": [{"type": "aws_instance", "name": "web"}],
}))
# Initialize environment manager
# Note: In a real scenario, container_id would be a running Docker
# container. For this example, we demonstrate the IaC checkpoint flow.
env = ContainerEnvironment(
container_id="agent-task-demo",
original_image="agent-sandbox:latest",
snapshot_dir=snap_dir,
iac_state_dir=iac_dir / "checkpoints",
)
# --- IaC state checkpoint before agent modifies infrastructure ---
checkpoint = env.checkpoint_iac_state(
operation_id="add-load-balancer",
state_path=state_file,
working_dir=iac_dir,
)
if checkpoint:
print(f"IaC checkpoint created: {checkpoint}")
# Simulate: agent modifies terraform.tfstate (bad write)
state_file.write_text(json.dumps({
"version": 4,
"resources": [
{"type": "aws_instance", "name": "web"},
{"type": "aws_lb", "name": "bad_lb_config"},
],
}))
# --- Rollback IaC state ---
if checkpoint:
# checkpoint_iac_state() returns the checkpoint path, so look up
# the snapshot ID that was registered for that path.
snapshot_id = next(
sid for sid, snap in env._snapshots.items()
if snap.state_file_path == checkpoint
)
if env.rollback_iac_state(snapshot_id, auto_apply=False):
print("IaC state restored from checkpoint")
print(f"Final state: {state_file.read_text()[:100]}...")
print(f"Snapshot count: {env.snapshot_count}")
The ContainerEnvironment class sits at the outermost layer of the rollback architecture. While CopyOnWriteAgent handles individual files and RollbackOrchestrator manages strategy selection, ContainerEnvironment provides the safety net for catastrophic failures — the "undo" of last resort. In production, the three layers work together: the orchestrator determines strategy and registers undo handlers for each write; the copy-on-write agent executes file-level writes with diff-based rollback; and the container environment snapshots the entire execution context at task boundaries so that even if the inner layers fail, the system can always return to a known-good state. For how these layers integrate with the agent's state machine to gate risky operations, see Agent State Machine Design.
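The "snapshot first, or the write does not proceed" contract at the environment layer can be sketched as a small wrapper. This is a minimal illustration under stated assumptions, not part of ContainerEnvironment: `guarded_write`, the `SnapshotEnv` protocol, and the in-memory `FakeEnv` are hypothetical names, and `snapshot()` is simplified to return a plain ID so the example runs without Docker.

```python
from __future__ import annotations

from typing import Callable, Optional, Protocol


class SnapshotEnv(Protocol):
    """Hypothetical minimal interface: the two calls the wrapper needs."""

    def snapshot(self, operation_id: str) -> Optional[str]: ...
    def rollback(self, snapshot_id: str) -> bool: ...


def guarded_write(env: SnapshotEnv, operation_id: str,
                  write: Callable[[], None]) -> str:
    """Run `write` only if a snapshot exists; roll back if it raises."""
    snapshot_id = env.snapshot(operation_id)
    if snapshot_id is None:
        return "refused"  # no undo path, so the write never starts
    try:
        write()
    except Exception:
        restored = env.rollback(snapshot_id)
        return "rolled_back" if restored else "rollback_failed"
    return "written"


class FakeEnv:
    """In-memory stand-in for a container environment (assumption)."""

    def __init__(self, can_snapshot: bool = True) -> None:
        self.can_snapshot = can_snapshot
        self.state = {"nginx.conf": "v1"}
        self._snaps: dict[str, dict[str, str]] = {}

    def snapshot(self, operation_id: str) -> Optional[str]:
        if not self.can_snapshot:
            return None
        sid = f"snap-{operation_id}"
        self._snaps[sid] = dict(self.state)  # copy of the pre-write state
        return sid

    def rollback(self, snapshot_id: str) -> bool:
        if snapshot_id not in self._snaps:
            return False
        self.state = dict(self._snaps[snapshot_id])
        return True


def bad_write(env: FakeEnv) -> None:
    env.state["nginx.conf"] = "v2-broken"
    raise RuntimeError("nginx failed to restart")


env = FakeEnv()
outcome = guarded_write(env, "op-1", lambda: bad_write(env))
print(outcome, env.state)  # rolled_back {'nginx.conf': 'v1'}
```

The wrapper returns a status string rather than raising so that the caller can distinguish "refused" (nothing was changed) from "rollback_failed" (escalation needed).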
6. Stack-Based Rollback — Composing and Sequencing Undo Operations
The previous sections built rollback primitives for individual files, data records, and entire environments. But agents do not perform isolated writes — they execute sequences of interdependent operations. An agent tasked with "update the database connection string across all configuration files" writes to 7 files, 2 database configuration tables, and 1 environment variable. Rolling back any single write in isolation is meaningless if the other 9 writes remain — the system would be in an inconsistent state where some configs point to the old database and others to the new one. The solution is a stack-based rollback architecture: every write operation registers its undo handler on a LIFO stack; on failure, handlers are popped and executed in reverse order, ensuring that the most recent (and therefore most dependent) writes are undone first.
Why LIFO Ordering Is Mandatory
Consider three sequential agent writes: (1) write /etc/hosts with a new entry for db.internal; (2) write /etc/app.conf to reference db.internal; (3) write a database record that app.conf will consume. If a failure occurs after step 3, the undo must execute in reverse order: step 3 → step 2 → step 1. Forward-order undo (step 1 → step 2 → step 3) would first restore /etc/hosts to its pre-write state (removing db.internal), leaving /etc/app.conf referencing a hostname that no longer resolves — creating a dangling reference that is arguably worse than the original error. LIFO ordering guarantees that dependents are undone before their dependencies, maintaining referential integrity throughout the rollback process.
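The ordering argument can be checked with a toy model. The sketch below is illustrative only: the three writes are reduced to fields in a dictionary, and `consistent()` encodes the assumed invariants (the config needs a resolvable hostname, and the record needs the config). It applies the writes, then undoes them in LIFO and in forward order, recording after each undo step whether the state is still consistent.

```python
from typing import Callable


def build():
    """Apply the three writes; return (state, undo handlers in write order)."""
    state = {"hosts": set(), "app_conf": None, "db_record": None}
    undos: list[tuple[str, Callable[[], None]]] = []

    state["hosts"].add("db.internal")                    # step 1
    undos.append(("hosts", lambda: state["hosts"].discard("db.internal")))

    state["app_conf"] = "db.internal"                    # step 2
    undos.append(("app_conf", lambda: state.update(app_conf=None)))

    state["db_record"] = "consumed-by-app_conf"          # step 3
    undos.append(("db_record", lambda: state.update(db_record=None)))
    return state, undos


def consistent(state) -> bool:
    """No dangling references: each layer needs the layer beneath it."""
    if state["app_conf"] is not None and state["app_conf"] not in state["hosts"]:
        return False
    if state["db_record"] is not None and state["app_conf"] is None:
        return False
    return True


def run_undo(order: str) -> list[bool]:
    """Undo all writes in the given order; report consistency after each step."""
    state, undos = build()
    sequence = reversed(undos) if order == "lifo" else undos
    checks = []
    for _name, undo in sequence:
        undo()
        checks.append(consistent(state))
    return checks


print(run_undo("lifo"))     # [True, True, True]
print(run_undo("forward"))  # [False, False, True]
```

Both orders end in the same final state; the difference is that forward-order undo passes through two inconsistent intermediate states, which matters whenever an undo step can fail or the system is observed mid-rollback.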
This dependency-awareness extends beyond intra-operation ordering. Some writes are logically independent — writing to /etc/nginx/nginx.conf and writing to /etc/ssh/sshd_config have no causal relationship, and their undo order doesn't matter. Other writes form dependency chains: a DNS write enables a config write that enables a database write. The rollback stack must recognize these dependency structures — if the database write fails but the DNS and config writes succeeded, rollback must undo the config write (which referenced the database record) and the database write itself, but not necessarily the DNS write (which may be independently valid). The RollbackStack class below implements this partial, dependency-aware rollback.
Composable Undo Handlers and Dependency Graphs
Each undo handler on the rollback stack is a composable unit: it has a unique operation ID, a typed undo function (snapshot restore, WAL pre-image, compensation call), a list of operations it depends on, a list of operations that depend on it, and a status field (pending, committed, rolled back, failed). This metadata enables the stack to answer queries like "if I undo operation X, which other operations must also be undone?", which is a transitive closure over the dependents of X. The stack maintains an in-memory dependency graph using simple adjacency lists, updated on each add_dependency() call and cleaned up on pop().
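The "which other operations must also be undone?" query is a breadth-first walk over the dependents. A minimal sketch, separate from the RollbackStack class, with hypothetical operation IDs and a module-level adjacency map standing in for the stack's internal state:

```python
from collections import defaultdict, deque

# depended_by[x] = operations that declared a dependency on x
depended_by: dict[str, set[str]] = defaultdict(set)


def add_dependency(operation_id: str, depends_on: str) -> None:
    """Record that `operation_id` was built on the result of `depends_on`."""
    depended_by[depends_on].add(operation_id)


def must_also_undo(operation_id: str) -> set[str]:
    """Every operation that transitively depends on `operation_id`."""
    seen = {operation_id}
    queue = deque([operation_id])
    while queue:
        current = queue.popleft()
        for dependent in depended_by.get(current, set()):
            if dependent not in seen:  # also guards against cycles
                seen.add(dependent)
                queue.append(dependent)
    return seen - {operation_id}


# DNS write enables the config write, which enables the database write.
add_dependency("write-config", depends_on="write-dns")
add_dependency("write-db", depends_on="write-config")

print(sorted(must_also_undo("write-dns")))   # ['write-config', 'write-db']
print(sorted(must_also_undo("write-sshd")))  # []
```

An operation with no declared dependents, such as the independent sshd_config write, yields an empty set, so undoing it affects nothing else.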
Dependency registration follows a "declare after creation" pattern: when the agent executes write B that depends on the output of write A, the agent calls stack.add_dependency(B_id, depends_on=A_id). This is intentionally explicit — the system does not attempt to infer dependencies from file content analysis (which would be fragile and error-prone). The agent's task logic — encoded in its state machine — knows which writes depend on which other writes. For state machine patterns that formalize this dependency tracking, see Agent State Machine Design. For error recovery paths that interact with the rollback stack when partial failures occur, see Agent Error Recovery.
Partial Rollback and Error Handling
A full rollback_all() is the simplest and safest option: undo everything uncommitted. But in long-running agent tasks (100+ operations over multiple hours), a full rollback is wasteful and potentially dangerous. If the agent has completed 90 writes that are still pending (not yet committed) and the 91st write fails, rolling back all 91 writes discards the 90 successful ones. Partial rollback ("undo writes 88, 90, and 91 because they form one dependency chain, and leave the independent write 89 in place") preserves the valid work while surgically removing the failed dependency chain.
But partial rollback introduces a hard problem: determining the correct undo boundary. In the implementation below, the boundary is the transitive closure of dependents: starting from the operation passed to rollback_to(), the stack walks forward along depended_by edges and collects every pending operation that was built on it. Suppose 90 depends on 88, 91 depends on 90, and 89 is independent. Calling rollback_to() on 88 yields the boundary {88, 90, 91}, executed in LIFO order as 91, 90, 88, and 89 is left intact. The walk never moves backward: calling rollback_to() on 91 alone undoes only 91 and anything that depends on it, so when a failure at 91 also invalidates its prerequisites, the caller must pass the earliest operation in the chain. This is the most complex operation in the rollback stack, and its correctness depends entirely on accurate dependency declarations.
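Once the undo boundary is known, turning it into an execution order is a sort by stack position. A small sketch using the operation numbers from the example (the `op-NN` IDs are illustrative):

```python
# Stack order as pushed (oldest first).
stack_order = ["op-88", "op-89", "op-90", "op-91"]
# Undo boundary from the example: op-89 is independent and stays in place.
boundary = {"op-88", "op-90", "op-91"}

# Map each operation to its position, then undo the most recent first.
position = {op_id: index for index, op_id in enumerate(stack_order)}
undo_sequence = sorted(boundary, key=lambda op_id: position[op_id], reverse=True)
untouched = [op_id for op_id in stack_order if op_id not in boundary]

print(undo_sequence)  # ['op-91', 'op-90', 'op-88']
print(untouched)      # ['op-89']
```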
Error handling within the rollback stack itself deserves special attention: what happens when an undo handler fails? If the system tries to undo a database write but the database is unreachable, the undo handler returns False. The stack records this failure but continues executing remaining undo handlers — a failed undo for one operation should not block rollback of independent operations. This is a deliberate design choice: partial recovery is better than no recovery. The stack's final output includes both the list of successfully undone operations and the list of failed undos, enabling the recovery orchestrator to escalate failed undos to human operators or retry logic.
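The continue-on-failure policy can be sketched in a few lines. This is a simplified stand-in rather than the stack's own code: `run_undo_handlers` is a hypothetical helper that takes handlers already sorted in LIFO order, treats both a False return value and a raised exception as a failed undo, and keeps going.

```python
from typing import Callable

UndoFn = Callable[[], bool]


def run_undo_handlers(
    handlers: list[tuple[str, UndoFn]],
) -> tuple[list[str], list[str]]:
    """Run every handler; one failure never blocks the remaining ones."""
    succeeded: list[str] = []
    failed: list[str] = []
    for op_id, undo in handlers:
        try:
            ok = bool(undo())
        except Exception:
            ok = False  # a raising handler counts as a failed undo
        (succeeded if ok else failed).append(op_id)
    return succeeded, failed


def undo_db_write() -> bool:
    raise ConnectionError("database unreachable")


handlers = [
    ("op-3-db", undo_db_write),    # most recent write, undone first
    ("op-2-conf", lambda: True),
    ("op-1-hosts", lambda: True),
]
succeeded, failed = run_undo_handlers(handlers)
print(succeeded)  # ['op-2-conf', 'op-1-hosts']
print(failed)     # ['op-3-db']
```

The `failed` list is what a recovery orchestrator would escalate: here the two file restores completed even though the database undo could not run.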
RollbackStack: Implementation
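The code below implements a RollbackStack class with push(), pop(), commit(), rollback_all(), and rollback_to() for dependency-aware partial rollback. It is built from composable undo handlers, and a failure in one undo handler is recorded rather than aborting the rollback: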
from __future__ import annotations
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Callable, Optional, Any
import logging
logger = logging.getLogger("agent.rollback.stack")
# ========================================================================
# StackEntry — a single undo handler on the rollback stack
# ========================================================================
class UndoStatus:
"""Lifecycle status of an undo handler."""
PENDING = "pending" # Registered, write not yet committed
COMMITTED = "committed" # Write succeeded, undo not needed
ROLLED_BACK = "rolled_back" # Undo executed successfully
FAILED = "failed" # Undo execution failed
@dataclass
class StackEntry:
"""
One entry on the rollback stack — represents a single write operation
and its registered undo handler.
Dependencies:
- depends_on: set of operation_ids that THIS entry depends on.
These are undone AFTER this entry (dependents first, LIFO).
- depended_by: set of operation_ids that depend on THIS entry.
If this entry is rolled back, dependents should also be rolled back.
The dependency graph enables partial rollback: when operation X fails,
the stack computes the transitive closure of "what depends on X" to
determine the minimal undo boundary.
"""
operation_id: str
undo_handler: Callable[[], bool]
description: str
status: str = UndoStatus.PENDING
depends_on: set[str] = field(default_factory=set)
depended_by: set[str] = field(default_factory=set)
metadata: dict[str, Any] = field(default_factory=dict)
registered_at: float = field(default_factory=time.time)
# ========================================================================
# RollbackStack — LIFO stack with dependency-aware partial rollback
# ========================================================================
class RollbackStack:
"""
A LIFO undo stack with dependency-aware partial rollback.
Key capabilities:
- push(): Register an undo handler for a write operation.
- pop(): Remove and return the top entry (after commit).
- commit(): Mark an entry as committed (excluded from rollback).
- rollback_all(): Execute all pending undo handlers in LIFO order.
- rollback_to(): Partial rollback — undo the failed operation and all
operations that depend on it, preserving independent
committed work.
Dependency model:
- Dependencies are declared EXPLICITLY by the agent via add_dependency().
- The system does NOT infer dependencies from file content.
- LIFO ordering relies on the agent pushing dependencies BEFORE the
operations that depend on them; add_dependency() does not verify this.
Error handling during rollback:
- If an undo handler fails, the failure is recorded but rollback
CONTINUES for remaining handlers.
- Failed undos are reported in the return value so the caller can
escalate (human intervention, retry, alert).
Usage:
stack = RollbackStack()
# Before each write:
stack.push("op-1", undo_fn1, "Restore /etc/hosts")
stack.push("op-2", undo_fn2, "Restore /etc/app.conf")
stack.add_dependency("op-2", depends_on="op-1")
# If the task succeeds:
stack.commit("op-1")
stack.commit("op-2")
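# If op-2 fails (nothing depends on it, so only op-2 is undone):
succeeded, failed = stack.rollback_to("op-2")
# To undo op-1 together with its dependent op-2 (order: op-2, then op-1):
succeeded, failed = stack.rollback_to("op-1")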
"""
def __init__(self) -> None:
# The LIFO stack — most recent push at the end
self._stack: list[StackEntry] = []
# Fast lookup: operation_id → StackEntry
self._index: dict[str, StackEntry] = {}
# Dependency adjacency lists (reverse edges for backward traversal)
# depends_on[op_id] = {op_ids this entry depends on}
# depended_by[op_id] = {op_ids that depend on this entry}
self._depends_on: dict[str, set[str]] = defaultdict(set)
self._depended_by: dict[str, set[str]] = defaultdict(set)
# Rollback execution log
self._rollback_log: list[dict[str, Any]] = []
# ------------------------------------------------------------------
# push() — register an undo handler on the stack
# ------------------------------------------------------------------
def push(
self,
operation_id: str,
undo_handler: Callable[[], bool],
description: str,
*,
metadata: dict[str, Any] | None = None,
) -> StackEntry:
"""
Register an undo handler for a write operation.
The handler is pushed onto the LIFO stack. It will be executed
when rollback_all() or rollback_to() is called, UNLESS commit()
is called first to mark the write as successful.
Args:
operation_id: Unique ID for this operation.
undo_handler: Callable that restores the pre-write state.
Must return True on success, False on failure.
description: Human-readable description of the undo action.
metadata: Optional extra data (strategy, target, file path).
Returns:
The created StackEntry.
Raises:
ValueError: If operation_id already exists on the stack.
"""
if operation_id in self._index:
raise ValueError(
f"Operation ID {operation_id!r} already exists on the stack. "
f"Use a unique ID for each write operation."
)
entry = StackEntry(
operation_id=operation_id,
undo_handler=undo_handler,
description=description,
metadata=metadata or {},
)
self._stack.append(entry)
self._index[operation_id] = entry
logger.debug(
"RollbackStack push: op=%s desc=%r (stack depth=%d)",
operation_id, description, len(self._stack),
)
return entry
# ------------------------------------------------------------------
# pop() — remove the top entry from the stack
# ------------------------------------------------------------------
def pop(self) -> Optional[StackEntry]:
"""
Remove and return the top entry from the stack.
This is typically used after commit() to clean up. Entries that
haven't been committed should be undone with rollback_all() or
rollback_to() instead.
Returns None if the stack is empty.
"""
if not self._stack:
return None
entry = self._stack.pop()
self._index.pop(entry.operation_id, None)
self._depends_on.pop(entry.operation_id, None)
self._depended_by.pop(entry.operation_id, None)
# Clean up references from other entries
for deps in self._depends_on.values():
deps.discard(entry.operation_id)
for deps in self._depended_by.values():
deps.discard(entry.operation_id)
logger.debug(
"RollbackStack pop: op=%s (stack depth=%d)",
entry.operation_id, len(self._stack),
)
return entry
# ------------------------------------------------------------------
# commit() — mark a write as successful, exclude from rollback
# ------------------------------------------------------------------
def commit(self, operation_id: str) -> bool:
"""
Mark a write operation as committed (successful).
Committed entries are excluded from rollback_all() and
rollback_to(). Their undo handlers are preserved in the index
for audit purposes but will not be executed.
Returns True if the operation was found and committed.
"""
entry = self._index.get(operation_id)
if entry is None:
logger.warning(
"commit() called for unknown operation: %s", operation_id,
)
return False
if entry.status == UndoStatus.ROLLED_BACK:
logger.warning(
"commit() called for already-rolled-back operation: %s",
operation_id,
)
return False
entry.status = UndoStatus.COMMITTED
logger.info("RollbackStack commit: op=%s", operation_id)
return True
# ------------------------------------------------------------------
# add_dependency() — declare a dependency between operations
# ------------------------------------------------------------------
def add_dependency(
self,
operation_id: str,
depends_on: str,
) -> bool:
"""
Declare that `operation_id` depends on `depends_on`.
This means: if `depends_on` is rolled back, `operation_id`
should also be rolled back (transitive dependency).
Both operation_ids must already exist on the stack.
Returns True if the dependency was registered.
"""
if operation_id not in self._index:
logger.error(
"add_dependency: %s not found on stack", operation_id,
)
return False
if depends_on not in self._index:
logger.error(
"add_dependency: %s not found on stack", depends_on,
)
return False
if operation_id == depends_on:
logger.warning(
"add_dependency: self-dependency %s ignored", operation_id,
)
return False
self._depends_on[operation_id].add(depends_on)
self._depended_by[depends_on].add(operation_id)
# Update the entry metadata
self._index[operation_id].depends_on.add(depends_on)
self._index[depends_on].depended_by.add(operation_id)
logger.debug(
"Dependency: %s → depends on → %s", operation_id, depends_on,
)
return True
# ------------------------------------------------------------------
# rollback_all() — undo all pending operations in LIFO order
# ------------------------------------------------------------------
def rollback_all(self) -> tuple[list[str], list[str]]:
"""
Execute all pending (uncommitted) undo handlers in LIFO order.
LIFO ordering ensures that dependents are undone before their
dependencies, maintaining referential integrity.
Returns:
(succeeded, failed) — lists of operation_ids.
succeeded: operations that were successfully rolled back.
failed: operations whose undo handlers returned False
or raised an exception. These need escalation.
"""
succeeded: list[str] = []
failed: list[str] = []
# Process in reverse order (LIFO)
for entry in reversed(list(self._stack)):
if entry.status == UndoStatus.COMMITTED:
continue
if entry.status == UndoStatus.ROLLED_BACK:
continue
success = self._execute_undo(entry)
if success:
succeeded.append(entry.operation_id)
else:
failed.append(entry.operation_id)
# Clear the stack (entries remain in index for audit)
self._stack.clear()
logger.info(
"RollbackStack rollback_all: %d succeeded, %d failed, "
"%d committed (skipped)",
len(succeeded), len(failed),
sum(1 for e in self._index.values() if e.status == UndoStatus.COMMITTED),
)
return succeeded, failed
# ------------------------------------------------------------------
# rollback_to() — partial rollback with dependency-aware boundary
# ------------------------------------------------------------------
def rollback_to(
self,
failed_operation_id: str,
*,
include_dependents: bool = True,
) -> tuple[list[str], list[str]]:
"""
Partially roll back the stack: undo the failed operation and,
optionally, all operations that depend on it.
Algorithm:
1. Compute the dependency closure: starting from the failed
operation, walk forward through `depended_by` to find all
operations that transitively depend on the failed one.
These form the "undo boundary."
2. Filter the undo boundary to only pending (uncommitted) entries.
3. Sort the boundary in LIFO order (most recent first) based on
their positions in the stack.
4. Execute undo handlers in that order.
Args:
failed_operation_id: The operation that triggered the rollback.
include_dependents: If True (default), also undo all operations
that depend on the failed one. If False,
only undo the failed operation itself.
Returns:
(succeeded, failed) — lists of operation_ids.
"""
entry = self._index.get(failed_operation_id)
if entry is None:
logger.error(
"rollback_to: unknown operation %s", failed_operation_id,
)
return [], [failed_operation_id]
if entry.status == UndoStatus.COMMITTED:
logger.warning(
"rollback_to: operation %s is already committed; "
"committed operations are final and are skipped by both "
"rollback_to() and rollback_all()",
failed_operation_id,
)
return [], []
# --- Compute dependency closure (forward walk) -------------------
undo_set: set[str] = {failed_operation_id}
if include_dependents:
# BFS forward through depended_by edges
queue = deque([failed_operation_id])
while queue:
current = queue.popleft()
for dependent in self._depended_by.get(current, set()):
if dependent not in undo_set:
dep_entry = self._index.get(dependent)
if dep_entry and dep_entry.status == UndoStatus.PENDING:
undo_set.add(dependent)
queue.append(dependent)
# --- Filter to pending entries only ------------------------------
pending_undo = {
op_id for op_id in undo_set
if self._index[op_id].status == UndoStatus.PENDING
}
if not pending_undo:
logger.info(
"rollback_to: no pending operations in undo boundary for %s",
failed_operation_id,
)
return [], []
# --- Sort in LIFO order (most recent first) ---------------------
# Build a position map: operation_id → stack index (0 = oldest)
position: dict[str, int] = {}
for i, e in enumerate(self._stack):
position[e.operation_id] = i
sorted_undo = sorted(
pending_undo,
key=lambda op_id: position.get(op_id, -1),
reverse=True, # Highest position first = most recent first
)
logger.info(
"rollback_to: undo boundary for %s = %s (LIFO order: %s)",
failed_operation_id, sorted(pending_undo), sorted_undo,
)
# --- Execute undo handlers ---------------------------------------
succeeded: list[str] = []
failed: list[str] = []
for op_id in sorted_undo:
entry = self._index[op_id]
success = self._execute_undo(entry)
if success:
succeeded.append(op_id)
else:
failed.append(op_id)
logger.info(
"rollback_to: %d succeeded, %d failed for %s",
len(succeeded), len(failed), failed_operation_id,
)
return succeeded, failed
# ------------------------------------------------------------------
# rollback_one() — undo a single operation by ID
# ------------------------------------------------------------------
def rollback_one(self, operation_id: str) -> bool:
"""
Undo a single operation by its ID.
This does NOT handle dependencies — use rollback_to() for
dependency-aware partial rollback. This is a surgical tool
for operations known to be independent.
Returns True if the undo succeeded.
"""
entry = self._index.get(operation_id)
if entry is None:
logger.error(
"rollback_one: unknown operation %s", operation_id,
)
return False
if entry.status == UndoStatus.COMMITTED:
logger.warning(
"rollback_one: operation %s is committed — cannot undo",
operation_id,
)
return False
if entry.status == UndoStatus.ROLLED_BACK:
logger.info(
"rollback_one: operation %s already rolled back (idempotent)",
operation_id,
)
return True
return self._execute_undo(entry)
# ------------------------------------------------------------------
# Internal: execute a single undo handler
# ------------------------------------------------------------------
def _execute_undo(self, entry: StackEntry) -> bool:
"""
Execute the undo handler for a single stack entry.
Records the result in the rollback log regardless of success/failure.
Updates the entry status to ROLLED_BACK or FAILED.
"""
logger.info(
"Executing undo: op=%s desc=%r", entry.operation_id, entry.description,
)
log_entry = {
"operation_id": entry.operation_id,
"description": entry.description,
"timestamp": time.time(),
"success": False,
"error": None,
}
try:
result = entry.undo_handler()
if result:
entry.status = UndoStatus.ROLLED_BACK
log_entry["success"] = True
logger.info("Undo succeeded: %s", entry.operation_id)
else:
entry.status = UndoStatus.FAILED
log_entry["error"] = "undo_handler returned False"
logger.error("Undo FAILED: %s (handler returned False)", entry.operation_id)
except Exception as exc:
entry.status = UndoStatus.FAILED
log_entry["error"] = str(exc)
logger.exception(
"Undo FAILED with exception: %s — %s", entry.operation_id, exc,
)
result = False
self._rollback_log.append(log_entry)
# Remove from LIFO stack position (but keep in index for audit)
self._stack = [e for e in self._stack if e.operation_id != entry.operation_id]
return result
# ------------------------------------------------------------------
# Query / introspection
# ------------------------------------------------------------------
@property
def depth(self) -> int:
"""Number of entries on the LIFO stack (including committed)."""
return len(self._stack)
@property
def pending_count(self) -> int:
"""Number of uncommitted (rollback-eligible) entries."""
return sum(
1 for e in self._stack
if e.status == UndoStatus.PENDING
)
def get_entry(self, operation_id: str) -> Optional[StackEntry]:
"""Retrieve a stack entry by operation ID."""
return self._index.get(operation_id)
def get_dependents(self, operation_id: str) -> list[str]:
"""
Return the list of operation_ids that depend on `operation_id`.
This is the forward dependency closure (what would be affected
if this operation were rolled back).
"""
if operation_id not in self._index:
return []
# BFS forward
visited: set[str] = set()
queue = deque([operation_id])
while queue:
current = queue.popleft()
if current in visited:
continue
visited.add(current)
for dep in self._depended_by.get(current, set()):
if dep not in visited:
queue.append(dep)
visited.discard(operation_id) # Exclude self
return sorted(visited)
def get_dependencies(self, operation_id: str) -> list[str]:
"""
Return the list of operation_ids that `operation_id` depends on.
This is the backward dependency closure.
"""
if operation_id not in self._index:
return []
# DFS backward
visited: set[str] = set()
stack = [operation_id]
while stack:
current = stack.pop()
if current in visited:
continue
visited.add(current)
stack.extend(self._depends_on.get(current, set()))
visited.discard(operation_id)
return sorted(visited)
def list_entries(self) -> list[dict[str, Any]]:
"""Return a human-readable list of all stack entries."""
return [
{
"operation_id": e.operation_id,
"description": e.description,
"status": e.status,
"depends_on": sorted(e.depends_on),
"depended_by": sorted(e.depended_by),
"registered_at": e.registered_at,
}
for e in self._stack
]
def list_failed_undos(self) -> list[dict[str, Any]]:
"""Return all failed rollback attempts from the execution log."""
return [entry for entry in self._rollback_log if not entry["success"]]
def summary(self) -> dict[str, Any]:
"""Return a summary of the rollback stack state for monitoring."""
return {
"stack_depth": self.depth,
"total_indexed": len(self._index),
"pending": self.pending_count,
"committed": sum(
1 for e in self._index.values()
if e.status == UndoStatus.COMMITTED
),
"rolled_back": sum(
1 for e in self._index.values()
if e.status == UndoStatus.ROLLED_BACK
),
"failed": sum(
1 for e in self._index.values()
if e.status == UndoStatus.FAILED
),
"dependency_edges": sum(
len(deps) for deps in self._depends_on.values()
),
"failed_undos_in_log": len(self.list_failed_undos()),
"entries": self.list_entries(),
}
# ========================================================================
# Usage example
# ========================================================================
if __name__ == "__main__":
# --- Setup: create a stack with simulated file operations ---
stack = RollbackStack()
# Simulated files for undo demonstration
files_state: dict[str, str] = {
"/etc/hosts": "127.0.0.1 localhost\n",
"/etc/app.conf": 'db_host = "localhost"\n',
"/etc/cache.conf": 'redis_host = "localhost"\n',
}
def make_undo_fn(filepath: str, old_content: str) -> Callable[[], bool]:
"""Factory for simulated file restore undo handlers."""
def undo() -> bool:
try:
files_state[filepath] = old_content
logger.info("Undo: restored %s", filepath)
return True
except Exception as exc:
logger.error("Undo failed for %s: %s", filepath, exc)
return False
return undo
# --- Step 1: Write /etc/hosts (DNS dependency) ---
old_hosts = files_state["/etc/hosts"]
files_state["/etc/hosts"] = "127.0.0.1 localhost\n10.0.0.5 db.internal\n"
stack.push(
"write-hosts",
make_undo_fn("/etc/hosts", old_hosts),
"Restore /etc/hosts (remove db.internal)",
metadata={"target": "/etc/hosts", "strategy": "snapshot"},
)
# --- Step 2: Write /etc/app.conf (depends on hosts) ---
old_app = files_state["/etc/app.conf"]
files_state["/etc/app.conf"] = 'db_host = "db.internal"\n'
stack.push(
"write-app-conf",
make_undo_fn("/etc/app.conf", old_app),
"Restore /etc/app.conf (revert db_host to localhost)",
metadata={"target": "/etc/app.conf", "strategy": "snapshot"},
)
stack.add_dependency("write-app-conf", depends_on="write-hosts")
# --- Step 3: Write /etc/cache.conf (independent) ---
old_cache = files_state["/etc/cache.conf"]
files_state["/etc/cache.conf"] = 'redis_host = "redis.internal"\n'
stack.push(
"write-cache-conf",
make_undo_fn("/etc/cache.conf", old_cache),
"Restore /etc/cache.conf (revert redis_host to localhost)",
metadata={"target": "/etc/cache.conf", "strategy": "snapshot"},
)
# cache.conf is independent — no dependency declared
print("=== Before rollback ===")
for f, content in files_state.items():
print(f" {f}: {content.strip()}")
# --- Simulate: write-hosts turned out to be bad → partial rollback ---
# rollback_to() walks forward through depended_by, so it undoes the failed
# operation plus everything that depends on it (here: write-app-conf).
print("\n--- write-hosts failed, triggering partial rollback ---")
succeeded, failed = stack.rollback_to("write-hosts")
print(f"\nRollback result: succeeded={succeeded}, failed={failed}")
print("\n=== After partial rollback ===")
for f, content in files_state.items():
print(f" {f}: {content.strip()}")
# /etc/hosts should be restored (the failed operation itself)
# /etc/app.conf should be restored (it depends on write-hosts)
# /etc/cache.conf should remain modified (independent, not rolled back)
assert files_state["/etc/hosts"] == "127.0.0.1 localhost\n", (
"Hosts should have been rolled back (the failed op)"
)
assert files_state["/etc/app.conf"] == 'db_host = "localhost"\n', (
"App conf should have been rolled back (depends on write-hosts)"
)
assert files_state["/etc/cache.conf"] == 'redis_host = "redis.internal"\n', (
"Cache conf should NOT have been rolled back (independent)"
)
print("\n✓ All assertions passed: partial rollback preserved independent work")
# --- Demonstrate full rollback_all ---
stack2 = RollbackStack()
for i, (path, content) in enumerate([
("/etc/a.conf", "a_v1"),
("/etc/b.conf", "b_v1"),
("/etc/c.conf", "c_v1"),
]):
old = "original"
files_state[path] = content
def _make_undo(p: str, o: str) -> Callable[[], bool]:
def _undo() -> bool:
files_state[p] = o
return True
return _undo
stack2.push(f"write-{i}", _make_undo(path, old), f"Restore {path}")
print("\n--- rollback_all (full LIFO undo) ---")
s, f = stack2.rollback_all()
print(f"Succeeded: {s}, Failed: {f}")
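# Check only the files written through stack2; files_state still holds
# the /etc/hosts, /etc/app.conf, and /etc/cache.conf entries from above.
for path in ("/etc/a.conf", "/etc/b.conf", "/etc/c.conf"):
assert files_state[path] == "original", f"{path} not restored"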
print("✓ Full rollback_all restored all files")
# Summary
import json as _json
# default=str keeps the dump working if UndoStatus is a plain Enum
print(f"\nStack 1 summary:\n{_json.dumps(stack.summary(), indent=2, default=str)}")
The RollbackStack is the connective tissue of the entire rollback architecture. It bridges the strategy selection of RollbackOrchestrator (Section 2), the file-level mechanics of CopyOnWriteAgent (Section 3), the data-level primitives of WriteAheadLog and CheckpointManager (Section 4), and the environment-level safety net of ContainerEnvironment (Section 5). Every undo handler registered in the system — whether it restores a single file from a diff, rolls back a WAL transaction, re-applies a Terraform state, or destroys and recreates a container — lives on this stack. The stack's LIFO ordering, dependency tracking, and partial rollback capability are what transform a collection of independent rollback primitives into a composed, sequenced recovery system.
The rollback architecture presented across these six sections converges on a single design principle stated at the beginning: every agent write operation must register an executable undo handler before the write proceeds. From a single-file context manager to a container-level snapshot, from a database WAL entry to a Terraform state checkpoint, and from a LIFO stack push to a dependency-aware partial rollback — every component exists to enforce this one invariant. When the invariant holds, agent failures become reversible. When it doesn't, they become production incidents. For the state machine framework that gates agent writes against this invariant at every decision boundary, see Agent State Machine Design. For the error recovery pipeline that consumes the rollback stack's output (failed undos, partial recovery results) and escalates appropriately, see Agent Error Recovery.
7. Verification Before Finalization — Commit-and-Validate Patterns
Registering an undo handler before writes execute is necessary but not sufficient. The system must also verify that each write is correct before considering it final — otherwise the rollback stack fills with perfectly reversible but 100% incorrect writes, and the agent "succeeds" by every operational metric while silently corrupting production state.
This section describes commit-and-validate patterns: the set of gates a write must pass through between the undo registration (covered in Sections 1–6) and the final commit() call. The pattern is universal across all strategies — snapshot, transaction, and compensation — and the sequence is always the same: stage → validate → finalize.
Two-Phase Commit for Agent Writes: Stage, Then Validate, Then Finalize
The pattern is deceptively simple but easy to skip under schedule pressure. It has two preparatory phases followed by a single finalizing step:
- Phase 1 — Stage: Prepare the write in a staging area. This is where the undo handler is registered, the snapshot is taken, the WAL intent entry is written, or the compensation function is defined. At the end of Phase 1, the write is prepared but not applied. The system can still abort with zero side effects because the target is untouched.
- Phase 2 — Validate: Run every relevant check against the staged content. Syntax validators, schema checkers, dry-run commands, integration tests, and policy evaluators all execute in this phase. If any check fails, Phase 1's undo handler runs, and the target is never touched.
- Finalize: Only after Phase 2 passes every gate does the atomic replace, database commit, or API call execute. This is the point at which the target actually changes. Even here, the undo handler remains on the stack until commit() is called, providing a post-finalization safety window.
This is not hypothetical: the safe_agent_write context manager from Section 1 already implements this exact pipeline (snapshot → stage → validate → atomic replace). The CopyOnWriteAgent.write() from Section 3 adds diff computation between snapshot and stage. The RollbackOrchestrator.prepare_undo() from Section 2 is Phase 1. Every component in this article converges on the same stage → validate → finalize sequence.
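The sequence above can be sketched for a single file as follows. This is a minimal illustration, not the article's safe_agent_write: the function name staged_write and the Validator signature are assumptions, and undo-handler registration is left out so the block stays self-contained.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable

Validator = Callable[[str], bool]  # hypothetical validator signature


def staged_write(target: Path, content: str, validators: list[Validator]) -> bool:
    """Stage content next to target, validate it, then atomically replace.

    Returns True if the write was finalized, False if validation rejected it.
    """
    # Phase 1 (stage): write to a temp file in the same directory so that
    # os.replace() stays on one filesystem and the swap is atomic.
    fd, staged_name = tempfile.mkstemp(dir=target.parent, prefix=target.name + ".")
    staged = Path(staged_name)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(content)

        # Phase 2 (validate): every gate must pass against the staged content.
        staged_text = staged.read_text(encoding="utf-8")
        if not all(check(staged_text) for check in validators):
            return False  # the target was never touched

        # Finalize: a single atomic swap of the staged file over the target.
        os.replace(staged, target)
        return True
    finally:
        # Remove the staging artifact if os.replace() did not consume it.
        if staged.exists():
            staged.unlink()
```

Because validation runs against the staged copy, a rejected write leaves no trace: the target keeps its old content and the staging file is removed.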
Canary Writes and Validation Criteria
A canary write applies the staged content to a single target first, validates it in isolation, and only then proceeds to the remaining targets. This is the write equivalent of a canary deployment:
# Pattern: canary write for multi-server config changes
# 1. Stage the new config
# 2. Write to server-01 ONLY
# 3. Validate server-01 (health check, functional test)
# 4. If validation passes: write to servers 02..N
# 5. If validation fails on server-01: rollback server-01, abort the batch
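The pattern above could be written as a small driver function. This is a sketch under assumptions: canary_write and its apply/validate/undo callables are hypothetical names, and undoing every applied target when a later (non-canary) target fails is an added policy that the pattern itself does not specify.

```python
from typing import Callable


def canary_write(
    targets: list[str],
    apply: Callable[[str], None],
    validate: Callable[[str], bool],
    undo: Callable[[str], None],
) -> list[str]:
    """Apply to the first target, validate it, then fan out to the rest.

    Returns the targets left in the new state (empty if anything failed).
    """
    if not targets:
        return []
    canary, rest = targets[0], targets[1:]

    apply(canary)
    if not validate(canary):
        undo(canary)  # roll back the canary and abort the batch
        return []

    applied = [canary]
    for target in rest:
        apply(target)
        if not validate(target):
            # Assumed policy: a later failure undoes everything, newest first.
            for done in reversed(applied + [target]):
                undo(done)
            return []
        applied.append(target)
    return applied
```

If the canary fails, the other targets are never written, which is the property that would have kept the remaining servers in the Nginx scenario untouched.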
Canary writes require the system to define validation criteria — the exact conditions that distinguish a successful write from a corrupt one. These criteria fall into four categories:
- Syntactic: Does the file parse correctly? (YAML/JSON/TOML syntax check, nginx -t, python -m py_compile)
- Semantic: Does the content make sense in context? (port number within valid range, file path exists, database column type matches the value)
- Functional: Does the system actually work after the change? (health check endpoint returns 200, service responds to a test request, database query returns expected rows)
- Policy: Does the write comply with organizational rules? (no credentials in plaintext, no wildcard IAM policies, file permissions within allowed umask)
Validation criteria are passed into the write pipeline as callable validators — the same pattern used by safe_agent_write(validator=...) in Section 1 and CopyOnWriteAgent.write(validator=...) in Section 3. The system treats a validator failure identically to a write failure: the undo handler runs, the staging artifacts are cleaned up, and the target is untouched.
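To make the categories concrete, here is one way callable validators might be written and chained for a JSON config. The validator shape (return an error string, or None on success) and all function names are assumptions for illustration; the functional category is omitted because it needs a live service to probe.

```python
import json
from typing import Callable, Optional

# Hypothetical validator shape: an error message, or None when the check passes.
Validator = Callable[[str], Optional[str]]


def check_syntax(content: str) -> Optional[str]:
    """Syntactic gate: the staged content must parse as a JSON object."""
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError as exc:
        return f"syntactic: {exc.msg}"
    if not isinstance(parsed, dict):
        return "syntactic: top-level value must be an object"
    return None


def check_port_range(content: str) -> Optional[str]:
    """Semantic gate: 'port' must be an integer in 1..65535."""
    port = json.loads(content).get("port")
    if isinstance(port, bool) or not isinstance(port, int) or not 1 <= port <= 65535:
        return f"semantic: port {port!r} out of range"
    return None


def check_no_plaintext_secret(content: str) -> Optional[str]:
    """Policy gate: reject configs that carry a 'password' key."""
    if "password" in json.loads(content):
        return "policy: plaintext credential in config"
    return None


def first_failure(content: str, validators: list[Validator]) -> Optional[str]:
    """Run the gates in order and stop at the first failure."""
    for validator in validators:
        error = validator(content)
        if error is not None:
            return error
    return None


# Order matters: later gates assume the syntactic gate has already passed.
GATES: list[Validator] = [check_syntax, check_port_range, check_no_plaintext_secret]
```

Running the cheap syntactic gate first means the semantic and policy gates can assume parseable input, and the first error message tells the caller which category rejected the write.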
Rollback-on-Failure: The Default, Not the Exception
In the commit-and-validate pattern, rollback is the default path. A write only becomes permanent when it passes every gate and commit() is called. If the agent process crashes between Phase 2 and commit(), the system treats that as a failure and rolls back. This is the correct design — it is far safer to redo a write that was actually correct than to leave a write that was actually wrong.
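Within a single process, "rollback is the default" can be expressed as a context manager that undoes the write unless commit() was called. The names PendingWrite and rollback_unless_committed are hypothetical. The sketch covers exceptions and early exits only: surviving a hard process crash would also need a persisted pending marker that is replayed at startup, which is not shown here.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


class PendingWrite:
    """Handle for a write that stays reversible until commit() is called."""

    def __init__(self) -> None:
        self.committed = False

    def commit(self) -> None:
        self.committed = True


@contextmanager
def rollback_unless_committed(undo: Callable[[], None]) -> Iterator[PendingWrite]:
    """Run undo on exit unless the caller explicitly committed."""
    pending = PendingWrite()
    try:
        yield pending
    finally:
        # Leaving the block without commit() is treated as a failure,
        # whether that happened through an exception or a plain fall-through.
        if not pending.committed:
            undo()
```

The design choice is that forgetting to commit is safe: the worst case is redoing a write that was actually correct.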
For error recovery patterns that operate at this boundary — what happens when rollback itself fails during the validation phase — see Agent Error Recovery. The error recovery pipeline receives the rollback_all() output (including the list of failed undo operations) and decides whether to retry, escalate to a human operator, or trigger a full environment-level reset. Rollback is the first line of defense; error recovery is the second.
Human-in-the-Loop Gates
Not every write should be autonomously finalized. For high-risk operations — database schema changes, infrastructure provisioning, credential rotation — the commit-and-validate pattern supports a human-in-the-loop gate between Phase 2 (validate) and finalization:
- Approval gate: The staged content and validation results are presented to a human operator, who explicitly approves or rejects the finalization. The system blocks indefinitely on this gate — if the operator never responds, the write never executes.
- Time-bounded approval: Same as above, but with a timeout. If the operator doesn't respond within N minutes, the system auto-rejects and rolls back. This prevents orphaned pending writes from accumulating.
- Threshold-based auto-approval: Low-risk writes (file size < 10KB, only syntax validation, no external side effects) auto-finalize. Medium-risk writes (multi-file, database mutation) require a time-bounded approval. High-risk writes (API calls, schema changes, infra provisioning) require explicit approval.
Human-in-the-loop gates integrate with the rollback stack: if approval is denied or times out, the undo for that operation (rollback_one(), or rollback_to() when dependents exist) executes exactly as it would for any other failure. The undo handler was registered in Phase 1; it doesn't care whether the trigger was a syntax error or a human rejection. For the command approval policy framework that governs which agent operations require explicit human approval, see Agent Command Execution Safety.
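The three gate types could be selected and enforced roughly as follows. This is a sketch: the Gate enum, select_gate, and wait_for_decision are hypothetical names, the mapping of write attributes to tiers is one reading of the thresholds above, and a queue.Queue stands in for whatever channel delivers the operator's decision.

```python
import queue
from enum import Enum
from typing import Optional


class Gate(Enum):
    AUTO = "auto-finalize"
    TIMED = "time-bounded approval"
    EXPLICIT = "explicit approval"


def select_gate(
    size_bytes: int,
    file_count: int,
    touches_database: bool,
    external_side_effects: bool,
) -> Gate:
    """Pick a gate from coarse risk signals (assumed mapping)."""
    if external_side_effects:
        return Gate.EXPLICIT  # API calls, infra provisioning
    if file_count > 1 or touches_database or size_bytes >= 10 * 1024:
        return Gate.TIMED  # multi-file writes, database mutations
    return Gate.AUTO  # small, local, single-file writes


def wait_for_decision(decisions: "queue.Queue[bool]", timeout_s: Optional[float]) -> bool:
    """Block for an operator decision; a timeout counts as rejection.

    timeout_s=None blocks indefinitely, matching the plain approval gate.
    """
    try:
        return decisions.get(timeout=timeout_s)
    except queue.Empty:
        return False  # auto-reject; the caller then runs the undo handler
```

A False result from wait_for_decision is handled like any validation failure, so the same undo path serves both a human rejection and a timeout.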
8. Limitations, Trade-Offs, and When Rollback Cannot Save You
The rollback architecture described in this article covers a broad surface area — files, databases, containers, and even external APIs through compensation. But it is not universal. There are failure modes for which no amount of undo handler registration, snapshot storage, or compensation programming can provide a clean recovery. This section enumerates those boundaries explicitly, so that when you encounter them in production, you recognize them before they become incidents.
External Side Effects: API Calls and Database Mutations Outside Your Control
The most fundamental limitation: rollback can only undo writes within your span of control. When an agent calls an external API that creates a resource, that resource lives in a system you do not own:
- Irreversible APIs: Some APIs have no delete or undo operation, or have effects that outlive the delete. Sending an SMS, provisioning a hardware resource, or publishing a DNS record that resolvers have already cached may be a one-way door. The compensation strategy (Section 2) handles reversible APIs. For irreversible ones, the best you can do is log the operation, alert a human, and accept that the side effect has occurred.
- Eventual consistency: Even when an API supports undo, the compensating operation may not take effect immediately. A "cancel subscription" call might return 200 OK while the subscription remains active for 30 seconds. During that window, dependent systems may observe and act on the now-cancelled-but-still-active state.
- Idempotency gaps: Compensation assumes that calling the reverse operation once undoes the forward operation. But if the external API isn't strictly idempotent, a retry of the compensation call could create a second side effect (double refund, duplicate DNS deletion error).
The design implication: the compensation strategy should be your last-choice rollback mechanism, not your first. Before an agent calls an external API, prefer snapshot or transaction strategies for the local state leading up to the API call. If the API call must be rolled back, the local state restoration is handled by the reliable strategies, and the compensation is handled separately with explicit acknowledgment that it's best-effort.
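One way to narrow the idempotency gap on the caller's side is to record which operations have already been compensated and refuse to run the reverse call twice. CompensationLedger is a hypothetical helper, not part of the article's components. It only guards retries issued through the same ledger: if the remote call succeeded but the response was lost, the outcome is still ambiguous unless the provider honors an idempotency key.

```python
from typing import Callable


class CompensationLedger:
    """Tracks which forward operations have already been compensated."""

    def __init__(self) -> None:
        self._compensated: set[str] = set()

    def compensate(self, operation_id: str, action: Callable[[], bool]) -> bool:
        """Run the compensating action at most once per operation_id."""
        if operation_id in self._compensated:
            return True  # already undone; a retry must not refund twice
        try:
            succeeded = action()
        except Exception:
            succeeded = False  # best-effort: report failure, let the caller escalate
        if succeeded:
            self._compensated.add(operation_id)
        return succeeded
```

A failed compensation is not recorded, so it can be retried later, while a successful one becomes a no-op on every subsequent attempt.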
Distributed Writes Across Agents
The rollback stack (Section 6) provides composable LIFO undo within a single agent's execution scope. But in production, multiple agents may operate concurrently on shared state:
- Agent A writes file F1, registers undo handler.
- Agent B reads F1 (now modified by Agent A), writes file F2 based on that content, registers its own undo handler.
- Agent A detects a problem and rolls back F1.
- Agent B's F2 is now based on a precondition (F1's content) that no longer exists. Agent B's undo handler, registered in isolation, cannot account for Agent A's rollback.
This is the cross-agent write dependency problem. The rollback stack solves it within a single agent's scope by enforcing LIFO ordering. Across agents, the problem becomes a distributed transaction management challenge — requiring coordination protocols (two-phase commit across agents), distributed locks, or a global write sequencer. None of these are trivial, and all introduce latency and failure modes of their own.
Practical mitigation strategies:
- Partition state by agent responsibility: Ensure no two agents write to the same file, database table, or configuration namespace. If Agent A owns /etc/nginx/ and Agent B owns /etc/envoy/, cross-agent write conflicts cannot occur.
- Sequential agent execution: Run agents serially rather than concurrently. Agent B only starts after Agent A has fully committed. This eliminates the concurrency problem at the cost of throughput.
- Global write log: All agent writes go through a centralized write coordinator that serializes and orders them. The rollback stack then operates on the global write order, not the per-agent order. See Agent Audit Log Design for the foundation of such a log.
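A global write log makes the F1/F2 scenario detectable: if every write records which paths it read, the coordinator can list the later writes that a rollback would invalidate. The WriteCoordinator below is a hypothetical in-memory sketch of that idea, not the design from Agent Audit Log Design. It is deliberately conservative and never clears taint when a path is later overwritten.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WriteRecord:
    seq: int                    # position in the global write order
    agent: str
    path: str                   # path this write produced
    read_paths: frozenset[str]  # paths the agent read before writing


class WriteCoordinator:
    """Serializes writes from all agents into one ordered log."""

    def __init__(self) -> None:
        self._log: list[WriteRecord] = []

    def record_write(self, agent: str, path: str, read_paths=()) -> int:
        seq = len(self._log)
        self._log.append(WriteRecord(seq, agent, path, frozenset(read_paths)))
        return seq

    def affected_by_rollback(self, seq: int) -> list[int]:
        """Later writes that transitively read output of write `seq`."""
        tainted = {self._log[seq].path}
        affected: list[int] = []
        for record in self._log[seq + 1:]:
            if record.read_paths & tainted:
                affected.append(record.seq)
                tainted.add(record.path)  # its output is now suspect too
        return affected
```

With such a list in hand, rolling back Agent A's write can trigger Agent B's undo handler as well, instead of leaving F2 built on content that no longer exists.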
Time-Bounded Rollback and Retention Costs
Rollback capability decays over time. The longer you wait after a write, the harder it becomes to undo cleanly:
- Dependency accumulation: The longer a bad write sits in production, the more downstream systems read and depend on it. Rolling back a database schema change made 24 hours ago may be technically possible, but 14 microservices have since read the new schema and cached query plans based on it. The rollback restores the database — it does not invalidate 14 service caches.
- Snapshot rot: Snapshots and reverse diffs consume disk space. Retention policies (Section 3) cap storage at a configurable maximum. After the retention window expires, the snapshot is deleted — and rollback is no longer possible. This is a deliberate trade-off: you can have infinite rollback capability or finite storage costs, but not both.
- Compounding compensation: If an agent's API call created a resource, and 12 hours of other operations have built on top of that resource, cancelling the resource may cascade-fail all 12 hours of dependent work. Compensation is a point-in-time undo — it does not rewind all dependent history.
The rule of thumb: roll back within 1 hour, or accept that the rollback will be partial. Treat this as a heuristic: after 1 hour, assume it is more likely than not that the system has accumulated irreversible dependencies on the bad write. The retention policy defaults in CopyOnWriteAgent (Section 3) reflect this: committed snapshots are kept for 1 hour by default, then cleaned up.
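The one-hour window translates into a simple expiry check. SnapshotRecord and expired_snapshots are hypothetical names for illustration, and measuring age from the commit time is an assumption; the actual CopyOnWriteAgent retention code from Section 3 may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SnapshotRecord:
    operation_id: str
    committed_at: Optional[float]  # epoch seconds; None while uncommitted


def expired_snapshots(
    records: list[SnapshotRecord],
    now: float,
    committed_ttl_s: float = 3600.0,
) -> list[SnapshotRecord]:
    """Committed snapshots older than the TTL; uncommitted ones never expire."""
    return [
        record for record in records
        if record.committed_at is not None
        and now - record.committed_at > committed_ttl_s
    ]
```

Once a record appears in this list and its snapshot is deleted, rollback for that operation is no longer possible, which is the storage-versus-recoverability trade-off described above.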
Meta-Rollback: What If Rollback Itself Fails?
The rollback_all() method on both RollbackOrchestrator and RollbackStack returns a list of failed undo operations — operations that could not be rolled back. This list is the system's admission that the safety net has a hole. When a rollback fails, the system faces a meta-problem: the mechanism designed to recover from failure has itself failed.
Meta-rollback failure scenarios:
- Snapshot corruption: The snapshot file was truncated by a disk error. The copy2 call succeeds but restores a partial file.
- Diff application failure: The reverse diff references line numbers that no longer exist in the current file (because another process modified it between the write and the rollback).
- Compensation failure: The external API returns 500 on the cancel request. The resource remains active with no further recourse.
- Disk full: The snapshot restoration requires copying a 500MB file, but the disk has 10MB free. The copy fails with ENOSPC, and the target file is left in an indeterminate state.
When a rollback fails in one of these ways, the system has three escalation paths:
- Environment-level reset: If rollback fails at the file or data level, escalate to the environment level. Destroy the container or VM, recreate it from a known-good image, and re-apply only committed writes. See Section 5 for the ContainerEnvironment implementation.
- Human intervention: The failed undo operations are logged at ERROR level with full context (operation_id, target, strategy, failure reason). An alert fires. A human operator decides whether to manually restore state, accept the partial rollback, or trigger a full disaster recovery procedure.
- Accept partial recovery: For low-criticality operations, the system can log the failure and continue. The corrupt state is documented, and the system operates in a degraded mode. This is only acceptable when the corrupt state does not affect core functionality — a corrupted log file that only affects debugging output is acceptable; a corrupted database config is not.
The error recovery pipeline handles the escalation logic. For the comprehensive treatment of how to design an error recovery system that consumes rollback failures, classifies them by severity, and triggers the appropriate escalation path, see Agent Error Recovery. For observability infrastructure that surfaces rollback failures in dashboards and alerting systems, see Agent Observability.
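As an illustration of how the failed list returned by rollback_all() might be routed to these three paths, consider the sketch below. The routing rules, the Escalation enum, and the criticality labels are all assumptions; the real classification belongs to the error recovery pipeline described in Agent Error Recovery.

```python
from enum import Enum


class Escalation(Enum):
    ENVIRONMENT_RESET = "environment-level reset"
    HUMAN_INTERVENTION = "human intervention"
    ACCEPT_PARTIAL = "accept partial recovery"


def plan_escalation(
    failed_ops: list[str],
    criticality: dict[str, str],
    environment_is_disposable: bool,
) -> dict[str, Escalation]:
    """Map each failed undo to one escalation path.

    criticality maps operation_id -> "low" or "high". Unknown ids are
    treated as "high" so that nothing is silently accepted.
    """
    plan: dict[str, Escalation] = {}
    for op_id in failed_ops:
        if criticality.get(op_id, "high") == "low":
            plan[op_id] = Escalation.ACCEPT_PARTIAL
        elif environment_is_disposable:
            plan[op_id] = Escalation.ENVIRONMENT_RESET
        else:
            plan[op_id] = Escalation.HUMAN_INTERVENTION
    return plan
```

Defaulting unknown operations to high criticality keeps the "accept partial recovery" path opt-in, so a missing label leads to a reset or an alert, never to silence.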
The existence of these limitations does not invalidate the rollback architecture. It bounds it. Every automated system has failure modes beyond its design envelope. The engineering task is to make the envelope as wide as practical, to make the failure modes within the envelope recoverable, and to make the failure modes outside the envelope visible — so that humans know when they've crossed the line.
Frequently Asked Questions
1. How is rollback different from version control (git)?
Version control tracks committed snapshots of files in a repository at discrete points in time. Rollback tracks every individual write operation in real time, across files both inside and outside repositories. Git can restore a file to the last commit — but if an agent has modified 47 files between commits, only rollback can undo just the 3 bad writes while preserving the 44 good ones. Git is a coarse-grained restore-to-known-good tool. Rollback is a fine-grained, per-operation undo tool. They complement each other, but neither replaces the other. For a full discussion, see Section 1 — Why Version Control and Retries Are Not Enough.
2. What is the cost of maintaining snapshots? When should I expire them?
Snapshot storage cost is O(n × filesize) for full snapshots, or O(n × diffsize) for diff-based snapshots (see Section 3 — CopyOnWriteAgent). For a 10KB config file modified 50 times with single-line changes, diff-based storage consumes ~5KB total versus ~500KB for full snapshots. Retention policies should follow a three-tier model: uncommitted writes are kept indefinitely (they're your active safety net); committed writes expire after 1 hour (the window where rollback is still practically useful); and audit metadata is kept for 7 days for traceability. The RetentionPolicy configuration in Section 3 provides tunable knobs for each tier.
3. Can I combine snapshot and transaction strategies for the same operation?
Yes — and for many production scenarios, you should. The RollbackOrchestrator in Section 2 already implements strategy degradation with fallbacks: if the primary strategy fails to prepare (e.g., snapshot fails due to insufficient disk space), it automatically falls back to the transaction strategy. You can also use them in combination deliberately: snapshot the large binary file (where diffs are useless), use WAL transaction for the small structured config files (where key-level deltas are efficient), and add compensation for any external API calls. The RollbackStack in Section 6 composes undo handlers from all three strategies into a single ordered recovery sequence.
4. How do I roll back an API call an agent made?
You can't truly "roll back" an external API call — you can only compensate for it. The compensation strategy (Section 2 — Strategy 3) requires you to write a dedicated compensation function that performs the semantically equivalent reverse operation: cancel the subscription, delete the DNS record, refund the charge. This is not true rollback — the system does not return to the exact same state (the API provider may have logged the creation, sent notification emails, or triggered webhooks). The compensation function itself can fail, so it should always be treated as best-effort. For irreversible APIs that have no delete operation, the only option is to log the call, alert a human, and accept the side effect. See Section 8 — External Side Effects for the full set of limitations.
5. What if my rollback itself fails — do I need meta-rollback?
Yes — this is the meta-rollback problem detailed in Section 8 — Meta-Rollback. When a rollback operation fails (snapshot corruption, diff application error, disk full), the system has three escalation paths: (1) environment-level reset — destroy and recreate the container/VM from a known-good image; (2) human intervention — alert an operator with full context of the failed undo; (3) accept partial recovery — for low-criticality operations, log the failure and operate in degraded mode. The rollback_all() methods on both RollbackOrchestrator and RollbackStack explicitly return the list of failed operation IDs so the error recovery pipeline knows exactly what couldn't be undone. For how to design that pipeline, see Agent Error Recovery.
6. How many checkpoints should I keep? What are the rules of thumb for retention policies?
The right number depends on write frequency and recovery time objectives. Rules of thumb:
- Per file: keep the last 50 versions (max_versions_per_file in Section 3's RetentionPolicy). Beyond 50 versions, the probability that you'll need to roll back to version 51 is negligible compared to the storage cost.
- Per agent session: keep all uncommitted checkpoints until the session ends; then keep the last 3 committed checkpoints for post-session debugging.
- Total storage cap: set a hard ceiling (e.g., 500MB) and eagerly purge the oldest committed snapshots when it's exceeded.
- Time-based: committed snapshots expire after 1 hour (rollback utility decays rapidly after that window); audit metadata is kept for 7 days for post-incident analysis.
For the WAL checkpoint model specifically, see Section 4 — CheckpointManager; for the file-level retention implementation, see Section 3 — Staging Directory Management and Retention.
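The per-file and storage-cap rules can be combined into a single purge selection. This is a sketch with hypothetical names (Version, select_for_purge); it assumes that only committed versions count toward the per-file limit and that uncommitted versions are never purged. Section 3's RetentionPolicy may define these details differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    path: str
    seq: int          # monotonically increasing write order
    size_bytes: int
    committed: bool


def select_for_purge(
    versions: list[Version],
    max_versions_per_file: int = 50,
    storage_cap_bytes: int = 500 * 1024 * 1024,
) -> list[Version]:
    """Return the committed versions to delete under both retention rules."""
    purge: list[Version] = []
    kept: list[Version] = []
    kept_per_path: dict[str, int] = {}

    # Rule 1: keep only the newest N committed versions of each file.
    for version in sorted(versions, key=lambda v: v.seq, reverse=True):
        if not version.committed:
            kept.append(version)  # the active safety net is never purged
            continue
        count = kept_per_path.get(version.path, 0)
        if count >= max_versions_per_file:
            purge.append(version)
        else:
            kept_per_path[version.path] = count + 1
            kept.append(version)

    # Rule 2: purge the oldest committed versions until under the ceiling.
    total = sum(v.size_bytes for v in kept)
    for version in sorted(kept, key=lambda v: v.seq):
        if total <= storage_cap_bytes:
            break
        if version.committed:
            purge.append(version)
            total -= version.size_bytes
    return purge
```

Applying the count rule before the storage cap means the cap only has to evict versions that the count rule considered worth keeping, oldest first.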
Continue Reading
- Agent Release Gate Design — Rollback is the safety net for release gates; learn how to gate agent output before it reaches production.
- Agent Error Recovery — The escalation pipeline that consumes rollback failures and decides whether to retry, reset, or alert a human.
- Agent Audit Log Design — The WAL and snapshot metadata systems described here are only useful if you can query them; audit logs provide that query surface.
- Agent State Machine Design — The deterministic shell that gates agent writes at every decision boundary, making rollback predictable at the transition level.
- Agent Command Execution Safety — The blast-radius containment layer that limits how much damage an agent can do before rollback is even needed.
- Agent Runtime Isolation — Container and sandbox isolation expands rollback coverage to the entire runtime environment.
- Agent Observability — Every rollback event, whether successful or failed, must be observable; this article covers the metrics and logging infrastructure.