Claude API Streaming: Complete Implementation Guide

How to implement streaming responses with the Claude API — Python async, TypeScript, Server-Sent Events, and handling streaming in production applications.

Claude API streaming delivers response tokens as they're generated, rather than waiting for the complete response. Streaming reduces perceived latency from 5–30 seconds (waiting for full response) to near-instant first token appearance. For any user-facing application, streaming is the correct default. For batch processing pipelines, non-streaming is simpler and equally fast. This guide covers both patterns with complete Python and TypeScript implementations.


Why streaming matters for UX

Non-streaming: user clicks Send → waits 8 seconds → full response appears. Streaming: user clicks Send → first words appear in ~500ms → response builds in real time.

The total time to completion is identical, but perceived latency is much lower with streaming: the user sees progress within a fraction of a second instead of watching a spinner. That visible progress is generally associated with higher user satisfaction and lower abandonment rates in AI interfaces.
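One way to check these numbers in your own application is to time the first chunk separately from the whole stream. The sketch below is a minimal, SDK-agnostic helper: `measure_stream` and `StreamTiming` are hypothetical names, and the function accepts any iterable of text chunks, so it assumes nothing about the client library.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class StreamTiming:
    text: str               # full concatenated response
    ttft: Optional[float]   # seconds until the first chunk; None if no chunk arrived
    total: float            # seconds until the stream finished
    chunks: int             # number of chunks received


def measure_stream(
    chunks: Iterable[str],
    on_chunk: Callable[[str], None] = lambda _chunk: None,
    clock: Callable[[], float] = time.perf_counter,
) -> StreamTiming:
    """Consume a chunk iterator, recording time-to-first-chunk and total time."""
    start = clock()
    ttft: Optional[float] = None
    parts = []
    for chunk in chunks:
        if ttft is None:
            # Only the first chunk sets the time-to-first-token measurement
            ttft = clock() - start
        parts.append(chunk)
        on_chunk(chunk)
    return StreamTiming("".join(parts), ttft, clock() - start, len(parts))
```

With the SDK, this could plausibly be called on `stream.text_stream` inside the `with` block, passing a callback that prints each chunk as `on_chunk`.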


Python streaming implementation

Basic streaming

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain quantum entanglement simply."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After the loop, while the stream is still open, get the final complete message
    final_message = stream.get_final_message()

print(f"\nTotal tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")

Async streaming (for FastAPI, async frameworks)

import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(user_message: str) -> str:
    """Stream a response and return the full text."""
    full_text = ""
    
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text in stream.text_stream:
            full_text += text
            print(text, end="", flush=True)
    
    return full_text

# Run
asyncio.run(stream_response("Write a haiku about streaming data."))

FastAPI streaming endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat/stream")
async def stream_chat(body: dict):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=body.get("messages", []),
            system=body.get("system", "You are a helpful assistant."),
        ) as stream:
            async for text in stream.text_stream:
                yield text
    
    return StreamingResponse(
        generate(),
        media_type="text/plain",
        # The ASGI server applies chunked transfer encoding itself; do not set it by hand
        headers={"Cache-Control": "no-cache"},
    )

TypeScript streaming implementation

Node.js / server-side streaming

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function streamResponse(userMessage: string): Promise<void> {
  const stream = client.messages.stream({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: userMessage }],
  });

  // Print each text chunk as it arrives
  stream.on("text", (text) => {
    process.stdout.write(text);
  });

  // Wait for completion
  const finalMessage = await stream.finalMessage();
  console.log("\n\nUsage:", finalMessage.usage);
}

// Surface API or network errors instead of leaving an unhandled rejection
streamResponse("List 5 principles of good API design.").catch(console.error);

Collect full text without printing

async function getStreamedText(userMessage: string): Promise<string> {
  let fullText = "";

  const stream = client.messages.stream({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: userMessage }],
  });

  stream.on("text", (chunk) => {
    fullText += chunk;
  });

  await stream.done();
  return fullText;
}

Server-Sent Events (SSE) for browser clients

SSE is the standard protocol for streaming AI responses to browser clients. Unlike WebSockets, SSE is unidirectional (server to client) and works over standard HTTP.
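To make the wire format concrete, here is a small Python sketch of how one SSE event is framed and how a receiver can split incoming text back into events. `format_sse` and `SSEParser` are illustrative names, not part of any SDK. The sketch handles only the `data:` field (the SSE specification also defines `event:`, `id:`, and `retry:`) and assumes `\n` line endings.

```python
import json
from typing import Any, List


def format_sse(payload: Any) -> str:
    """Frame one JSON-serializable payload as a single SSE event."""
    return f"data: {json.dumps(payload)}\n\n"


class SSEParser:
    """Incrementally split incoming text into decoded `data:` payloads."""

    def __init__(self) -> None:
        self._buffer = ""  # holds an event that has not been terminated yet

    def feed(self, text: str) -> List[Any]:
        self._buffer += text
        # A blank line terminates an event; the last piece may be incomplete,
        # so it stays in the buffer until more text arrives.
        *complete, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for event in complete:
            data_lines = []
            for line in event.split("\n"):
                if line.startswith("data:"):
                    value = line[5:]
                    # One optional space after the colon is not part of the value
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:
                payloads.append(json.loads("\n".join(data_lines)))
        return payloads
```

Because the parser keeps the unterminated tail between calls, an event that arrives split across two network reads is decoded once, after its closing blank line shows up.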

Next.js API route with SSE

// app/api/stream/route.ts
import Anthropic from "@anthropic-ai/sdk";
import { NextRequest } from "next/server";

const client = new Anthropic();

export async function POST(request: NextRequest) {
  const { messages } = await request.json();

  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  // Run streaming in background
  (async () => {
    try {
      const anthropicStream = client.messages.stream({
        model: "claude-sonnet-4-5",
        max_tokens: 2048,
        messages,
      });

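      // Iterate events so each write is awaited in order and failures reach the catch block
      for await (const event of anthropicStream) {
        if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
          // SSE format: "data: <content>\n\n"
          await writer.write(
            encoder.encode(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`)
          );
        }
      }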
      // Signal completion
      await writer.write(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`));
    } catch (error) {
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({ error: String(error) })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

Browser client consuming SSE

// Client-side helper that consumes the SSE stream (callable from a React component)
async function sendMessageSSE(message: string, onChunk: (text: string) => void) {
  const response = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages: [{ role: "user", content: message }] }),
  });
  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed: ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ""; // holds a partial event that spans network chunks

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // SSE events end with a blank line; keep any incomplete tail in the buffer
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? "";
    for (const event of events) {
      if (!event.startsWith("data: ")) continue;
      const data = JSON.parse(event.slice(6));
      if (data.error) throw new Error(data.error);
      if (data.text) onChunk(data.text);
      if (data.done) return;
    }
  }
}

Raw streaming events (advanced)

For fine-grained control, iterate over raw events instead of the text stream (the match statement below requires Python 3.10 or later):

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                # Message metadata (model, input token count)
                print(f"Started. Input tokens: {event.message.usage.input_tokens}")
            
            case "content_block_start":
                # New content block starting
                pass
            
            case "content_block_delta":
                # Text chunk
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
            
            case "content_block_stop":
                pass
            
            case "message_delta":
                # Stop reason and output token count
                if event.usage:
                    print(f"\nOutput tokens: {event.usage.output_tokens}")
            
            case "message_stop":
                print("\nStream complete")

Production considerations

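Timeout configuration: set the read timeout deliberately. Large responses can take 30–120 seconds to complete. The Anthropic Python SDK applies its own default timeout (10 minutes at the time of writing, not httpx's 5-second default), so the main reasons to override it are a tighter connect limit and an explicit ceiling on how long to wait for data. With streaming, the read timeout bounds the gap between chunks rather than the whole response: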

import anthropic
import httpx

client = anthropic.Anthropic(
    timeout=httpx.Timeout(
        connect=5.0,
        read=120.0,   # 2 minutes for long responses
        write=10.0,
        pool=5.0,
    )
)

Connection drops: browser connections can close mid-stream (user navigates away, network error). The server-side stream continues unless you actively monitor the connection:

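# FastAPI: check if client disconnected
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/stream")
async def stream_endpoint(request: Request, body: dict):
    async def generate():
        # `...` stands for the same model, max_tokens, and messages arguments used earlier
        async with client.messages.stream(...) as stream:
            async for text in stream.text_stream:
                # Stop reading from the API once the browser has gone away
                if await request.is_disconnected():
                    break
                yield text
    return StreamingResponse(generate(), media_type="text/plain")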

Cost: streaming and non-streaming have identical costs per token. Streaming uses slightly more server resources (persistent connection) but the token cost is the same.


Frequently asked questions

Is streaming available for all Claude models? Yes. All Claude models (Haiku, Sonnet, Opus) support streaming. The implementation is identical across models — just change the model parameter.

Can I use streaming with tool use? Yes, but tool inputs stream as partial JSON, not text. The pattern: stream the response, detect tool_use blocks as they complete, execute tools, return results. See the tool use guide for the complete implementation.
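A minimal sketch of the accumulation step follows. It assumes raw events are available as plain dictionaries shaped like the API's streaming payloads (`content_block_start`, `content_block_delta` carrying an `input_json_delta`, `content_block_stop`). `collect_tool_calls` is a hypothetical helper, not an SDK function. The key point is that the JSON fragments are concatenated and parsed only once the block stops.

```python
import json
from typing import Any, Dict, Iterable, List


def collect_tool_calls(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Assemble completed tool_use blocks from a sequence of raw stream events."""
    pending: Dict[int, Dict[str, Any]] = {}   # block index -> partially built call
    completed: List[Dict[str, Any]] = []

    for event in events:
        kind = event.get("type")
        index = event.get("index")
        if kind == "content_block_start":
            block = event["content_block"]
            if block.get("type") == "tool_use":
                pending[index] = {"id": block["id"], "name": block["name"], "json": ""}
        elif kind == "content_block_delta":
            delta = event["delta"]
            if delta.get("type") == "input_json_delta" and index in pending:
                # Fragments are not valid JSON on their own; just concatenate them
                pending[index]["json"] += delta["partial_json"]
        elif kind == "content_block_stop" and index in pending:
            call = pending.pop(index)
            # An empty accumulated string means the tool was called with no arguments
            call_input = json.loads(call["json"]) if call["json"] else {}
            completed.append({"id": call["id"], "name": call["name"], "input": call_input})
    return completed
```

Text blocks and other event types pass through untouched, so the same event sequence can also feed a text display.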

Does streaming work with prompt caching? Yes. Add cache_control to your messages as normal. The SDK's streaming interface is compatible with caching. Cache read/write counts appear in the message_start event's usage object.

Why do I see blank space before streaming starts? First-token latency (time to first token, TTFT) is typically 300–800ms even for streaming. This is server-side processing time, not a client buffering issue. For models with very long contexts, TTFT can be 1–3 seconds.

What's the difference between .text_stream and raw event iteration? .text_stream yields only the text content (strings). Raw event iteration gives access to all event types including message metadata, token counts, and stop reasons. For UI display, use .text_stream. For monitoring and cost tracking, use raw events.
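As a sketch of the monitoring use case, the helper below folds raw events into a single usage summary. It assumes events are plain dictionaries shaped like the API's streaming payloads, and `summarize_usage` is a hypothetical name. The cache fields default to 0 because they may be absent or null when caching is not in use.

```python
from typing import Any, Dict, Iterable


def summarize_usage(events: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect token counts and the stop reason from raw stream events."""
    summary: Dict[str, Any] = {
        "input_tokens": 0,
        "output_tokens": 0,
        "cache_creation_input_tokens": 0,
        "cache_read_input_tokens": 0,
        "stop_reason": None,
    }
    for event in events:
        kind = event.get("type")
        if kind == "message_start":
            # Input-side counts, including cache reads and writes, arrive up front
            usage = event["message"].get("usage") or {}
            for key in ("input_tokens", "cache_creation_input_tokens", "cache_read_input_tokens"):
                summary[key] = usage.get(key) or 0
        elif kind == "message_delta":
            usage = event.get("usage") or {}
            # output_tokens here is cumulative, so overwrite rather than add
            if usage.get("output_tokens") is not None:
                summary["output_tokens"] = usage["output_tokens"]
            delta = event.get("delta") or {}
            summary["stop_reason"] = delta.get("stop_reason", summary["stop_reason"])
    return summary
```

The resulting dictionary can be logged per request or multiplied by your current per-token prices to track cost.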


AI Disclosure: Drafted with Claude Code; all streaming patterns from Anthropic SDK as of April 2026.