← All guides

Claude API 스트리밍 응답 구현 완전 가이드 (Python · TypeScript)

Claude API 스트리밍 응답 구현 방법 — Python stream(), TypeScript AsyncIterable, SSE, FastAPI, Next.js App Router까지 실전 코드 예제로 설명합니다.

🇺🇸 Read in English →

Claude API 스트리밍 응답 구현 완전 가이드 (Python · TypeScript)

Claude API 스트리밍은 stream=True 또는 SDK의 .stream() 메서드 하나로 활성화됩니다 — 응답이 생성되는 즉시 토큰 단위로 전달받아 ChatGPT처럼 글자가 한 글자씩 출력되는 UX를 구현할 수 있습니다. 전체 응답을 기다리지 않으므로 사용자가 느끼는 체감 지연(perceived latency)이 크게 줄어듭니다. 이 가이드는 Python과 TypeScript의 스트리밍 패턴, FastAPI + SSE, Next.js App Router 연동, 그리고 에러 처리까지 실전 코드로 다룹니다.


스트리밍 vs 논-스트리밍 비교

항목 논-스트리밍 스트리밍
첫 토큰 도달 전체 생성 완료 후 수백 ms 이내
사용자 경험 빈 화면 → 한꺼번에 출력 실시간 타이핑 효과
코드 복잡도 낮음 약간 높음
비용 동일 동일
권장 상황 배치, 백그라운드 작업 대화형 UI, 긴 응답

스트리밍은 비용이 추가되지 않습니다 — 토큰 단위 과금 방식은 동일하며, 전달 방식만 다릅니다.


Python 스트리밍 기본

SDK stream() 컨텍스트 매니저 (권장)

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "파이썬으로 퀵소트를 구현해줘"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# 스트림 완료 후 최종 메시지 객체 접근
final_message = stream.get_final_message()
print(f"\n\n총 토큰: {final_message.usage.input_tokens + final_message.usage.output_tokens}")

.text_stream은 텍스트 델타(delta)만 yield하는 제너레이터입니다. 전체 이벤트가 필요하면 stream 이터러블을 직접 순회하세요:

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "안녕하세요"}],
) as stream:
    for event in stream:
        event_type = type(event).__name__
        if event_type == "RawContentBlockDeltaEvent":
            if hasattr(event.delta, "text"):
                print(event.delta.text, end="", flush=True)

비동기(Async) 스트리밍

FastAPI나 asyncio 환경에서는 AsyncAnthropic을 사용합니다:

import asyncio
import anthropic

async def stream_response(prompt: str) -> str:
    client = anthropic.AsyncAnthropic()
    full_text = ""

    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            full_text += text

    return full_text

asyncio.run(stream_response("Claude API 스트리밍의 장점을 설명해줘"))

TypeScript 스트리밍

기본 스트리밍

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.stream({
  model: "claude-sonnet-4-5",
  max_tokens: 1024,
  messages: [{ role: "user", content: "TypeScript로 배열을 정렬하는 방법" }],
});

// 텍스트 스트림
for await (const text of stream.textStream) {
  process.stdout.write(text);
}

// 최종 메시지
const finalMessage = await stream.finalMessage();
console.log("\nStop reason:", finalMessage.stop_reason);

이벤트 핸들러 방식

const stream = client.messages
  .stream({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: "Next.js 서버 컴포넌트를 설명해줘" }],
  })
  .on("text", (text) => {
    process.stdout.write(text);
  })
  .on("message", (message) => {
    console.log("\nFinal:", message.usage);
  });

await stream.finalMessage();

FastAPI + SSE (Server-Sent Events)

프론트엔드에 스트리밍 응답을 전달하는 가장 일반적인 패턴은 SSE(Server-Sent Events)입니다:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
claude = anthropic.AsyncAnthropic()

@app.post("/api/chat/stream")
async def chat_stream(request: dict):
    prompt = request.get("prompt", "")

    async def generate():
        async with claude.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # SSE 형식: "data: {text}\n\n"
                yield f"data: {text}\n\n"
            yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Nginx 버퍼링 비활성화
        },
    )

중간 안내

Claude API 스트리밍을 포함한 15가지 프로덕션 에이전트 구현이 담긴 **Agent SDK Cookbook ($49)**을 확인하세요. 에러 처리, 재시도 로직, 비용 추적까지 포함된 완성된 코드를 제공합니다.

→ Agent SDK Cookbook 구매하기 — $49


Next.js App Router 스트리밍

Next.js 15 App Router의 Route Handler에서 스트리밍을 구현하는 패턴입니다:

// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

export async function POST(request: Request) {
  const { prompt } = await request.json();

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const anthropicStream = await client.messages.stream({
        model: "claude-sonnet-4-5",
        max_tokens: 2048,
        messages: [{ role: "user", content: prompt }],
      });

      for await (const text of anthropicStream.textStream) {
        controller.enqueue(encoder.encode(`data: ${JSON.stringify({ text })}\n\n`));
      }

      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

프론트엔드 React 컴포넌트:

"use client";

import { useState } from "react";

export default function ChatComponent() {
  const [response, setResponse] = useState("");
  const [loading, setLoading] = useState(false);

  async function handleSubmit(prompt: string) {
    setLoading(true);
    setResponse("");

    const res = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt }),
    });

    const reader = res.body!.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split("\n\n").filter(Boolean);

      for (const line of lines) {
        if (line.startsWith("data: ") && line !== "data: [DONE]") {
          const data = JSON.parse(line.slice(6));
          setResponse(prev => prev + data.text);
        }
      }
    }

    setLoading(false);
  }

  return (
    <div>
      <button onClick={() => handleSubmit("Claude 스트리밍을 설명해줘")} disabled={loading}>
        {loading ? "생성 중..." : "질문하기"}
      </button>
      <pre style={{ whiteSpace: "pre-wrap" }}>{response}</pre>
    </div>
  );
}

시스템 프롬프트 + 스트리밍

시스템 프롬프트가 큰 경우 스트리밍과 프롬프트 캐싱을 함께 사용하면 비용을 최대 90%까지 절감할 수 있습니다. 자세한 계산은 Claude API 비용과 프롬프트 캐싱 손익분기 분석을 참고하세요.

async with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=2048,
    system=[{
        "type": "text",
        "text": "당신은 한국어 기술 문서 전문가입니다. [긴 컨텍스트...]",
        "cache_control": {"type": "ephemeral"}  # 캐시 활성화
    }],
    messages=[{"role": "user", "content": user_prompt}],
) as stream:
    async for text in stream.text_stream:
        yield text

첫 번째 호출에서 캐시를 저장하고, 두 번째 호출부터 입력 토큰 비용의 10%만 과금됩니다. 손익분기는 1.28회 재사용이며, 1일 100회 이상 호출하는 서비스라면 즉시 도입 효과가 있습니다.


에러 처리

스트리밍 중 에러가 발생하면 스트림이 중단됩니다. 재시도 로직을 감싸는 것이 중요합니다:

import anthropic
from anthropic import APIStatusError, APITimeoutError, APIConnectionError
import asyncio

async def stream_with_retry(prompt: str, max_retries: int = 3):
    client = anthropic.AsyncAnthropic()

    for attempt in range(max_retries):
        try:
            async with client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    yield text
                return  # 성공 시 종료

        except APIStatusError as e:
            if e.status_code == 429:  # Rate limit
                wait = 2 ** attempt
                print(f"Rate limited. {wait}초 후 재시도...")
                await asyncio.sleep(wait)
            elif e.status_code >= 500:  # 서버 오류
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
            else:
                raise  # 4xx 클라이언트 오류는 재시도하지 않음

        except (APITimeoutError, APIConnectionError):
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(1)

에러 코드 전체 목록은 Claude API 에러 코드 레퍼런스를 참고하세요.


스트리밍 사용 통계 (벤치마크)

실제 측정 데이터 (claude-sonnet-4-5 기준, 2025년 12월):

대화형 UI에서 TTFT 280ms는 사용자가 응답이 즉시 시작되는 것으로 인식하는 임계값(400ms) 이하입니다.


관련 가이드


자주 묻는 질문 (FAQ)

스트리밍과 논-스트리밍의 비용 차이가 있나요?

없습니다 — 토큰 단위 과금은 동일합니다. 스트리밍은 전달 방식만 다를 뿐, 생성되는 토큰 수는 같습니다.

스트리밍 중에 중단(abort)이 가능한가요?

Python에서는 컨텍스트 매니저를 일찍 종료(break 또는 예외)하면 스트림이 취소됩니다. TypeScript에서는 stream.controller.abort()를 호출하거나 AbortController를 사용합니다. HTTP 연결이 닫히면 API 서버 측에서도 생성이 중단됩니다.

스트리밍 응답에서 총 토큰 수를 알 수 있나요?

스트림 완료 후 stream.get_final_message() (Python) 또는 await stream.finalMessage() (TypeScript)로 최종 메시지 객체를 가져오면 usage.input_tokensusage.output_tokens를 확인할 수 있습니다.

Vercel Edge Functions에서 스트리밍이 작동하나요?

네 — Vercel Edge Runtime은 ReadableStream을 네이티브 지원합니다. 단, Edge Runtime은 Node.js API를 완전히 지원하지 않으므로 @anthropic-ai/sdk의 일부 기능(파일 업로드 등)은 Node.js Runtime으로 전환해야 합니다. 기본 스트리밍은 Edge에서 완벽히 작동합니다.

스트리밍 중 네트워크가 끊기면 어떻게 되나요?

SSE 연결이 끊기면 브라우저는 기본적으로 3초 후 재연결을 시도합니다. 이미 받은 텍스트를 저장하고 Last-Event-ID 헤더로 재개 지점을 서버에 알리는 패턴을 사용하면 중단된 지점부터 이어서 받을 수 있습니다.

툴 사용(tool_use)과 스트리밍을 함께 쓸 수 있나요?

네 — 스트리밍 중에도 stop_reasontool_use가 되면 툴 블록이 전달됩니다. SDK의 스트리밍 헬퍼는 이를 자동으로 처리하며, 툴 입력 JSON도 스트리밍 방식으로 점진적으로 전달됩니다.


더 깊이 배우기

Agent SDK Cookbook — $49 — 스트리밍, 툴 사용, 프롬프트 캐싱, 비용 추적이 모두 포함된 15개의 프로덕션 에이전트 구현. Python과 TypeScript 모두 제공.

→ Agent SDK Cookbook 구매하기 — $49

30일 환불 보장. 즉시 다운로드.

도구와 자료