# How to Handle Errors and Retries in Claude Agent SDK
Production Claude agents fail in predictable ways — rate limit errors (429), overload errors (529), network timeouts, tool call failures, and infinite loops. Each requires a different recovery strategy, and the difference between a production-grade agent and a fragile prototype is having all five handled correctly. This guide covers every error type, the right retry strategy for each, and the circuit breaker pattern that prevents cascading failures.
## The Error Taxonomy
Claude Agent SDK errors fall into seven categories:
| Category | HTTP Status | Cause | Retry? |
|---|---|---|---|
| Rate limit | 429 | Too many requests | Yes, with backoff |
| Overloaded | 529 | API server busy | Yes, with backoff |
| Auth error | 401 | Bad API key | No — fix the key |
| Invalid request | 400 | Bad parameters | No — fix the code |
| Network failure | No status | Connection dropped | Yes, immediately |
| Tool failure | N/A | Your tool code crashed | Depends |
| Agent loop | N/A | Agent running forever | Kill after max turns |
## Base Error Handling Setup
Start with this error handling wrapper before building anything else:
```python
import anthropic
import time
import random
from typing import Callable, TypeVar

client = anthropic.Anthropic()

T = TypeVar("T")


def with_retry(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """
    Execute fn with exponential backoff retry.

    Retries on rate limits (429) and overload (529).
    Raises immediately on auth errors (401) and bad requests (400).
    """
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except anthropic.RateLimitError as e:
            if attempt == max_retries:
                raise
            # Respect the Retry-After header if present
            retry_after = getattr(e, "retry_after", None)
            delay = retry_after or (base_delay * (2 ** attempt) + random.uniform(0, 1))
            delay = min(delay, max_delay)
            print(f"Rate limit hit. Waiting {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)
        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # Overloaded
                if attempt == max_retries:
                    raise
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                delay = min(delay, max_delay)
                print(f"API overloaded. Waiting {delay:.1f}s")
                time.sleep(delay)
            elif e.status_code in (400, 401, 403):
                raise  # Don't retry — fix the code or credentials
            else:
                if attempt == max_retries:
                    raise
                time.sleep(base_delay)
        except anthropic.APIConnectionError:
            if attempt == max_retries:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Network error. Retrying in {delay:.1f}s")
            time.sleep(delay)


# Usage
response = with_retry(
    lambda: client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}],
    )
)
```
## Handling Rate Limits (429)
Rate limits are the most common production error. The Anthropic API enforces:
- RPM (requests per minute) limits by tier
- TPM (tokens per minute) limits by tier
- Daily token limits on lower tiers
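Whichever limit you hit, the recovery strategy is the same: exponential backoff with jitter. As a minimal, self-contained sketch of the delay math (the `backoff_delay` name is mine, not an SDK function):

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2^attempt,
    plus up to 1s of random jitter, capped at `cap` seconds."""
    return min(base * (2 ** attempt) + random.uniform(0, 1), cap)


# Without jitter, the schedule for base=1.0 is 1s, 2s, 4s, 8s, ... capped at 60s.
schedule = [min(1.0 * (2 ** a), 60.0) for a in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The jitter spreads out retries from concurrent workers so they don't all hammer the API at the same instant after a shared rate-limit event.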
### Reading the Retry-After header
When you hit a rate limit, the API may include a Retry-After header specifying how long to wait. Use it whenever it's present instead of guessing:
```python
import anthropic
import time

client = anthropic.Anthropic()


def create_with_rate_limit_handling(messages: list, **kwargs):
    max_retries = 5
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model="claude-sonnet-4-5",
                max_tokens=1024,
                messages=messages,
                **kwargs,
            )
        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            # Get the wait time from the response headers;
            # the anthropic SDK exposes them via the error's response object.
            wait_time = 60  # Default fallback
            if hasattr(e, "response") and e.response is not None:
                retry_after = e.response.headers.get("retry-after")
                if retry_after:
                    wait_time = int(retry_after) + 1  # Add a 1s buffer
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
    raise RuntimeError("Max retries exceeded")
```
### Proactive rate limit prevention
For batch processing, throttle requests to stay under limits:
```python
import time


class RateLimiter:
    """Fixed-interval rate limiter: spaces requests evenly under an RPM cap."""

    def __init__(self, requests_per_minute: int = 50):
        self.rpm = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.last_request_time = 0.0

    def wait(self):
        """Block until it's safe to make the next request."""
        now = time.time()
        elapsed = now - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()


# Usage in batch processing
limiter = RateLimiter(requests_per_minute=40)  # Stay under a 50 RPM limit
for item in batch:
    limiter.wait()
    response = client.messages.create(...)
```
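The limiter above only counts requests. TPM limits need a token budget as well; below is a hedged sketch of a sliding-window token tracker (the class name, the 60-second window, and the injectable clock are my own choices, made for testability, not SDK features):

```python
import time
from collections import deque


class TokenBudget:
    """Sliding-window tracker for a tokens-per-minute budget."""

    def __init__(self, tokens_per_minute: int, clock=time.monotonic):
        self.tpm = tokens_per_minute
        self.clock = clock
        self.events: deque[tuple[float, int]] = deque()  # (timestamp, tokens)

    def _prune(self, now: float):
        # Drop spend older than the 60-second window.
        while self.events and now - self.events[0][0] >= 60.0:
            self.events.popleft()

    def would_exceed(self, tokens: int) -> bool:
        """True if spending `tokens` now would break the per-minute budget."""
        now = self.clock()
        self._prune(now)
        used = sum(t for _, t in self.events)
        return used + tokens > self.tpm

    def record(self, tokens: int):
        """Record tokens actually spent (e.g. from response.usage)."""
        self.events.append((self.clock(), tokens))
```

Before each request, check `would_exceed` with an estimate (input tokens plus `max_tokens`) and sleep until the window rolls over; after the response arrives, call `record` with the real counts from `response.usage`.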
## Tool Call Error Handling
When Claude calls a tool and the tool fails, you control the recovery:
```python
import anthropic
import traceback

client = anthropic.Anthropic()


def execute_tool(tool_name: str, tool_input: dict) -> str:
    """Execute a tool and return result or error description."""
    try:
        if tool_name == "read_file":
            with open(tool_input["path"], "r") as f:
                return f.read()
        elif tool_name == "run_query":
            return run_db_query(tool_input["sql"])
        else:
            return f"Error: Unknown tool '{tool_name}'"
    except FileNotFoundError:
        return f"Error: File not found: {tool_input.get('path', 'unknown')}"
    except PermissionError:
        return f"Error: Permission denied: {tool_input.get('path', 'unknown')}"
    except Exception as e:
        # Log the full traceback internally, return a safe message to Claude
        print(f"Tool error: {traceback.format_exc()}")
        return f"Error: Tool execution failed — {type(e).__name__}: {str(e)}"


def run_agent_with_tools(user_message: str, tools: list) -> str:
    """Run an agent loop with proper tool error handling."""
    messages = [{"role": "user", "content": user_message}]
    for turn in range(20):  # Max 20 turns to prevent loops
        response = with_retry(
            lambda: client.messages.create(
                model="claude-sonnet-4-5",
                max_tokens=4096,
                tools=tools,
                messages=messages,
            )
        )
        # Check if done
        if response.stop_reason == "end_turn":
            # Return the text response
            for block in response.content:
                if block.type == "text":
                    return block.text
            return "Task completed."
        # Process tool calls
        if response.stop_reason == "tool_use":
            messages.append({"role": "assistant", "content": response.content})
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    # Execute the tool
                    result = execute_tool(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                        # Mark as error if result starts with "Error:"
                        "is_error": result.startswith("Error:"),
                    })
            messages.append({"role": "user", "content": tool_results})
            continue
        # Unexpected stop reason
        print(f"Unexpected stop_reason: {response.stop_reason}")
        break
    return "Agent reached maximum turns without completing the task."
```
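For tools that fail transiently (network calls, database queries), it can be worth retrying the tool itself a couple of times before reporting the failure to Claude. A hedged sketch under the same `execute_tool(name, input) -> str` contract used above (the wrapper name is mine):

```python
import time


def execute_tool_with_retry(execute, tool_name: str, tool_input: dict,
                            max_attempts: int = 3, delay: float = 0.5) -> str:
    """Retry a tool executor while it reports an error, then give up.

    `execute` is any callable matching execute_tool(name, input) -> str,
    where failures are strings starting with "Error:".
    """
    last_result = ""
    for attempt in range(max_attempts):
        last_result = execute(tool_name, tool_input)
        if not last_result.startswith("Error:"):
            return last_result
        if attempt < max_attempts - 1:
            time.sleep(delay)
    # Surface the final error so Claude can reason about alternatives.
    return f"{last_result} (after {max_attempts} attempts)"
```

Only wrap tools whose failures are plausibly transient; retrying a "file not found" three times just wastes latency.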
## Preventing Infinite Agent Loops
Without turn limits, agents can loop forever on unsolvable tasks.
```python
class AgentLoopGuard:
    """Detects and prevents infinite agent loops."""

    def __init__(self, max_turns: int = 20, max_identical_tool_calls: int = 3):
        self.max_turns = max_turns
        self.max_identical = max_identical_tool_calls
        self.turn_count = 0
        self.tool_call_history: list[tuple[str, str]] = []

    def check(self, tool_name: str | None = None, tool_input: dict | None = None):
        if tool_name is None:
            # Only plain per-turn checks count toward the turn limit;
            # per-tool-call checks must not inflate the count.
            self.turn_count += 1
            if self.turn_count > self.max_turns:
                raise RuntimeError(f"Agent exceeded {self.max_turns} turns — likely stuck in a loop")
        if tool_name and tool_input:
            # Detect repeated identical tool calls
            call_signature = (tool_name, str(sorted(tool_input.items())))
            self.tool_call_history.append(call_signature)
            # Count identical calls among the last 10
            recent_calls = self.tool_call_history[-10:]
            identical_count = recent_calls.count(call_signature)
            if identical_count >= self.max_identical:
                raise RuntimeError(
                    f"Agent called {tool_name} with identical inputs {identical_count} times — "
                    f"stuck in loop. Last input: {tool_input}"
                )


# Usage in agent loop
guard = AgentLoopGuard(max_turns=15, max_identical_tool_calls=2)
while True:
    guard.check()  # Raises if we've looped too long
    response = client.messages.create(...)
    if response.stop_reason == "end_turn":
        break
    for block in response.content:
        if block.type == "tool_use":
            guard.check(block.name, block.input)  # Check for tool loop
```
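One caveat with the signature scheme: `str(sorted(tool_input.items()))` only normalizes the top level, so nested dicts whose keys arrive in a different order produce different signatures for the same logical call. A hedged alternative (the helper name is mine) uses canonical JSON:

```python
import json


def call_signature(tool_name: str, tool_input: dict) -> tuple[str, str]:
    """Canonical, order-insensitive signature for a tool call.

    json.dumps with sort_keys=True sorts keys at every nesting level,
    so logically identical inputs always produce the same signature.
    """
    return (tool_name, json.dumps(tool_input, sort_keys=True, default=str))


a = call_signature("run_query", {"opts": {"limit": 10, "offset": 0}, "sql": "SELECT 1"})
b = call_signature("run_query", {"sql": "SELECT 1", "opts": {"offset": 0, "limit": 10}})
assert a == b  # same call, different key order
```

`default=str` keeps the signature from crashing on non-JSON values (paths, dates) at the cost of some precision, which is fine for loop detection.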
## TypeScript Error Handling
```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseDelay = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (error instanceof Anthropic.RateLimitError) {
        if (attempt === maxRetries) throw error;
        const delay = baseDelay * Math.pow(2, attempt) + Math.random() * 1000;
        console.log(`Rate limited. Waiting ${(delay / 1000).toFixed(1)}s...`);
        await new Promise((r) => setTimeout(r, delay));
      } else if (error instanceof Anthropic.APIError) {
        if (error.status === 529) {
          if (attempt === maxRetries) throw error;
          const delay = baseDelay * Math.pow(2, attempt);
          await new Promise((r) => setTimeout(r, delay));
        } else if ([400, 401, 403].includes(error.status ?? 0)) {
          throw error; // Don't retry
        } else {
          if (attempt === maxRetries) throw error;
          await new Promise((r) => setTimeout(r, baseDelay));
        }
      } else if (error instanceof Anthropic.APIConnectionError) {
        if (attempt === maxRetries) throw error;
        await new Promise((r) => setTimeout(r, baseDelay));
      } else {
        throw error; // Unknown error — don't retry
      }
    }
  }
  throw new Error("Max retries exceeded");
}
```
## Circuit Breaker Pattern
For high-volume production agents, add a circuit breaker to stop hammering the API during outages:
```python
import time
from enum import Enum

import anthropic


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing — reject requests fast
    HALF_OPEN = "half_open"  # Testing if the service recovered


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED

    def call(self, fn):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker OPEN — API unavailable")
        try:
            result = fn()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                print("Circuit breaker CLOSED — API recovered")
            # Any success resets the consecutive-failure count
            self.failure_count = 0
            return result
        except (anthropic.RateLimitError, anthropic.APIStatusError):
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker OPEN after {self.failure_count} failures")
            raise


# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
try:
    response = breaker.call(lambda: client.messages.create(...))
except RuntimeError as e:
    if "Circuit breaker OPEN" in str(e):
        print("API is down — using fallback or queuing for later")
```
## Frequently Asked Questions
What is the most common Claude API error in production? Rate limit errors (429) are the most common. They occur when your request rate exceeds your account tier's RPM or TPM limits. The fix is exponential backoff with the Retry-After header — not immediate retry.
What's the difference between a 429 and a 529 error? 429 (Rate Limit) means you're sending too many requests too quickly — back off and retry after the specified wait. 529 (Overloaded) means the API server is temporarily at capacity — use the same backoff strategy.
Should I retry 400 errors? No. A 400 error means your request is malformed — the parameters are wrong. Retrying the same request returns the same error. Fix the code that generates the request.
How do I handle a tool that keeps failing? After N tool failures, provide Claude with a clear error message and let it decide to stop or try a different approach. Don't silently swallow tool errors — Claude needs to know a tool failed to reason about alternatives.
What's a safe max_turns value for a production agent? 10-20 turns for most tasks. Simple tasks (data lookup, summarization) need 3-5. Complex multi-step tasks need 10-15. Set a hard limit of 20 and log any session that hits it — those are bugs or prompts that need improvement.
## Related Guides
- Claude Agent SDK: Build Automation Agents — Full agent SDK guide
- Claude API Rate Limits: Handle Them Gracefully — Rate limit deep dive
- Token Counting: Why Your Estimates Are Wrong — Cost management
## Go Deeper
Agent SDK Cookbook — $49 — Production-ready error handling templates, circuit breaker implementations, batch processing with rate limiting, and retry strategy patterns for Python and TypeScript agents.
→ Get the Agent SDK Cookbook — $49
30-day money-back guarantee. Instant download.