Streaming vs Batch in Claude Agent SDK: When to Use Which
Streaming delivers tokens as they're generated — good for chat UX and long responses. Batch processes many requests together — good for throughput, with a 50% cost reduction on offline workloads via the Message Batches API. Most production agents need both: streaming for user-facing interactions, batch for background processing. This guide covers the implementation patterns for each and when to use which.
The Core Trade-off
| | Streaming | Batch |
|---|---|---|
| First token latency | Immediate | Delayed (queued) |
| UX perception | Fast | Slow |
| Throughput | 1 request at a time | Many requests in parallel |
| Cost | Standard pricing | 50% discount (Message Batches API) |
| Best for | Chat, interactive agents | Bulk processing, offline tasks |
Streaming: Real-Time Token Delivery
Basic streaming implementation
import anthropic

client = anthropic.Anthropic()

def stream_response(prompt: str):
    """Stream Claude's response and print tokens as they arrive."""
    with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        print()  # New line after completion
        # Get final message with usage stats
        final_message = stream.get_final_message()
    return final_message

stream_response("Explain the concept of closures in JavaScript")
Streaming with Server-Sent Events (for web UIs)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
# Use the async client so streaming doesn't block the event loop
client = anthropic.AsyncAnthropic()

@app.post("/chat/stream")
async def chat_stream(request: dict):
    user_message = request.get("message", "")

    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": user_message}]
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: data: {...}\n\n
                yield f"data: {json.dumps({'text': text})}\n\n"
            # Signal completion
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'total_tokens': final.usage.input_tokens + final.usage.output_tokens})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
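To consume this endpoint from Python, a minimal client sketch using httpx; the URL and port are assumptions for a local dev server:

import httpx
import json

def consume_stream(message: str, url: str = "http://localhost:8000/chat/stream"):
    """Read the SSE stream line by line and print text as it arrives."""
    with httpx.stream("POST", url, json={"message": message}, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip blank separator lines between events
            payload = json.loads(line[len("data: "):])
            if payload.get("done"):
                print(f"\n[done] total tokens: {payload['total_tokens']}")
                break
            print(payload["text"], end="", flush=True)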
TypeScript streaming (Next.js API route)
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

export async function POST(req: Request) {
  const { message } = await req.json();
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const response = await client.messages.create({
        model: "claude-sonnet-4-5",
        max_tokens: 2048,
        messages: [{ role: "user", content: message }],
        stream: true,
      });
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          controller.enqueue(
            encoder.encode(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`)
          );
        }
        if (event.type === "message_stop") {
          controller.enqueue(encoder.encode("data: [DONE]\n\n"));
          controller.close();
        }
      }
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}
Batch Processing: High Throughput at Lower Cost
When batch is the right choice
- Processing 10+ documents
- Background summarization jobs
- Nightly content generation
- Bulk analysis pipelines
- Any workload where results aren't needed immediately
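A minimal routing heuristic based on these criteria; the function name and thresholds are illustrative, not part of the SDK:

def choose_mode(item_count: int, user_is_waiting: bool, deadline_hours: float) -> str:
    """Pick a processing mode from the criteria above (illustrative thresholds)."""
    if user_is_waiting:
        return "streaming"       # interactive: first-token latency wins
    if deadline_hours >= 24:
        return "native_batch"    # offline: take the 50% discount
    if item_count >= 10:
        return "parallel_batch"  # background work needed sooner than 24h
    return "streaming"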
Parallel batch with asyncio
import asyncio
import anthropic
from dataclasses import dataclass
from typing import Callable

@dataclass
class BatchJob:
    id: str
    prompt: str
    metadata: dict | None = None

@dataclass
class BatchResult:
    job_id: str
    output: str
    input_tokens: int
    output_tokens: int
    error: str | None = None

async def process_single(
    client: anthropic.AsyncAnthropic,
    job: BatchJob,
    semaphore: asyncio.Semaphore
) -> BatchResult:
    """Process a single job with rate limiting via semaphore."""
    async with semaphore:
        for attempt in range(5):
            try:
                response = await client.messages.create(
                    model="claude-haiku-4-5",  # Use Haiku for batch cost efficiency
                    max_tokens=1024,
                    messages=[{"role": "user", "content": job.prompt}]
                )
                return BatchResult(
                    job_id=job.id,
                    output=response.content[0].text,
                    input_tokens=response.usage.input_tokens,
                    output_tokens=response.usage.output_tokens
                )
            except anthropic.RateLimitError:
                # Exponential backoff on rate limits. Retrying inside the held
                # semaphore (rather than recursing) avoids waiting on a second
                # permit and deadlocking when every slot holds a retrying task.
                await asyncio.sleep(min(2 ** attempt * 15, 240))
            except Exception as e:
                return BatchResult(
                    job_id=job.id,
                    output="",
                    input_tokens=0,
                    output_tokens=0,
                    error=str(e)
                )
        return BatchResult(
            job_id=job.id,
            output="",
            input_tokens=0,
            output_tokens=0,
            error="rate limited after 5 attempts"
        )

async def batch_process(
    jobs: list[BatchJob],
    concurrency: int = 5,
    on_complete: Callable[[BatchResult], None] | None = None
) -> list[BatchResult]:
    """
    Process jobs in parallel with controlled concurrency.
    concurrency=5 means max 5 simultaneous API calls.
    """
    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [process_single(client, job, semaphore) for job in jobs]
    results = []
    # Process with progress reporting
    for coro in asyncio.as_completed(tasks):
        result = await coro
        results.append(result)
        if on_complete:
            on_complete(result)
        print(f"[{len(results)}/{len(jobs)}] {result.job_id}: {len(result.output)} chars")
    return results
# Usage
jobs = [
    BatchJob(id=f"doc_{i}", prompt=f"Summarize document {i}: [content...]")
    for i in range(50)
]
results = asyncio.run(batch_process(jobs, concurrency=5))

# Calculate total cost (Haiku 4.5 list pricing: $1/MTok in, $5/MTok out; verify current rates)
total_input = sum(r.input_tokens for r in results)
total_output = sum(r.output_tokens for r in results)
cost = (total_input * 1.00 + total_output * 5.00) / 1_000_000
print(f"Processed {len(results)} documents. Cost: ${cost:.4f}")
Anthropic's Native Batch API (50% cost reduction)
For large-scale batch work, use the Message Batches API — 50% cheaper with results available within 24 hours:
import anthropic
import time

client = anthropic.Anthropic()

def submit_batch(prompts: list[dict]) -> str:
    """Submit a batch request. Returns batch ID."""
    requests = [
        {
            "custom_id": f"req_{i}",
            "params": {
                "model": "claude-sonnet-4-5",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": prompt["content"]}]
            }
        }
        for i, prompt in enumerate(prompts)
    ]
    batch = client.beta.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id}")
    return batch.id

def wait_for_batch(batch_id: str, poll_interval: int = 60) -> list[dict]:
    """Poll until batch completes and return results."""
    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)
        print(f"Batch status: {batch.processing_status} ({batch.request_counts})")
        if batch.processing_status == "ended":
            break
        time.sleep(poll_interval)

    # Collect results
    results = []
    for result in client.beta.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            results.append({
                "id": result.custom_id,
                "output": result.result.message.content[0].text,
                "usage": result.result.message.usage
            })
        else:
            # Non-succeeded results are "errored", "canceled", or "expired"
            results.append({
                "id": result.custom_id,
                "error": result.result.type
            })
    return results
# Usage: 50% cheaper than standard API
batch_id = submit_batch([
    {"content": "Summarize this article: [...]"},
    {"content": "Extract key points from: [...]"},
    # Up to 10,000 requests per batch
])

# Come back later (or poll)
results = wait_for_batch(batch_id)
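Failed entries can be retried by mapping their custom_ids back to the original prompts and submitting a follow-up batch. A sketch using only the helpers above, assuming you kept the original prompt list:

original_prompts = [
    {"content": "Summarize this article: [...]"},
    {"content": "Extract key points from: [...]"},
]

failed_ids = [r["id"] for r in results if "error" in r]
if failed_ids:
    # custom_ids were assigned as "req_{i}", so recover each index
    retry_prompts = [original_prompts[int(fid.split("_")[1])] for fid in failed_ids]
    retry_batch_id = submit_batch(retry_prompts)
    retry_results = wait_for_batch(retry_batch_id)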
Hybrid Pattern: Streaming + Batch in One Agent
Real products often need both:
import asyncio
import anthropic
from enum import Enum

class ProcessingMode(Enum):
    STREAMING = "streaming"  # User is waiting
    BATCH = "batch"          # Background task

class HybridAgent:
    """Agent that streams for interactive requests, batches for background work."""

    def __init__(self):
        self.client = anthropic.Anthropic()
        self.async_client = anthropic.AsyncAnthropic()
        self.background_queue = asyncio.Queue()

    def respond_streaming(self, user_message: str, messages: list):
        """For user-facing chat — stream response."""
        messages.append({"role": "user", "content": user_message})
        with self.client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=messages
        ) as stream:
            full_response = ""
            for text in stream.text_stream:
                print(text, end="", flush=True)
                full_response += text
        print()
        messages.append({"role": "assistant", "content": full_response})
        return full_response

    async def process_background(self, items: list[str]) -> list[str]:
        """For background tasks — batch process."""
        jobs = [
            BatchJob(id=f"item_{i}", prompt=item)
            for i, item in enumerate(items)
        ]
        results = await batch_process(jobs, concurrency=3)
        return [r.output for r in results if not r.error]
# Example: Chat agent that also processes documents in background
agent = HybridAgent()

# User-facing: streaming (put the contract and the question in one user turn)
agent.respond_streaming(
    "Here is a contract: [contract text]\n\nWhat are the key risks in it?",
    messages=[]
)

# Background: batch (non-blocking; asyncio.create_task needs a running event loop)
async def run_background():
    await asyncio.create_task(
        agent.process_background([
            "Document 1 to analyze...",
            "Document 2 to analyze...",
            "Document 3 to analyze..."
        ])
    )

asyncio.run(run_background())
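The background_queue attribute on HybridAgent is declared but unused in the snippet above. One plausible way to drain it is a long-lived worker coroutine; this is a sketch, not an SDK feature:

async def background_worker(agent: HybridAgent):
    """Pull queued document lists and batch-process each one."""
    while True:
        items = await agent.background_queue.get()
        try:
            summaries = await agent.process_background(items)
            print(f"Processed {len(summaries)} background items")
        finally:
            agent.background_queue.task_done()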
Performance and Cost Comparison
| Scenario | Streaming | Parallel Batch (concurrency=5) | Native Batch API |
|---|---|---|---|
| 10 documents | ~30s total | ~10s | ~1-24hr |
| 100 documents | ~300s | ~60s | ~1-24hr |
| Cost (Sonnet) | $1.00 | $1.00 | $0.50 |
| Use case | Interactive | Background | Overnight jobs |
Frequently Asked Questions
When should I use streaming vs non-streaming for a chat interface? Always use streaming for chat interfaces. Users perceive streaming responses as much faster even when total generation time is the same. The perceived latency difference is significant — 500ms to first token vs 3-5s to complete response.
Does streaming cost more than non-streaming? No — streaming and non-streaming use the same token count and pricing. Streaming is just a delivery mechanism, not a different pricing tier.
What concurrency level should I use for batch processing? Start at 5. Anthropic's rate limits are tier-based. If you hit 429 errors, reduce to 3. If processing completes without errors, you can try increasing to 8-10. The right number depends on your API tier and model.
Can I use streaming in the Anthropic Message Batches API? No — the Message Batches API is inherently asynchronous and returns complete responses. Streaming is only available for real-time synchronous API calls.
How do I handle partial results in batch processing? Save results as they complete, not just at the end. In the on_complete callback, write each result to your database or file immediately (as in the JSONL checkpoint sketch above). This way, if processing is interrupted at job 85/100, you haven't lost the first 85 results.
Related Guides
- How to Handle Errors and Retries in Claude Agent SDK — Rate limit handling
- Claude Agent Observability — Token usage tracking
- Claude API Cost Optimization — Cost reduction strategies
Go Deeper
Agent SDK Cookbook — $49 — Full batch processing implementation: priority queue with concurrency control, progress dashboard, checkpoint/resume for interrupted batches, and the native Batch API integration with result parsing.
→ Get the Agent SDK Cookbook — $49
30-day money-back guarantee. Instant download.