
Streaming vs Batch in Claude Agent SDK: When to Use Which

How to choose between streaming and batch API calls in Claude agents: streaming for real-time UX, batch for throughput and cost, and hybrid patterns for combining both.


Streaming delivers tokens as they're generated — good for chat UX and long responses. Batch processes multiple requests at once — good for throughput and 50% cost reduction on offline workloads. Most production agents need both: streaming for user-facing interactions, batch for background processing. This guide covers the implementation patterns for each and when to use which.


The Core Trade-off

                      Streaming                   Batch
First token latency   Immediate                   Delayed (queued)
UX perception         Fast                        Slow
Throughput            One request at a time       Many requests in parallel
Cost                  Standard pricing            50% discount (async Batch API)
Best for              Chat, interactive agents    Bulk processing, offline tasks

Streaming: Real-Time Token Delivery

Basic streaming implementation

import anthropic

client = anthropic.Anthropic()


def stream_response(prompt: str):
    """Stream Claude's response and print tokens as they arrive."""
    with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        print()  # New line after completion

        # Get final message with usage stats
        final_message = stream.get_final_message()
        return final_message


stream_response("Explain the concept of closures in JavaScript")

Streaming with Server-Sent Events (for web UIs)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
client = anthropic.AsyncAnthropic()  # async client so streaming doesn't block the event loop


@app.post("/chat/stream")
async def chat_stream(request: dict):
    user_message = request.get("message", "")

    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": user_message}]
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: data: {...}\n\n
                yield f"data: {json.dumps({'text': text})}\n\n"

            # Signal completion
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'total_tokens': final.usage.input_tokens + final.usage.output_tokens})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
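
To sanity-check the endpoint above, you can consume the SSE stream with a small client. This is a sketch rather than part of the original example: it assumes the FastAPI app is running locally on port 8000 and uses the httpx library.

import json
import httpx


def consume_stream(message: str, url: str = "http://localhost:8000/chat/stream"):
    """Read the SSE stream line by line and print text deltas as they arrive."""
    with httpx.stream("POST", url, json={"message": message}, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip the blank lines between SSE events
            payload = json.loads(line[len("data: "):])
            if payload.get("done"):
                print(f"\n[total tokens: {payload['total_tokens']}]")
            else:
                print(payload["text"], end="", flush=True)


consume_stream("Explain the concept of closures in JavaScript")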

TypeScript streaming (Next.js API route)

// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

export async function POST(req: Request) {
  const { message } = await req.json();

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const response = await client.messages.create({
        model: "claude-sonnet-4-5",
        max_tokens: 2048,
        messages: [{ role: "user", content: message }],
        stream: true,
      });

      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          controller.enqueue(
            encoder.encode(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`)
          );
        }
        if (event.type === "message_stop") {
          controller.enqueue(encoder.encode("data: [DONE]\n\n"));
          controller.close();
        }
      }
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}

Batch Processing: High Throughput at Lower Cost

When batch is the right choice

Batch is the right choice when no user is waiting on the response: bulk summarization, classification, or extraction jobs, overnight pipelines, and any workload where cost matters more than latency. If the result needs to appear on screen as it is generated, stream instead.

Parallel batch with asyncio

import asyncio
import anthropic
from dataclasses import dataclass
from typing import Callable


@dataclass
class BatchJob:
    id: str
    prompt: str
    metadata: dict | None = None


@dataclass
class BatchResult:
    job_id: str
    output: str
    input_tokens: int
    output_tokens: int
    error: str | None = None


async def process_single(
    client: anthropic.AsyncAnthropic,
    job: BatchJob,
    semaphore: asyncio.Semaphore
) -> BatchResult:
    """Process a single job with rate limiting via semaphore."""
    async with semaphore:
        try:
            response = await client.messages.create(
                model="claude-haiku-4-5",  # Use Haiku for batch cost efficiency
                max_tokens=1024,
                messages=[{"role": "user", "content": job.prompt}]
            )
            return BatchResult(
                job_id=job.id,
                output=response.content[0].text,
                input_tokens=response.usage.input_tokens,
                output_tokens=response.usage.output_tokens
            )
        except anthropic.RateLimitError:
            # Back off before retrying; this uses a fixed 60s wait, swap in
            # exponential backoff with a retry cap for production use
            await asyncio.sleep(60)
            return await process_single(client, job, semaphore)
        except Exception as e:
            return BatchResult(
                job_id=job.id,
                output="",
                input_tokens=0,
                output_tokens=0,
                error=str(e)
            )


async def batch_process(
    jobs: list[BatchJob],
    concurrency: int = 5,
    on_complete: Callable[[BatchResult], None] | None = None
) -> list[BatchResult]:
    """
    Process jobs in parallel with controlled concurrency.
    concurrency=5 means max 5 simultaneous API calls.
    """
    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(concurrency)

    tasks = [process_single(client, job, semaphore) for job in jobs]
    results = []

    # Process with progress reporting
    for coro in asyncio.as_completed(tasks):
        result = await coro
        results.append(result)
        if on_complete:
            on_complete(result)
        print(f"[{len(results)}/{len(jobs)}] {result.job_id}: {len(result.output)} chars")

    return results


# Usage

jobs = [
    BatchJob(id=f"doc_{i}", prompt=f"Summarize document {i}: [content...]")
    for i in range(50)
]

results = asyncio.run(batch_process(jobs, concurrency=5))

# Calculate total cost
total_input = sum(r.input_tokens for r in results)
total_output = sum(r.output_tokens for r in results)
cost = (total_input * 1.00 / 1_000_000) + (total_output * 5.00 / 1_000_000)  # Haiku 4.5 list pricing: $1/MTok input, $5/MTok output
print(f"Processed {len(results)} documents. Cost: ${cost:.4f}")

Anthropic's Native Batch API (50% cost reduction)

For large-scale batch work, use the Message Batches API — 50% cheaper with results available within 24 hours:

import anthropic
import time

client = anthropic.Anthropic()


def submit_batch(prompts: list[dict]) -> str:
    """Submit a batch request. Returns batch ID."""
    requests = [
        {
            "custom_id": f"req_{i}",
            "params": {
                "model": "claude-sonnet-4-5",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": prompt["content"]}]
            }
        }
        for i, prompt in enumerate(prompts)
    ]

    batch = client.beta.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id}")
    return batch.id


def wait_for_batch(batch_id: str, poll_interval: int = 60) -> list[dict]:
    """Poll until batch completes and return results."""
    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)
        print(f"Batch status: {batch.processing_status} ({batch.request_counts})")

        if batch.processing_status == "ended":
            break

        time.sleep(poll_interval)

    # Collect results
    results = []
    for result in client.beta.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            results.append({
                "id": result.custom_id,
                "output": result.result.message.content[0].text,
                "usage": result.result.message.usage
            })
        else:
            results.append({
                "id": result.custom_id,
                "error": result.result.error.type
            })

    return results


# Usage: 50% cheaper than standard API
batch_id = submit_batch([
    {"content": "Summarize this article: [...]"},
    {"content": "Extract key points from: [...]"},
    # Up to 10,000 requests per batch
])

# Come back later (or poll)
results = wait_for_batch(batch_id)

Hybrid Pattern: Streaming + Batch in One Agent

Real products often need both:

import asyncio
import anthropic
from enum import Enum

# Reuses BatchJob and batch_process from the parallel batch example above


class ProcessingMode(Enum):
    STREAMING = "streaming"  # User is waiting
    BATCH = "batch"          # Background task


class HybridAgent:
    """Agent that streams for interactive requests, batches for background work."""

    def __init__(self):
        self.client = anthropic.Anthropic()
        self.async_client = anthropic.AsyncAnthropic()
        self.background_queue = asyncio.Queue()

    def respond_streaming(self, user_message: str, messages: list):
        """For user-facing chat — stream response."""
        messages.append({"role": "user", "content": user_message})

        with self.client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=messages
        ) as stream:
            full_response = ""
            for text in stream.text_stream:
                print(text, end="", flush=True)
                full_response += text
            print()

        messages.append({"role": "assistant", "content": full_response})
        return full_response

    async def process_background(self, items: list[str]) -> list[str]:
        """For background tasks — batch process."""
        jobs = [
            BatchJob(id=f"item_{i}", prompt=item)
            for i, item in enumerate(items)
        ]
        results = await batch_process(jobs, concurrency=3)
        return [r.output for r in results if not r.error]


# Example: Chat agent that also processes documents in background
agent = HybridAgent()

# User-facing: streaming
agent.respond_streaming(
    "What are the key risks in this contract?",
    messages=[{"role": "user", "content": "[contract text]"}]
)

# Background: batch (non-blocking). asyncio.create_task() requires a running
# event loop, so schedule the work from inside your async application code:
async def main():
    background = asyncio.create_task(
        agent.process_background([
            "Document 1 to analyze...",
            "Document 2 to analyze...",
            "Document 3 to analyze..."
        ])
    )
    # ... handle other work while the batch runs ...
    await background


asyncio.run(main())

Performance and Cost Comparison

Scenario         Streaming     Parallel batch (concurrency=5)   Native Batch API
10 documents     ~30s total    ~10s                             ~1-24 hr
100 documents    ~300s         ~60s                             ~1-24 hr
Cost (Sonnet)    $1.00         $1.00                            $0.50
Use case         Interactive   Background                       Overnight jobs

Frequently Asked Questions

When should I use streaming vs non-streaming for a chat interface? Always use streaming for chat interfaces. Users perceive streaming responses as much faster even when total generation time is the same. The perceived latency difference is significant — 500ms to first token vs 3-5s to complete response.
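
If you want to verify that perceived-latency gap for your own prompts, time the first token against the full response. A minimal sketch, assuming the same client setup as the basic streaming example above:

import time
import anthropic

client = anthropic.Anthropic()


def measure_latency(prompt: str):
    """Compare time-to-first-token with total generation time for one prompt."""
    start = time.monotonic()
    first_token_at = None
    with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            if first_token_at is None:
                first_token_at = time.monotonic() - start
    total = time.monotonic() - start
    print(f"First token: {first_token_at:.2f}s, full response: {total:.2f}s")


measure_latency("Explain the concept of closures in JavaScript")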

Does streaming cost more than non-streaming? No — streaming and non-streaming use the same token count and pricing. Streaming is just a delivery mechanism, not a different pricing tier.

What concurrency level should I use for batch processing? Start at 5. Anthropic's rate limits are tier-based. If you hit 429 errors, reduce to 3. If processing completes without errors, you can try increasing to 8-10. The right number depends on your API tier and model.

Can I use streaming in the Anthropic Message Batches API? No — the Message Batches API is inherently asynchronous and returns complete responses. Streaming is only available for real-time synchronous API calls.

How do I handle partial results in batch processing? Save results as they complete, not just at the end. In the on_complete callback, write each result to your database or file immediately. This way, if processing is interrupted at job 85/100, you haven't lost the first 85 results.
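
One way to wire that up, reusing batch_process, BatchResult, and the jobs list from the parallel batch example above, is an on_complete callback that appends each result to a JSONL file the moment it finishes (the file path here is just an example):

import json
import asyncio
from dataclasses import asdict


def save_result(result: BatchResult, path: str = "batch_results.jsonl"):
    """Append each completed result immediately so an interrupted run keeps its progress."""
    with open(path, "a") as f:
        f.write(json.dumps(asdict(result)) + "\n")


results = asyncio.run(
    batch_process(jobs, concurrency=5, on_complete=save_result)
)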




Go Deeper

Agent SDK Cookbook — $49 — Full batch processing implementation: priority queue with concurrency control, progress dashboard, checkpoint/resume for interrupted batches, and the native Batch API integration with result parsing.

→ Get the Agent SDK Cookbook — $49

30-day money-back guarantee. Instant download.

AI Disclosure: Written with Claude Code; patterns tested in production agent workloads.
