Claude API + FastAPI 연동 가이드: 스트리밍 엔드포인트 완성
FastAPI와 Claude API를 연동하면 실시간 스트리밍 챗봇, AI 요약 서비스, 문서 분석 API를 단 수십 줄의 Python 코드로 구축할 수 있습니다. pip install fastapi anthropic uvicorn으로 설치하고, StreamingResponse와 client.messages.stream()을 결합하면 토큰이 생성될 때마다 즉시 클라이언트로 전달하는 SSE 엔드포인트가 완성됩니다. Rate Limit 핸들링과 비용 로깅까지 갖추면 프로덕션 수준의 AI 백엔드가 됩니다. 이 가이드는 기본 설정부터 배포 체크리스트까지 실무 코드를 중심으로 단계별로 설명합니다.
기본 설정
패키지 설치
pip install fastapi anthropic uvicorn python-dotenv
FastAPI는 ASGI 웹 프레임워크, anthropic은 Claude API Python SDK, uvicorn은 ASGI 서버입니다.
환경변수 설정
프로젝트 루트에 .env 파일을 생성합니다:
ANTHROPIC_API_KEY=sk-ant-api03-여기에키입력
.gitignore에 반드시 추가하세요:
echo ".env" >> .gitignore
API 키는 console.anthropic.com → Settings → API Keys에서 발급받습니다. 키는 생성 시 한 번만 표시되므로 즉시 복사해 보관하세요.
기본 임포트 구조
# main.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI(title="Claude API 서비스")
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
anthropic.Anthropic() 인스턴스를 모듈 최상단에서 한 번만 생성해 재사용합니다. 요청마다 새 인스턴스를 만들면 커넥션 오버헤드가 증가합니다.
첫 번째 엔드포인트
스트리밍 없이 Claude 응답을 반환하는 단순 POST 엔드포인트입니다. 기능 검증과 프로토타입 단계에 적합합니다.
class ChatRequest(BaseModel):
prompt: str
model: str = "claude-sonnet-4-5"
max_tokens: int = 1024
class ChatResponse(BaseModel):
content: str
input_tokens: int
output_tokens: int
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
message = client.messages.create(
model=request.model,
max_tokens=request.max_tokens,
messages=[{"role": "user", "content": request.prompt}]
)
return ChatResponse(
content=message.content[0].text,
input_tokens=message.usage.input_tokens,
output_tokens=message.usage.output_tokens,
)
서버 실행:
uvicorn main:app --reload --port 8000
테스트:
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{"prompt": "FastAPI란 무엇인가요?"}'
message.usage에서 토큰 수를 함께 반환하면 비용 추적이 쉬워집니다. 이 패턴은 Claude API 한국어 입문 가이드의 기본 구조와 동일합니다.
스트리밍 응답 구현
스트리밍은 Claude가 토큰을 생성하는 즉시 클라이언트로 전달합니다. 긴 응답에서 첫 토큰까지의 지연(TTFT)을 크게 줄여 사용자 경험을 향상시킵니다.
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
def generate():
with client.messages.stream(
model=request.model,
max_tokens=request.max_tokens,
messages=[{"role": "user", "content": request.prompt}]
) as stream:
for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
StreamingResponse는 FastAPI에 내장된 응답 클래스로, media_type="text/event-stream"을 지정하면 SSE(Server-Sent Events) 형식으로 전송됩니다.
프론트엔드에서 SSE 수신하기
const response = await fetch("/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt: "오늘의 뉴스 요약해줘" }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split("\n\n");
for (const line of lines) {
if (line.startsWith("data: ") && line !== "data: [DONE]") {
process.stdout.write(line.slice(6)); // "data: " 제거 후 출력
}
}
}
비동기 스트리밍 (AsyncAnthropic)
동시 요청이 많은 환경에서는 AsyncAnthropic 클라이언트와 async def 제너레이터를 사용합니다:
from anthropic import AsyncAnthropic
import asyncio
async_client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
@app.post("/chat/async-stream")
async def chat_async_stream(request: ChatRequest):
async def generate():
async with async_client.messages.stream(
model=request.model,
max_tokens=request.max_tokens,
messages=[{"role": "user", "content": request.prompt}]
) as stream:
async for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
비동기 클라이언트를 사용하면 uvicorn의 이벤트 루프를 블로킹하지 않아 다수의 동시 스트리밍 요청을 효율적으로 처리합니다. 프로덕션 아키텍처 설계에서 동시성 전략을 더 자세히 다룹니다.
Rate Limit 처리
Claude API는 분당 요청 수(RPM), 분당 토큰 수(TPM) 제한이 있습니다. 429 Too Many Requests 에러를 적절히 처리하지 않으면 서비스가 중단됩니다.
import time
from anthropic import RateLimitError, APIStatusError
def generate_with_retry(prompt: str, model: str, max_tokens: int, max_retries: int = 3):
"""지수 백오프로 Rate Limit 재시도"""
for attempt in range(max_retries):
try:
with client.messages.stream(
model=model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
return # 성공 시 종료
except RateLimitError as e:
if attempt == max_retries - 1:
raise HTTPException(status_code=429, detail="Rate limit 초과. 잠시 후 다시 시도하세요.")
wait_time = 2 ** attempt # 1초, 2초, 4초
print(f"Rate limit 도달. {wait_time}초 후 재시도 (시도 {attempt + 1}/{max_retries})")
time.sleep(wait_time)
except APIStatusError as e:
raise HTTPException(status_code=e.status_code, detail=str(e.message))
@app.post("/chat/resilient")
async def chat_resilient(request: ChatRequest):
return StreamingResponse(
generate_with_retry(request.prompt, request.model, request.max_tokens),
media_type="text/event-stream"
)
Retry-After 헤더 활용
Anthropic API는 retry-after 헤더로 대기 시간을 알려줍니다. 이를 활용하면 더 정확한 재시도가 가능합니다:
except RateLimitError as e:
retry_after = int(e.response.headers.get("retry-after", 2 ** attempt))
print(f"Retry-After: {retry_after}초")
time.sleep(retry_after)
에러 처리 전략에 대한 자세한 내용은 Claude API 에러 처리 가이드를 참고하세요.
비용 로깅
Claude API 비용은 usage.input_tokens와 usage.output_tokens로 정확히 추적할 수 있습니다. 요청마다 비용을 로깅하면 월별 예산 관리가 쉬워집니다.
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 2026년 5월 기준 Claude Sonnet 4.5 가격 (USD per million tokens)
PRICING = {
"claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
"claude-haiku-4-5": {"input": 0.80, "output": 4.00},
"claude-opus-4-5": {"input": 15.00, "output": 75.00},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""USD 기준 요청당 비용 계산"""
prices = PRICING.get(model, PRICING["claude-sonnet-4-5"])
input_cost = (input_tokens / 1_000_000) * prices["input"]
output_cost = (output_tokens / 1_000_000) * prices["output"]
return round(input_cost + output_cost, 6)
def log_usage(endpoint: str, model: str, input_tokens: int, output_tokens: int):
cost = calculate_cost(model, input_tokens, output_tokens)
logger.info(
"API_USAGE | endpoint=%s model=%s input=%d output=%d cost_usd=%.6f ts=%s",
endpoint, model, input_tokens, output_tokens, cost,
datetime.utcnow().isoformat()
)
스트리밍 엔드포인트에서 최종 사용량 로깅:
@app.post("/chat/logged-stream")
async def chat_logged_stream(request: ChatRequest):
def generate():
with client.messages.stream(
model=request.model,
max_tokens=request.max_tokens,
messages=[{"role": "user", "content": request.prompt}]
) as stream:
for text in stream.text_stream:
yield f"data: {text}\n\n"
# 스트림 완료 후 usage 정보 수집
final_message = stream.get_final_message()
log_usage(
endpoint="/chat/logged-stream",
model=request.model,
input_tokens=final_message.usage.input_tokens,
output_tokens=final_message.usage.output_tokens,
)
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
로그 출력 예시:
INFO: API_USAGE | endpoint=/chat/logged-stream model=claude-sonnet-4-5 input=45 output=312 cost_usd=0.005040 ts=2026-05-12T10:23:45
월별 비용을 집계하려면 이 로그를 PostgreSQL이나 BigQuery에 저장하고 SUM(cost_usd)로 쿼리합니다.
프로덕션 배포 체크리스트
FastAPI + Claude API 서비스를 프로덕션에 배포하기 전에 아래 5가지를 확인하세요.
- API 키를 환경변수로 주입 —
.env또는 시크릿 매니저(AWS Secrets Manager, GCP Secret Manager) 사용. 코드에 하드코딩 절대 금지 - 요청 타임아웃 설정 —
httpx클라이언트 타임아웃 지정 또는 FastAPIasyncio.wait_for()로 최대 응답 시간 제한 (권장: 60–120초) - Rate Limit 미들웨어 —
slowapi또는 Redis 기반 슬라이딩 윈도우로 엔드포인트별 RPM 제한 설정하여 API 쿼터 보호 - 에러 응답 표준화 —
@app.exception_handler로RateLimitError,APIConnectionError등을 일관된 JSON 에러 형식으로 변환 - 헬스체크 엔드포인트 —
GET /health에서 Claude API 연결 상태를 확인하고 Kubernetes Readiness Probe 또는 로드밸런서 헬스체크에 연결
@app.get("/health")
async def health_check():
try:
# 최소 토큰으로 연결 테스트
client.messages.create(
model="claude-haiku-4-5",
max_tokens=1,
messages=[{"role": "user", "content": "ping"}]
)
return {"status": "ok", "claude_api": "connected"}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Claude API 연결 실패: {str(e)}")
Frequently Asked Questions
FastAPI와 Flask 중 어떤 것이 Claude API 연동에 더 적합한가요?
FastAPI가 더 적합합니다. FastAPI는 async/await를 네이티브로 지원하여 Claude API의 스트리밍 응답을 블로킹 없이 처리할 수 있습니다. Flask는 기본적으로 동기 처리 방식이라 다수의 동시 스트리밍 요청에서 성능이 저하됩니다. 또한 FastAPI는 Pydantic 기반 자동 유효성 검사와 OpenAPI 문서(/docs)를 제공해 개발 속도도 빠릅니다.
StreamingResponse로 스트림을 반환할 때 에러가 발생하면 어떻게 처리하나요?
제너레이터 함수 내부에서 try/except로 에러를 잡아 SSE 이벤트로 전달합니다:
def generate():
try:
with client.messages.stream(...) as stream:
for text in stream.text_stream:
yield f"data: {text}\n\n"
except RateLimitError:
yield 'data: {"error": "rate_limit"}\n\n'
except Exception as e:
yield f'data: {{"error": "{str(e)}"}}\n\n'
finally:
yield "data: [DONE]\n\n"
클라이언트 측에서는 data: 필드가 {"error": ...} 형식인지 확인해 에러를 처리합니다.
시스템 프롬프트는 어떻게 추가하나요?
messages.create() 또는 messages.stream()의 system 파라미터에 전달합니다:
client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
system="당신은 한국어 전문 법률 어시스턴트입니다. 항상 정확하고 간결하게 답변하세요.",
messages=[{"role": "user", "content": prompt}]
)
시스템 프롬프트는 input_tokens에 포함되어 비용이 발생합니다. 반복 호출이 많다면 프롬프트 캐싱을 적용해 비용을 최대 90%까지 절감할 수 있습니다.
대화 히스토리(멀티턴)는 어떻게 유지하나요?
messages 배열에 이전 대화를 누적해서 전달합니다. FastAPI에서는 세션 ID를 키로 메모리 딕셔너리나 Redis에 히스토리를 저장합니다:
from collections import defaultdict
conversation_history: dict[str, list] = defaultdict(list)
@app.post("/chat/multi-turn")
async def chat_multi_turn(session_id: str, request: ChatRequest):
history = conversation_history[session_id]
history.append({"role": "user", "content": request.prompt})
message = client.messages.create(
model=request.model,
max_tokens=request.max_tokens,
messages=history
)
assistant_reply = message.content[0].text
history.append({"role": "assistant", "content": assistant_reply})
return {"content": assistant_reply, "turns": len(history) // 2}
프로덕션에서는 메모리 딕셔너리 대신 Redis TTL을 사용해 오래된 세션을 자동 만료시키세요.
다음 단계
FastAPI + Claude API 스트리밍 엔드포인트를 구축했다면, 이를 확장해 에이전트 시스템으로 발전시킬 수 있습니다. Tool Use를 연동하면 Claude가 외부 API를 직접 호출하는 자율 에이전트가 됩니다.
Claude Agent SDK의 Subagent 패턴, Tool Use 구현, 멀티에이전트 워크플로우까지 실전 예제 50개를 담은 쿡북이 있습니다: