← All guides

Claude Multi-Agent Orchestration: Patterns for Complex Workflows

How to build multi-agent systems with Claude — orchestrator/subagent patterns, parallel execution, inter-agent communication, and production reliability.

Claude Multi-Agent Orchestration: Patterns for Complex Workflows

Multi-agent systems use multiple Claude instances working together — an orchestrator agent that plans and delegates, and subagent specialists that execute specific tasks. The key design principle: subagents should be narrow and reliable; orchestrators should be broad and strategic. A web research agent, a data analysis agent, and a report-writing agent each do one thing well. An orchestrator decides which to use, in what order, and how to combine their outputs. This guide covers the core patterns.


When to use multi-agent vs single-agent

Single agent (simpler, default): when the task fits in one context window, doesn't benefit from specialisation, and doesn't need parallel execution.

Multi-agent (more complex, when justified):

Don't use multi-agent systems for simple tasks — the orchestration overhead adds latency and cost.


Pattern 1: Sequential pipeline

Tasks depend on each other; execute in order:

import anthropic
from dataclasses import dataclass
from typing import Callable

client = anthropic.Anthropic()

@dataclass
class AgentConfig:
    name: str
    system_prompt: str
    model: str = "claude-sonnet-4-5"
    max_tokens: int = 4096

def run_agent(config: AgentConfig, input_text: str) -> str:
    """Run a single agent with the given input."""
    response = client.messages.create(
        model=config.model,
        max_tokens=config.max_tokens,
        system=config.system_prompt,
        messages=[{"role": "user", "content": input_text}]
    )
    return response.content[0].text

def run_pipeline(stages: list[tuple[AgentConfig, str]], initial_input: str) -> str:
    """
    Run agents in sequence, each receiving the previous agent's output.
    stages: list of (agent_config, task_description) tuples
    """
    current_output = initial_input
    
    for stage_idx, (config, task) in enumerate(stages):
        print(f"Running stage {stage_idx + 1}: {config.name}")
        prompt = f"{task}\n\nInput:\n{current_output}"
        current_output = run_agent(config, prompt)
    
    return current_output

# Example: research → analyse → report pipeline
research_agent = AgentConfig(
    name="Researcher",
    system_prompt="You are a thorough researcher. Extract and summarise all relevant information from the provided content.",
)
analysis_agent = AgentConfig(
    name="Analyst",
    system_prompt="You are a data analyst. Identify patterns, draw conclusions, and quantify insights from research summaries.",
)
writer_agent = AgentConfig(
    name="Writer",
    system_prompt="You are a technical writer. Transform analysis into clear, structured reports with actionable recommendations.",
)

result = run_pipeline(
    stages=[
        (research_agent, "Research and summarise the key points from this content:"),
        (analysis_agent, "Analyse this research summary and identify the 3 most significant insights:"),
        (writer_agent, "Write a concise executive report from this analysis:"),
    ],
    initial_input=source_document,
)

Pattern 2: Parallel execution

Sub-tasks are independent; execute concurrently:

import asyncio

async def run_agent_async(
    config: AgentConfig,
    input_text: str,
    client: anthropic.AsyncAnthropic,
) -> str:
    response = await client.messages.create(
        model=config.model,
        max_tokens=config.max_tokens,
        system=config.system_prompt,
        messages=[{"role": "user", "content": input_text}]
    )
    return response.content[0].text

async def run_parallel(
    tasks: list[tuple[AgentConfig, str]],
    client: anthropic.AsyncAnthropic,
) -> list[str]:
    """
    Run multiple agents in parallel.
    Returns list of outputs in the same order as input tasks.
    """
    coroutines = [
        run_agent_async(config, task, client)
        for config, task in tasks
    ]
    return await asyncio.gather(*coroutines)

# Example: analyze a document from multiple angles simultaneously
async def multi_perspective_analysis(document: str) -> dict:
    async_client = anthropic.AsyncAnthropic()
    
    tasks = [
        (
            AgentConfig("Technical Reviewer", "You are a technical expert. Identify technical strengths and weaknesses."),
            f"Review this from a technical perspective:\n\n{document}"
        ),
        (
            AgentConfig("Business Reviewer", "You are a business analyst. Identify business opportunities and risks."),
            f"Review this from a business perspective:\n\n{document}"
        ),
        (
            AgentConfig("User Advocate", "You are a user experience expert. Identify usability considerations."),
            f"Review this from a user perspective:\n\n{document}"
        ),
    ]
    
    results = await run_parallel(tasks, async_client)
    
    return {
        "technical": results[0],
        "business": results[1],
        "user": results[2],
    }

reviews = asyncio.run(multi_perspective_analysis(document))

Pattern 3: Orchestrator with dynamic routing

An orchestrator agent decides which subagents to invoke:

ORCHESTRATOR_PROMPT = """You are an orchestrator agent. You break complex tasks into subtasks
and delegate them to specialist agents.

Available agents:
- research_agent: searches and summarises information (use for: finding facts, gathering background)
- code_agent: writes and debugs code (use for: implementation tasks, code reviews)
- analysis_agent: analyses data and identifies patterns (use for: data interpretation, comparisons)
- writer_agent: produces polished written content (use for: final reports, summaries)

For each task, output a plan as JSON:
{
  "plan": [
    {"agent": "agent_name", "task": "specific instruction", "depends_on": [] or [step_index]},
    ...
  ]
}

Then execute the plan step by step, passing each agent's output to dependent steps."""

def run_orchestrated_task(user_request: str) -> str:
    """
    Let an orchestrator agent break down and execute a complex task.
    """
    messages = [{"role": "user", "content": user_request}]
    
    # Orchestrator creates a plan
    plan_response = client.messages.create(
        model="claude-opus-4-0",  # Use Opus for orchestration (strategic thinking)
        max_tokens=2048,
        system=ORCHESTRATOR_PROMPT,
        messages=messages,
    )
    
    plan_text = plan_response.content[0].text
    # Parse plan and execute... (implement based on your agent set)
    
    return execute_plan(plan_text, user_request)

Pattern 4: Verification agent (double-checking)

Use a second agent to validate the first agent's output:

def generate_and_verify(task: str, input_data: str) -> tuple[str, bool, str]:
    """
    Generate output, then have a verifier check it.
    Returns (output, is_verified, verification_notes).
    """
    # Step 1: Generate
    generator_config = AgentConfig(
        "Generator",
        "Complete the assigned task accurately and thoroughly."
    )
    output = run_agent(generator_config, f"{task}\n\nData:\n{input_data}")
    
    # Step 2: Verify
    verifier_config = AgentConfig(
        "Verifier",
        """You verify that outputs are accurate and complete.
        
Respond with:
VERIFIED: [YES/NO]
ISSUES: [list any errors, omissions, or concerns]
CONFIDENCE: [HIGH/MEDIUM/LOW]"""
    )
    
    verification = run_agent(
        verifier_config,
        f"""Task that was performed: {task}

Output to verify:
{output}

Original data:
{input_data}

Is this output accurate and complete?"""
    )
    
    is_verified = "VERIFIED: YES" in verification
    return output, is_verified, verification

output, verified, notes = generate_and_verify(
    "Extract all pricing tiers and their features",
    pricing_page_content
)

if not verified:
    print(f"Verification failed: {notes}")

Managing costs in multi-agent systems

Multi-agent systems multiply token costs. Cost management:

from dataclasses import field

@dataclass
class AgentConfig:
    name: str
    system_prompt: str
    model: str = "claude-haiku-4-5"  # Default to cheapest model
    max_tokens: int = 2048
    cost_limit_usd: float = 0.10  # Stop if this agent exceeds cost limit

def estimate_cost(input_tokens: int, output_tokens: int, model: str) -> float:
    """Estimate cost in USD for a model call."""
    rates = {
        "claude-haiku-4-5": (0.80, 4.00),
        "claude-sonnet-4-5": (3.00, 15.00),
        "claude-opus-4-0": (15.00, 75.00),
    }
    input_rate, output_rate = rates.get(model, (3.00, 15.00))
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

Rule of thumb: use Haiku for subagents, Sonnet for complex subagents, and Opus only for the orchestrator.


Frequently asked questions

How many agents is too many? More than 5–7 agents in a single workflow is a sign the task is poorly decomposed. Each additional agent adds latency and cost. Most production multi-agent systems have 2–4 agents.

Should agents communicate via shared state or message passing? Message passing (each agent receives the previous agent's output as input) is simpler and more reliable. Shared state (a database both agents read/write) is appropriate when agents need to run truly independently without sequential dependency.

Can agents call other agents directly? Yes, but this creates complex dependency graphs. For clarity, prefer a single orchestrator that manages all agent calls rather than agents calling each other directly.

How do I debug a multi-agent system? Log every agent's input and output with timestamps. The most common failure modes: an early agent produces bad output that propagates through the chain, or context is passed incorrectly between agents. Intermediate output logging immediately reveals where the failure happened.

What happens if an agent in the middle of the pipeline fails? Implement retry logic at each stage. For irreversible pipelines, checkpoint results after each successful stage so you can resume from the failure point rather than starting over.


Related guides


Take It Further

Claude Agent SDK Cookbook: 40 Production Patterns — Patterns 25–32 cover Multi-Agent Systems: the orchestrator template, parallel execution with asyncio, verification loops, cost budgeting, inter-agent communication, and the complete research-analyse-write pipeline with production error handling.

→ Get the Agent SDK Cookbook — $49

30-day money-back guarantee. Instant download.

AI Disclosure: Drafted with Claude Code; all patterns from Anthropic Agent SDK documentation and production usage as of April 2026.

Tools and references