Claude API Streaming: Complete Implementation Guide
Claude API streaming delivers response tokens as they're generated, rather than waiting for the complete response. Streaming reduces perceived latency from 5–30 seconds (waiting for full response) to near-instant first token appearance. For any user-facing application, streaming is the correct default. For batch processing pipelines, non-streaming is simpler and equally fast. This guide covers both patterns with complete Python and TypeScript implementations.
Why streaming matters for UX
Non-streaming: user clicks Send → waits 8 seconds → full response appears. Streaming: user clicks Send → first words appear in ~500ms → response builds in real time.
The total time-to-complete is identical. But perceived latency is dramatically lower with streaming. Studies on AI interfaces consistently show that streaming responses have higher user satisfaction and lower abandonment rates.
Python streaming implementation
Basic streaming
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain quantum entanglement simply."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After the loop, get the final complete message
    final_message = stream.get_final_message()
    print(f"\nTotal tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")
Async streaming (for FastAPI, async frameworks)
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(user_message: str) -> str:
    """Stream a response and return the full text."""
    full_text = ""
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text in stream.text_stream:
            full_text += text
            print(text, end="", flush=True)
    return full_text

# Run
asyncio.run(stream_response("Write a haiku about streaming data."))
FastAPI streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat/stream")
async def stream_chat(body: dict):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=body.get("messages", []),
            system=body.get("system", "You are a helpful assistant."),
        ) as stream:
            async for text in stream.text_stream:
                yield text

    return StreamingResponse(
        generate(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Transfer-Encoding": "chunked",
        },
    )
TypeScript streaming implementation
Node.js / server-side streaming
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function streamResponse(userMessage: string): Promise<void> {
  const stream = client.messages.stream({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: userMessage }],
  });

  // Print each text chunk as it arrives
  stream.on("text", (text) => {
    process.stdout.write(text);
  });

  // Wait for completion
  const finalMessage = await stream.finalMessage();
  console.log("\n\nUsage:", finalMessage.usage);
}

streamResponse("List 5 principles of good API design.");
Collect full text without printing
async function getStreamedText(userMessage: string): Promise<string> {
  let fullText = "";
  const stream = client.messages.stream({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: userMessage }],
  });

  stream.on("text", (chunk) => {
    fullText += chunk;
  });

  await stream.done();
  return fullText;
}
Server-Sent Events (SSE) for browser clients
SSE is the standard protocol for streaming AI responses to browser clients. Unlike WebSockets, SSE is unidirectional (server to client) and works over standard HTTP.
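On the wire, each SSE event is a data: line followed by a blank line. The endpoint below wraps each text chunk in a small JSON payload, so the raw stream looks roughly like this:

data: {"text": "Quantum entanglement"}

data: {"text": " links two particles"}

data: {"done": true}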
Next.js API route with SSE
// app/api/stream/route.ts
import Anthropic from "@anthropic-ai/sdk";
import { NextRequest } from "next/server";

const client = new Anthropic();

export async function POST(request: NextRequest) {
  const { messages } = await request.json();
  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // Run streaming in background
  (async () => {
    try {
      const anthropicStream = client.messages.stream({
        model: "claude-sonnet-4-5",
        max_tokens: 2048,
        messages,
      });

      anthropicStream.on("text", async (text) => {
        // SSE format: "data: <content>\n\n"
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({ text })}\n\n`)
        );
      });

      await anthropicStream.done();

      // Signal completion
      await writer.write(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`));
    } catch (error) {
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ error: String(error) })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
Browser client consuming SSE
// React component consuming SSE
async function sendMessageSSE(message: string, onChunk: (text: string) => void) {
const response = await fetch("/api/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages: [{ role: "user", content: message }] }),
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
// Parse SSE format
const lines = chunk.split("\n\n").filter(Boolean);
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = JSON.parse(line.slice(6));
if (data.text) onChunk(data.text);
if (data.done) return;
if (data.error) throw new Error(data.error);
}
}
}
}
Raw streaming events (advanced)
For fine-grained control, iterate over raw events instead of the text stream (the match statement below requires Python 3.10+):
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                # Message metadata (model, input tokens estimate)
                print(f"Started. Input tokens: {event.message.usage.input_tokens}")
            case "content_block_start":
                # New content block starting
                pass
            case "content_block_delta":
                # Text chunk
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
            case "content_block_stop":
                pass
            case "message_delta":
                # Stop reason and output token count
                if event.usage:
                    print(f"\nOutput tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("\nStream complete")
Production considerations
Timeout configuration: set a generous read timeout. Long responses can take 30–120 seconds to complete, which is longer than many default HTTP timeouts allow:
import httpx
import anthropic

client = anthropic.Anthropic(
    timeout=httpx.Timeout(
        connect=5.0,
        read=120.0,  # 2 minutes for long responses
        write=10.0,
        pool=5.0,
    )
)
Connection drops: browser connections can close mid-stream (user navigates away, network error). The server-side stream continues unless you actively monitor the connection:
# FastAPI: check if the client disconnected and stop generating if so
# (reuses the `app` and `client` from the earlier FastAPI example)
from fastapi import Request

@app.post("/stream")
async def stream_endpoint(request: Request, body: dict):
    async def generate():
        async with client.messages.stream(...) as stream:  # same arguments as before
            async for text in stream.text_stream:
                if await request.is_disconnected():
                    break
                yield text
    return StreamingResponse(generate(), media_type="text/plain")
Cost: streaming and non-streaming have identical costs per token. Streaming uses slightly more server resources (persistent connection) but the token cost is the same.
Frequently asked questions
Is streaming available for all Claude models?
Yes. All Claude models (Haiku, Sonnet, Opus) support streaming. The implementation is identical across models — just change the model parameter.
Can I use streaming with tool use?
Yes, but tool inputs stream as partial JSON, not text. The pattern: stream the response, detect tool_use blocks as they complete, execute tools, return results. See the tool use guide for the complete implementation.
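As a rough sketch of that pattern (assuming a tools list and a run_tool helper defined elsewhere): stream the text as usual, then read completed tool_use blocks off the final message.

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    tools=tools,  # your tool definitions (assumed)
    messages=messages,
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()

for block in final.content:
    if block.type == "tool_use":
        result = run_tool(block.name, block.input)  # hypothetical helper
        # Append a tool_result block to messages and call the API again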
Does streaming work with prompt caching?
Yes. Add cache_control to your messages as normal. The SDK's streaming interface is compatible with caching. Cache read/write counts appear in the message_start event's usage object.
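A minimal sketch, assuming long_system_prompt holds a large, reusable prompt worth caching:

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": long_system_prompt,  # large, reusable prompt (assumed to be defined)
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[{"role": "user", "content": "Summarize the document above."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    usage = stream.get_final_message().usage

# cache_creation_input_tokens / cache_read_input_tokens reflect cache activity
print(f"\n{usage}")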
Why do I see blank space before streaming starts?
First-token latency (time to first token, TTFT) is typically 300–800ms even with streaming. This is server-side processing time, not a client buffering issue. For requests with very long prompts, TTFT can reach 1–3 seconds.
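To measure it yourself, time the first chunk (a quick sketch using the basic streaming setup from above):

import time

start = time.monotonic()
ttft = None

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=512,
    messages=[{"role": "user", "content": "Explain TCP slow start."}],
) as stream:
    for text in stream.text_stream:
        if ttft is None:
            ttft = time.monotonic() - start  # time to first token
        print(text, end="", flush=True)

print(f"\nTTFT: {ttft:.2f}s")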
What's the difference between .text_stream and raw event iteration?
.text_stream yields only the text content (strings). Raw event iteration gives access to all event types including message metadata, token counts, and stop reasons. For UI display, use .text_stream. For monitoring and cost tracking, use raw events.
Related guides
- Build an AI Chatbot with Next.js and Claude — complete chatbot implementation using streaming
- Claude API Error Handling: Production Patterns — handling streaming errors
Take It Further
Claude Agent SDK Cookbook: 40 Production Patterns — Pattern 5 covers Streaming Architecture: SSE implementation with React, streaming with tool use, connection drop recovery, and the monitoring pattern that tracks TTFT and streaming throughput per request.
→ Get the Agent SDK Cookbook — $49
30-day money-back guarantee. Instant download.