A single Agent can do a lot. But truly complex scenarios — like a project that needs simultaneous code review, security audit, and documentation — quickly overwhelm one Agent's context window and attention.
That's what multi-agent orchestration solves. Not making one Agent stronger, but making multiple Agents each do what they do best.
| Problem | Single Agent | Multi-Agent |
|---|---|---|
| Context window | One window for everything | Independent windows per Agent |
| Depth of expertise | Jack of all trades | Each Agent specializes |
| Parallelism | Sequential only | Multiple Agents work simultaneously |
| Fault tolerance | One failure = total failure | Individual Agent failure is isolated |
Agent A's output feeds into Agent B. Like a factory assembly line.
Typical use case: Code generation → Code review → Security scan → Documentation.
```python
def sequential_pipeline(task: str) -> dict:
    """Pipeline: generate -> review -> (fix) -> security scan -> document."""
    # Agent 1: Generate code
    code = agent_coder.run(f"Implement: {task}")

    # Agent 2: Review code
    review = agent_reviewer.run(f"Review this code:\n{code}")
    if "needs changes" in review.lower():
        code = agent_coder.run(f"Fix based on feedback:\n{review}\nCode:\n{code}")

    # Agent 3: Security scan
    security = agent_security.run(f"Scan for vulnerabilities:\n{code}")

    # Agent 4: Generate docs
    docs = agent_writer.run(f"Write documentation for:\n{code}")

    return {"code": code, "review": review,
            "security": security, "docs": docs}
```
Multiple Agents tackle different sub-tasks simultaneously, then aggregate.
Typical use case: Market analysis — Agent A does technicals, Agent B fundamentals, Agent C sentiment, then consolidate.
```python
import concurrent.futures

def parallel_orchestration(market: str) -> dict:
    """Fan out independent analyses to worker Agents, then fan in to a summary."""
    tasks = {
        "technical": f"Analyze {market} technical indicators (MACD, RSI, MAs)",
        "fundamental": f"Analyze {market} fundamentals (valuation, earnings, growth)",
        "sentiment": f"Analyze {market} sentiment and news",
        "flow": f"Analyze {market} capital flows and positioning",
    }

    # Agent calls are I/O-bound, so a thread pool gives real parallelism
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {
            name: executor.submit(agent_analyst.run, prompt)
            for name, prompt in tasks.items()
        }
        results = {name: future.result() for name, future in futures.items()}

    # Summarizer Agent consolidates all analyses
    summary = agent_summarizer.run(
        "Synthesize the following analyses into a conclusion:\n" +
        "\n".join(f"## {k}\n{v}" for k, v in results.items())
    )
    return {"analysis": results, "summary": summary}
```
MCP (Model Context Protocol) is an open protocol from Anthropic that solves a key problem: how to standardize tool and context sharing between Agents.
The old problem: Agent A's tools can't be used by Agent B. Agent B's context can't be read by Agent C. Every Agent is an information silo.
MCP defines three core primitives that a Server can expose: tools, resources, and prompts. An Agent framework connects to one or more Servers through a configuration like this:
```yaml
# mcp_config.yaml — Agent framework configuration
servers:
  filesystem:
    command: "npx"
    args: ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
  github:
    command: "npx"
    args: ["-y", "@modelcontextprotocol/server-github"]
    env:
      GITHUB_PERSONAL_ACCESS_TOKEN: "${GITHUB_TOKEN}"
  database:
    url: "https://db-mcp.internal/mcp"
    transport: "http"
```
Once configured, the Agent automatically gets all the tools these Servers provide — file search, PR reading, database querying — no per-tool integration code needed.
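On the Agent side, consuming a Server looks roughly like the sketch below, written against the official `mcp` Python SDK. The workspace path and the `list_directory` tool name are illustrative; the tools actually available depend on what each Server exposes.

```python
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Launch the filesystem MCP Server as a subprocess speaking stdio
    params = StdioServerParameters(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/workspace"],
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Discover the tools this Server exposes
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

            # Call one of the discovered tools (illustrative tool name)
            result = await session.call_tool("list_directory", {"path": "/workspace"})
            print(result.content)

asyncio.run(main())
```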
Agents aren't classifiers — you can't sum them up with a single accuracy number. You need multi-dimensional evaluation:
| Dimension | How to Measure | Metric |
|---|---|---|
| Task completion | Give 100 standard tasks, count completions | Completion rate % |
| Tool call accuracy | Verify each tool call is correct | Correct calls / total |
| Efficiency | Turns and tokens used per task | Avg turns, token cost |
| Self-healing rate | Percentage of errors recovered autonomously | Recoveries / errors |
| Security | Inject malicious prompts, check if blocked | Block rate % |
```python
def evaluate_agent(agent, test_suite: list[dict]) -> dict:
    """Basic Agent evaluation framework."""
    results = {"passed": 0, "failed": 0, "details": []}
    for case in test_suite:
        try:
            output = agent.run(case["input"])
            # Use another Agent (or rules) to judge pass/fail
            verdict = judge_agent.run(
                f"Task: {case['input']}\n"
                f"Expected: {case['expected']}\n"
                f"Actual: {output}\n"
                f"Judge if this passes (reply PASS or FAIL)"
            )
            passed = "PASS" in verdict.upper()
            results["passed" if passed else "failed"] += 1
            results["details"].append({
                "task": case["name"],
                "passed": passed,
                "verdict": verdict
            })
        except Exception as e:
            results["failed"] += 1
            results["details"].append({
                "task": case["name"], "passed": False, "error": str(e)
            })
    return results
```
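A test suite for this harness is just a list of named cases. The case below (and the `judge_agent` the harness relies on) are illustrative:

```python
test_suite = [
    {
        "name": "csv_revenue",
        "input": "Load sales.csv and report total revenue",
        "expected": "States the total revenue figure and how many rows were processed",
    },
]

report = evaluate_agent(agent_coder, test_suite)
print(f"passed: {report['passed']}  failed: {report['failed']}")
for detail in report["details"]:
    print(detail["task"], "PASS" if detail["passed"] else "FAIL")
```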
| Layer | What to Do |
|---|---|
| Sandboxing | Agent code execution MUST run in Docker/VM sandboxes, never directly on the host machine |
| Rate limiting | Set per-user/per-Agent API call caps to prevent runaway costs |
| Audit logging | Log every Agent's tool calls, parameters, and results for post-hoc analysis |
| Observability | Real-time monitoring of Agent status, token consumption, error rates |
| Fallback | When the Agent can't handle it, gracefully degrade to human review |
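Two of these layers, rate limiting and audit logging, can start life as a thin in-process wrapper around every tool call. A minimal sketch, assuming per-tool limits are enough for a first pass (a real deployment would enforce both at a gateway and persist logs centrally):

```python
import functools
import json
import logging
import time

audit_log = logging.getLogger("agent.audit")

def audited_tool(max_calls_per_minute: int = 30):
    """Wrap a tool with audit logging and a simple sliding-window rate limit."""
    def decorator(fn):
        call_times: list[float] = []

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Keep only calls from the last 60 seconds, then enforce the cap
            call_times[:] = [t for t in call_times if now - t < 60]
            if len(call_times) >= max_calls_per_minute:
                raise RuntimeError(f"Rate limit exceeded for tool {fn.__name__}")
            call_times.append(now)

            result = fn(*args, **kwargs)
            # Audit: record tool name, parameters, and a truncated result
            audit_log.info(json.dumps({
                "tool": fn.__name__,
                "args": [repr(a) for a in args],
                "kwargs": {k: repr(v) for k, v in kwargs.items()},
                "result": str(result)[:500],
            }))
            return result
        return wrapper
    return decorator

@audited_tool(max_calls_per_minute=10)
def run_sql(query: str) -> str:
    ...  # the Agent's actual database tool goes here
```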
📖 Next: Building an Agent Framework from Scratch — verifiable execution traces and sandbox security