How to Handle Errors and Retries in Claude Agent SDK

Production error handling for Claude Agent SDK — rate limit errors, network failures, tool call failures, retry strategies with exponential backoff.

Production Claude agents fail in predictable ways: rate limit errors (429), overload errors (529), network timeouts, tool call failures, and infinite loops. Each requires a different recovery strategy, and the difference between a production-grade agent and a fragile prototype is handling every one of them correctly. This guide covers each error type, the right retry strategy for it, and the circuit breaker pattern that prevents cascading failures.


The Error Taxonomy

Claude Agent SDK errors fall into seven categories:

Category         HTTP Status  Cause                    Retry?
---------------  -----------  -----------------------  --------------------
Rate limit       429          Too many requests        Yes, with backoff
Overloaded       529          API server busy          Yes, with backoff
Auth error       401          Bad API key              No — fix the key
Invalid request  400          Bad parameters           No — fix the code
Network failure  No status    Connection dropped       Yes, immediately
Tool failure     N/A          Your tool code crashed   Depends
Agent loop       N/A          Agent running forever    Kill after max turns

Base Error Handling Setup

Start with this error handling wrapper before building anything else:

import anthropic
import time
import random
from typing import Callable, TypeVar

client = anthropic.Anthropic()
T = TypeVar("T")


def with_retry(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """
    Execute fn with exponential backoff retry.
    Retries on rate limits (429) and overload (529).
    Raises immediately on auth errors (401) and bad requests (400).
    """
    for attempt in range(max_retries + 1):
        try:
            return fn()

        except anthropic.RateLimitError as e:
            if attempt == max_retries:
                raise
            # Respect the Retry-After header if the response included one
            retry_after = None
            if e.response is not None:
                header = e.response.headers.get("retry-after")
                if header:
                    retry_after = float(header)
            delay = retry_after or (base_delay * (2 ** attempt) + random.uniform(0, 1))
            delay = min(delay, max_delay)
            print(f"Rate limit hit. Waiting {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)

        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # Overloaded
                if attempt == max_retries:
                    raise
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                delay = min(delay, max_delay)
                print(f"API overloaded. Waiting {delay:.1f}s")
                time.sleep(delay)
            elif e.status_code in (400, 401, 403):
                raise  # Don't retry — fix the code or credentials
            else:
                if attempt == max_retries:
                    raise
                time.sleep(base_delay)

        except anthropic.APIConnectionError:
            if attempt == max_retries:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Network error. Retrying in {delay:.1f}s")
            time.sleep(delay)

    raise RuntimeError("with_retry: retries exhausted")  # unreachable; keeps type checkers happy

# Usage
response = with_retry(
    lambda: client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
)

Handling Rate Limits (429)

Rate limits are the most common production error. The Anthropic API enforces per-minute limits on requests (RPM) and on input and output tokens, and the limits vary by usage tier; exceeding any of them returns a 429.

Reading the Retry-After header

When you hit a rate limit, the API returns a Retry-After header with the exact wait time. Always use it:

import anthropic
import time

client = anthropic.Anthropic()


def create_with_rate_limit_handling(messages: list, **kwargs):
    max_retries = 5
    
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model="claude-sonnet-4-5",
                max_tokens=1024,
                messages=messages,
                **kwargs
            )
        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            
            # Get wait time from response headers
            # anthropic-sdk exposes this via the error response
            wait_time = 60  # Default fallback
            if hasattr(e, "response") and e.response is not None:
                retry_after = e.response.headers.get("retry-after")
                if retry_after:
                    wait_time = int(retry_after) + 1  # Add 1s buffer
            
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
    
    raise RuntimeError("Max retries exceeded")

Proactive rate limit prevention

For batch processing, throttle requests to stay under limits:

import time


class RateLimiter:
    """Token bucket rate limiter for Claude API."""
    
    def __init__(self, requests_per_minute: int = 50):
        self.rpm = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.last_request_time = 0.0
    
    def wait(self):
        """Block until it's safe to make the next request."""
        now = time.time()
        elapsed = now - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()


# Usage in batch processing
limiter = RateLimiter(requests_per_minute=40)  # Stay under the 50 RPM limit

for item in batch:
    limiter.wait()
    response = client.messages.create(...)

Tool Call Error Handling

When Claude calls a tool and the tool fails, you control the recovery:

import anthropic
import traceback

client = anthropic.Anthropic()


def execute_tool(tool_name: str, tool_input: dict) -> str:
    """Execute a tool and return result or error description."""
    try:
        if tool_name == "read_file":
            with open(tool_input["path"], "r") as f:
                return f.read()
        elif tool_name == "run_query":
            return run_db_query(tool_input["sql"])  # your own DB helper (not defined here)
        else:
            return f"Error: Unknown tool '{tool_name}'"
    except FileNotFoundError:
        return f"Error: File not found: {tool_input.get('path', 'unknown')}"
    except PermissionError:
        return f"Error: Permission denied: {tool_input.get('path', 'unknown')}"
    except Exception as e:
        # Log the full traceback internally, return safe message to Claude
        print(f"Tool error: {traceback.format_exc()}")
        return f"Error: Tool execution failed — {type(e).__name__}: {str(e)}"


def run_agent_with_tools(user_message: str, tools: list) -> str:
    """Run an agent loop with proper tool error handling."""
    messages = [{"role": "user", "content": user_message}]
    
    for turn in range(20):  # Max 20 turns to prevent loops
        response = with_retry(
            lambda: client.messages.create(
                model="claude-sonnet-4-5",
                max_tokens=4096,
                tools=tools,
                messages=messages,
            )
        )
        
        # Check if done
        if response.stop_reason == "end_turn":
            # Return the text response
            for block in response.content:
                if block.type == "text":
                    return block.text
            return "Task completed."
        
        # Process tool calls
        if response.stop_reason == "tool_use":
            messages.append({"role": "assistant", "content": response.content})
            
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    # Execute the tool
                    result = execute_tool(block.name, block.input)
                    
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                        # Mark as error if result starts with "Error:"
                        "is_error": result.startswith("Error:"),
                    })
            
            messages.append({"role": "user", "content": tool_results})
            continue
        
        # Unexpected stop reason
        print(f"Unexpected stop_reason: {response.stop_reason}")
        break
    
    return "Agent reached maximum turns without completing the task."

Preventing Infinite Agent Loops

Without turn limits, agents can loop forever on unsolvable tasks.

class AgentLoopGuard:
    """Detects and prevents infinite agent loops."""
    
    def __init__(self, max_turns: int = 20, max_identical_tool_calls: int = 3):
        self.max_turns = max_turns
        self.max_identical = max_identical_tool_calls
        self.turn_count = 0
        self.tool_call_history: list[tuple[str, str]] = []
    
    def check(self, tool_name: str | None = None, tool_input: dict | None = None):
        # Only model turns (checks without tool info) advance the turn count,
        # so several tool calls in one turn aren't double-counted.
        if tool_name is None:
            self.turn_count += 1
        
        if self.turn_count > self.max_turns:
            raise RuntimeError(f"Agent exceeded {self.max_turns} turns — likely stuck in a loop")
        
        if tool_name and tool_input:
            # Detect repeated identical tool calls
            call_signature = (tool_name, str(sorted(tool_input.items())))
            self.tool_call_history.append(call_signature)
            
            # Count recent identical calls
            recent_calls = self.tool_call_history[-10:]
            identical_count = recent_calls.count(call_signature)
            
            if identical_count >= self.max_identical:
                raise RuntimeError(
                    f"Agent called {tool_name} with identical inputs {identical_count} times — "
                    f"stuck in loop. Last input: {tool_input}"
                )


# Usage in agent loop
guard = AgentLoopGuard(max_turns=15, max_identical_tool_calls=2)

while True:
    guard.check()  # Raises if we've looped too long
    
    response = client.messages.create(...)
    
    if response.stop_reason == "end_turn":
        break
    
    for block in response.content:
        if block.type == "tool_use":
            guard.check(block.name, block.input)  # Check for tool loop

TypeScript Error Handling

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseDelay = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (error instanceof Anthropic.RateLimitError) {
        if (attempt === maxRetries) throw error;
        const delay = baseDelay * Math.pow(2, attempt) + Math.random() * 1000;
        console.log(`Rate limited. Waiting ${(delay / 1000).toFixed(1)}s...`);
        await new Promise((r) => setTimeout(r, delay));
      } else if (error instanceof Anthropic.APIConnectionError) {
        // Check connection errors before the APIError base class —
        // APIConnectionError extends it, so order matters.
        if (attempt === maxRetries) throw error;
        await new Promise((r) => setTimeout(r, baseDelay));
      } else if (error instanceof Anthropic.APIError) {
        if (error.status === 529) {
          if (attempt === maxRetries) throw error;
          const delay = baseDelay * Math.pow(2, attempt);
          await new Promise((r) => setTimeout(r, delay));
        } else if (error.status && [400, 401, 403].includes(error.status)) {
          throw error; // Don't retry
        } else {
          if (attempt === maxRetries) throw error;
          await new Promise((r) => setTimeout(r, baseDelay));
        }
      } else {
        throw error; // Unknown error — don't retry
      }
    }
  }
  throw new Error("Max retries exceeded");
}

Circuit Breaker Pattern

For high-volume production agents, add a circuit breaker to stop hammering the API during outages:

import anthropic
from enum import Enum
import time


class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing — reject requests fast
    HALF_OPEN = "half_open" # Testing if service recovered


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED
    
    def call(self, fn):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker OPEN — API unavailable")
        
        try:
            result = fn()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                print("Circuit breaker CLOSED — API recovered")
            return result
        
        except (anthropic.RateLimitError, anthropic.APIStatusError) as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker OPEN after {self.failure_count} failures")
            raise


# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

try:
    response = breaker.call(lambda: client.messages.create(...))
except RuntimeError as e:
    if "Circuit breaker OPEN" in str(e):
        print("API is down — using fallback or queuing for later")

Frequently Asked Questions

What is the most common Claude API error in production? Rate limit errors (429) are the most common. They occur when your request rate exceeds your account tier's RPM or TPM limits. The fix is exponential backoff with the Retry-After header — not immediate retry.

What's the difference between a 429 and a 529 error? 429 (Rate Limit) means you're sending too many requests too quickly — back off and retry after the specified wait. 529 (Overloaded) means the API server is temporarily at capacity — use the same backoff strategy.
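
The shared backoff schedule both answers refer to can be written as a pure function. A sketch using the same formula as the wrappers above (the function name is our own):

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Jittered exponential backoff: base * 2^attempt plus up to 1s of
    random jitter, capped so late attempts don't wait unboundedly."""
    return min(base * (2 ** attempt) + random.uniform(0, 1), cap)


# First attempts land roughly at 1-2s, 2-3s, 4-5s, 8-9s, then hit the cap.
```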

Should I retry 400 errors? No. A 400 error means your request is malformed — the parameters are wrong. Retrying the same request returns the same error. Fix the code that generates the request.

How do I handle a tool that keeps failing? After N tool failures, provide Claude with a clear error message and let it decide to stop or try a different approach. Don't silently swallow tool errors — Claude needs to know a tool failed to reason about alternatives.
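
One way to implement that "after N failures" cutoff is a small per-tool counter. A sketch (`ToolFailureTracker` and its thresholds are our own invention, not SDK API):

```python
class ToolFailureTracker:
    """Track consecutive failures per tool and signal when to disable one."""

    def __init__(self, max_failures: int = 3):
        self.max_failures = max_failures
        self.counts: dict[str, int] = {}

    def record(self, tool_name: str, succeeded: bool) -> None:
        # A success resets the streak; a failure extends it.
        self.counts[tool_name] = 0 if succeeded else self.counts.get(tool_name, 0) + 1

    def should_disable(self, tool_name: str) -> bool:
        return self.counts.get(tool_name, 0) >= self.max_failures
```

In the agent loop, call record after each tool result; once should_disable returns True, return a tool_result telling Claude the tool is unavailable so it can plan around it instead of retrying forever.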

What's a safe max_turns value for a production agent? 10-20 turns for most tasks. Simple tasks (data lookup, summarization) need 3-5. Complex multi-step tasks need 10-15. Set a hard limit of 20 and log any session that hits it — those are bugs or prompts that need improvement.
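
Those ranges can be encoded as a simple lookup with a hard ceiling. A sketch (the bucket names and values are our own, taken from the answer above):

```python
# Turn budgets per task complexity, with the hard ceiling from the FAQ.
TURN_BUDGETS = {"simple": 5, "standard": 10, "complex": 15}
HARD_LIMIT = 20


def max_turns_for(task_type: str) -> int:
    """Pick a turn budget for a task type, never exceeding the hard limit."""
    return min(TURN_BUDGETS.get(task_type, TURN_BUDGETS["standard"]), HARD_LIMIT)
```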


Go Deeper

Agent SDK Cookbook — $49 — Production-ready error handling templates, circuit breaker implementations, batch processing with rate limiting, and retry strategy patterns for Python and TypeScript agents.

→ Get the Agent SDK Cookbook — $49

30-day money-back guarantee. Instant download.

AI Disclosure: Written with Claude Code; error patterns verified against Anthropic API.
