Claude Multi-Agent Orchestration: Patterns for Complex Workflows
Multi-agent systems use multiple Claude instances working together — an orchestrator agent that plans and delegates, and subagent specialists that execute specific tasks. The key design principle: subagents should be narrow and reliable; orchestrators should be broad and strategic. A web research agent, a data analysis agent, and a report-writing agent each do one thing well. An orchestrator decides which to use, in what order, and how to combine their outputs. This guide covers the core patterns.
When to use multi-agent vs single-agent
Single agent (simpler, default): when the task fits in one context window, doesn't benefit from specialisation, and doesn't need parallel execution.
Multi-agent (more complex, when justified):
- Task is too long for one context window
- Sub-tasks benefit from specialised agents with distinct system prompts
- Sub-tasks can execute in parallel (faster completion)
- Reliability improves when agents cross-check each other's work
Don't use multi-agent systems for simple tasks — the orchestration overhead adds latency and cost.
Pattern 1: Sequential pipeline
Tasks depend on each other; execute in order:
```python
import anthropic
from dataclasses import dataclass

client = anthropic.Anthropic()

@dataclass
class AgentConfig:
    name: str
    system_prompt: str
    model: str = "claude-sonnet-4-5"
    max_tokens: int = 4096

def run_agent(config: AgentConfig, input_text: str) -> str:
    """Run a single agent with the given input."""
    response = client.messages.create(
        model=config.model,
        max_tokens=config.max_tokens,
        system=config.system_prompt,
        messages=[{"role": "user", "content": input_text}],
    )
    return response.content[0].text

def run_pipeline(stages: list[tuple[AgentConfig, str]], initial_input: str) -> str:
    """
    Run agents in sequence, each receiving the previous agent's output.

    stages: list of (agent_config, task_description) tuples
    """
    current_output = initial_input
    for stage_idx, (config, task) in enumerate(stages):
        print(f"Running stage {stage_idx + 1}: {config.name}")
        prompt = f"{task}\n\nInput:\n{current_output}"
        current_output = run_agent(config, prompt)
    return current_output

# Example: research → analyse → report pipeline
research_agent = AgentConfig(
    name="Researcher",
    system_prompt="You are a thorough researcher. Extract and summarise all relevant information from the provided content.",
)
analysis_agent = AgentConfig(
    name="Analyst",
    system_prompt="You are a data analyst. Identify patterns, draw conclusions, and quantify insights from research summaries.",
)
writer_agent = AgentConfig(
    name="Writer",
    system_prompt="You are a technical writer. Transform analysis into clear, structured reports with actionable recommendations.",
)

result = run_pipeline(
    stages=[
        (research_agent, "Research and summarise the key points from this content:"),
        (analysis_agent, "Analyse this research summary and identify the 3 most significant insights:"),
        (writer_agent, "Write a concise executive report from this analysis:"),
    ],
    initial_input=source_document,
)
```
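Each stage's output is embedded in the next stage's prompt, so intermediate text can grow unchecked across a long pipeline. A simple guard trims over-long handoffs while keeping head and tail (a sketch; the 20,000-character cap is an arbitrary assumption, tune it to your context limits):

```python
MAX_STAGE_CHARS = 20_000  # assumed cap on each inter-stage handoff

def truncate_middle(text: str, limit: int = MAX_STAGE_CHARS) -> str:
    """Keep the head and tail of an over-long intermediate output."""
    if len(text) <= limit:
        return text
    half = limit // 2
    return text[:half] + "\n...[truncated]...\n" + text[-half:]
```

Applying this to `current_output` between stages bounds per-stage prompt size at the cost of dropping the middle of very long outputs.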
Pattern 2: Parallel execution
Sub-tasks are independent; execute concurrently:
```python
import asyncio

async def run_agent_async(
    config: AgentConfig,
    input_text: str,
    client: anthropic.AsyncAnthropic,
) -> str:
    response = await client.messages.create(
        model=config.model,
        max_tokens=config.max_tokens,
        system=config.system_prompt,
        messages=[{"role": "user", "content": input_text}],
    )
    return response.content[0].text

async def run_parallel(
    tasks: list[tuple[AgentConfig, str]],
    client: anthropic.AsyncAnthropic,
) -> list[str]:
    """
    Run multiple agents in parallel.

    Returns list of outputs in the same order as input tasks.
    """
    coroutines = [
        run_agent_async(config, task, client)
        for config, task in tasks
    ]
    return await asyncio.gather(*coroutines)

# Example: analyse a document from multiple angles simultaneously
async def multi_perspective_analysis(document: str) -> dict:
    async_client = anthropic.AsyncAnthropic()
    tasks = [
        (
            AgentConfig("Technical Reviewer", "You are a technical expert. Identify technical strengths and weaknesses."),
            f"Review this from a technical perspective:\n\n{document}",
        ),
        (
            AgentConfig("Business Reviewer", "You are a business analyst. Identify business opportunities and risks."),
            f"Review this from a business perspective:\n\n{document}",
        ),
        (
            AgentConfig("User Advocate", "You are a user experience expert. Identify usability considerations."),
            f"Review this from a user perspective:\n\n{document}",
        ),
    ]
    results = await run_parallel(tasks, async_client)
    return {
        "technical": results[0],
        "business": results[1],
        "user": results[2],
    }

reviews = asyncio.run(multi_perspective_analysis(document))
```
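One caveat with unbounded `asyncio.gather`: firing many agent calls at once can hit API rate limits. A semaphore keeps only a few requests in flight while preserving result order (a sketch; `max_concurrency=3` is an arbitrary default, and the zero-argument coroutine-factory interface is an assumption so the helper stays generic):

```python
import asyncio

async def gather_limited(coro_factories, max_concurrency: int = 3):
    """Run zero-argument coroutine factories with a concurrency cap.

    Each factory is called to create its coroutine only once a
    semaphore slot is held, so at most max_concurrency requests
    run at a time. Results keep the input order, like gather().
    """
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(factory):
        async with sem:
            return await factory()

    return await asyncio.gather(*(guarded(f) for f in coro_factories))
```

Usage mirrors `run_parallel`: wrap each `run_agent_async(...)` call in a `lambda` and pass the list to `gather_limited`.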
Pattern 3: Orchestrator with dynamic routing
An orchestrator agent decides which subagents to invoke:
```python
ORCHESTRATOR_PROMPT = """You are an orchestrator agent. You break complex tasks into subtasks
and delegate them to specialist agents.

Available agents:
- research_agent: searches and summarises information (use for: finding facts, gathering background)
- code_agent: writes and debugs code (use for: implementation tasks, code reviews)
- analysis_agent: analyses data and identifies patterns (use for: data interpretation, comparisons)
- writer_agent: produces polished written content (use for: final reports, summaries)

For each task, output a plan as JSON:
{
  "plan": [
    {"agent": "agent_name", "task": "specific instruction", "depends_on": [] or [step_index]},
    ...
  ]
}

Then execute the plan step by step, passing each agent's output to dependent steps."""

def run_orchestrated_task(user_request: str) -> str:
    """
    Let an orchestrator agent break down and execute a complex task.
    """
    messages = [{"role": "user", "content": user_request}]

    # Orchestrator creates a plan
    plan_response = client.messages.create(
        model="claude-opus-4-0",  # Use Opus for orchestration (strategic thinking)
        max_tokens=2048,
        system=ORCHESTRATOR_PROMPT,
        messages=messages,
    )
    plan_text = plan_response.content[0].text

    # Parse plan and execute... (implement based on your agent set)
    return execute_plan(plan_text, user_request)
```
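The plan-execution step is left open above. One minimal sketch of an `execute_plan` that could back it: it assumes the orchestrator emits JSON matching the schema in `ORCHESTRATOR_PROMPT`, that `depends_on` indices are 0-based positions of earlier steps, and it takes an injected `run_step(agent_name, prompt)` callable (a hypothetical seam) so the executor can be exercised without live API calls:

```python
import json
from typing import Callable

def execute_plan(
    plan_text: str,
    user_request: str,
    run_step: Callable[[str, str], str],
) -> str:
    """Execute an orchestrator plan step by step.

    run_step(agent_name, prompt) performs one agent call; in a real
    system it would look up the AgentConfig by name and call the API.
    """
    # The plan may be wrapped in prose; extract the outermost JSON object.
    start, end = plan_text.find("{"), plan_text.rfind("}")
    steps = json.loads(plan_text[start:end + 1])["plan"]

    outputs: list[str] = []
    for step in steps:
        prompt = f"{step['task']}\n\nOriginal request: {user_request}"
        deps = step.get("depends_on", [])
        if deps:
            upstream = "\n\n".join(outputs[i] for i in deps)
            prompt += f"\n\nUpstream output:\n{upstream}"
        outputs.append(run_step(step["agent"], prompt))
    return outputs[-1]  # the final step's output is the result
```

A production version would also validate agent names against your registry and handle malformed JSON (e.g. by re-prompting the orchestrator).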
Pattern 4: Verification agent (double-checking)
Use a second agent to validate the first agent's output:
```python
def generate_and_verify(task: str, input_data: str) -> tuple[str, bool, str]:
    """
    Generate output, then have a verifier check it.

    Returns (output, is_verified, verification_notes).
    """
    # Step 1: Generate
    generator_config = AgentConfig(
        "Generator",
        "Complete the assigned task accurately and thoroughly.",
    )
    output = run_agent(generator_config, f"{task}\n\nData:\n{input_data}")

    # Step 2: Verify
    verifier_config = AgentConfig(
        "Verifier",
        """You verify that outputs are accurate and complete.
Respond with:
VERIFIED: [YES/NO]
ISSUES: [list any errors, omissions, or concerns]
CONFIDENCE: [HIGH/MEDIUM/LOW]""",
    )
    verification = run_agent(
        verifier_config,
        f"""Task that was performed: {task}

Output to verify:
{output}

Original data:
{input_data}

Is this output accurate and complete?""",
    )

    is_verified = "VERIFIED: YES" in verification
    return output, is_verified, verification

output, verified, notes = generate_and_verify(
    "Extract all pricing tiers and their features",
    pricing_page_content,
)
if not verified:
    print(f"Verification failed: {notes}")
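When verification fails, the natural next step is to regenerate and re-verify. A minimal retry wrapper (a sketch; the callable interface and `max_attempts=3` default are assumptions) that reuses the `generate_and_verify` return shape:

```python
from typing import Callable

def retry_until_verified(
    attempt: Callable[[], tuple[str, bool, str]],
    max_attempts: int = 3,
) -> tuple[str, bool, str]:
    """Retry a generate-and-verify attempt until it passes.

    attempt() returns (output, is_verified, notes), matching the
    signature of generate_and_verify above. Returns the last
    attempt's result if none pass.
    """
    output, ok, notes = "", False, ""
    for _ in range(max_attempts):
        output, ok, notes = attempt()
        if ok:
            break
    return output, ok, notes
```

For example: `retry_until_verified(lambda: generate_and_verify(task, data))`. A refinement worth considering is feeding the verifier's notes back into the next generation prompt so retries are informed rather than blind.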
Managing costs in multi-agent systems
Multi-agent systems multiply token costs: every agent call bills separately, and each handoff re-sends context. Start by defaulting to cheap models and estimating spend per call:
```python
from dataclasses import dataclass

# An extended AgentConfig for cost control (replaces the earlier definition)
@dataclass
class AgentConfig:
    name: str
    system_prompt: str
    model: str = "claude-haiku-4-5"  # Default to cheapest model
    max_tokens: int = 2048
    cost_limit_usd: float = 0.10  # Stop if this agent exceeds cost limit

def estimate_cost(input_tokens: int, output_tokens: int, model: str) -> float:
    """Estimate cost in USD for a model call."""
    # (input_rate, output_rate) in USD per million tokens
    rates = {
        "claude-haiku-4-5": (0.80, 4.00),
        "claude-sonnet-4-5": (3.00, 15.00),
        "claude-opus-4-0": (15.00, 75.00),
    }
    input_rate, output_rate = rates.get(model, (3.00, 15.00))
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
```
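`estimate_cost` covers a single call; across a workflow you also want a running total. A small tracker each agent call charges against (a sketch; the budget value and the choice to raise `RuntimeError` are assumptions), fed with the cost computed from `response.usage.input_tokens` and `response.usage.output_tokens` via `estimate_cost`:

```python
class CostTracker:
    """Accumulate per-call costs and enforce a workflow-level budget."""

    def __init__(self, budget_usd: float):
        self.budget_usd = budget_usd
        self.spent_usd = 0.0

    def charge(self, cost_usd: float) -> float:
        """Record one call's cost; raise if the budget is exceeded.

        Returns the remaining budget in USD.
        """
        self.spent_usd += cost_usd
        if self.spent_usd > self.budget_usd:
            raise RuntimeError(
                f"Workflow budget exceeded: ${self.spent_usd:.4f} "
                f"of ${self.budget_usd:.2f}"
            )
        return self.budget_usd - self.spent_usd
```

Catching the exception at the orchestrator level lets you fail fast (or fall back to a cheaper model) instead of silently overspending.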
Rule of thumb: use Haiku for simple subagents, Sonnet for subagents that need deeper reasoning, and Opus only for the orchestrator.
Frequently asked questions
How many agents is too many? More than 5–7 agents in a single workflow is a sign the task is poorly decomposed. Each additional agent adds latency and cost. Most production multi-agent systems have 2–4 agents.
Should agents communicate via shared state or message passing? Message passing (each agent receives the previous agent's output as input) is simpler and more reliable. Shared state (a database both agents read/write) is appropriate when agents need to run truly independently without sequential dependency.
Can agents call other agents directly? Yes, but this creates complex dependency graphs. For clarity, prefer a single orchestrator that manages all agent calls rather than agents calling each other directly.
How do I debug a multi-agent system? Log every agent's input and output with timestamps. The most common failure modes: an early agent produces bad output that propagates through the chain, or context is passed incorrectly between agents. Intermediate output logging immediately reveals where the failure happened.
What happens if an agent in the middle of the pipeline fails? Implement retry logic at each stage. For irreversible pipelines, checkpoint results after each successful stage so you can resume from the failure point rather than starting over.
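The checkpointing idea can be sketched as follows (assumptions: stages are modelled as plain text-to-text callables standing in for `run_agent` calls, and state is persisted to a JSON file whose name is made up here):

```python
import json
from pathlib import Path

def run_with_checkpoints(
    stage_fns,
    initial_input: str,
    path: str = "pipeline_state.json",
) -> str:
    """Run a sequential pipeline, resuming from the last completed stage.

    stage_fns: list of callables, each taking and returning text.
    After every successful stage, the stage index and its output are
    written to disk; on restart, completed stages are skipped.
    """
    state = {"done": 0, "output": initial_input}
    p = Path(path)
    if p.exists():
        state = json.loads(p.read_text())  # resume from checkpoint
    for idx in range(state["done"], len(stage_fns)):
        state["output"] = stage_fns[idx](state["output"])
        state["done"] = idx + 1
        p.write_text(json.dumps(state))  # checkpoint after each stage
    p.unlink(missing_ok=True)  # clean up once the pipeline completes
    return state["output"]
```

Wrapping each `stage_fns[idx](...)` call in retry logic combines both recommendations: retries handle transient failures, the checkpoint file handles process-level crashes.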
Related guides
- Claude Agent SDK: Build Your First Agent in 30 Minutes — single-agent fundamentals
- Deploying Claude Agents to Production: Fly.io, Vercel, and Lambda — deploying orchestrated agent systems
Take It Further
Claude Agent SDK Cookbook: 40 Production Patterns — Patterns 25–32 cover Multi-Agent Systems: the orchestrator template, parallel execution with asyncio, verification loops, cost budgeting, inter-agent communication, and the complete research-analyse-write pipeline with production error handling.
→ Get the Agent SDK Cookbook — $49
30-day money-back guarantee. Instant download.