
Anthropic Message Batches API: 50% Cost Reduction for Bulk Processing


The Anthropic Message Batches API processes large volumes of requests asynchronously at 50% of standard pricing. Instead of sending requests one by one and paying full price, you batch up to 10,000 requests, submit them together, and retrieve results within 24 hours (typically 1–4 hours). The trade-off is latency: you cannot use batches for real-time user interactions. Use batches for document processing, data enrichment, content generation at scale, and any task where you can tolerate multi-hour turnaround.


When to use the Batches API

Use batches when:

- You are processing documents in bulk, enriching data, or generating content at scale
- The task can tolerate a multi-hour turnaround
- You want the 50% discount on high-volume workloads

Use the real-time API when:

- A user is waiting on the response (chat, interactive features)
- You need results in seconds, or you need streaming output
Cost comparison (Sonnet 4 as of April 2026):

          Standard          Batch
Input     $3/M tokens       $1.50/M tokens
Output    $15/M tokens      $7.50/M tokens
Latency   1–30 seconds      1–24 hours

At 1 million input tokens and 1 million output tokens per day, batches save about $270/month ($1.50 + $7.50 saved per day, times 30 days).
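To estimate the discount for your own workload, the table above reduces to a few lines of arithmetic. A sketch with the Sonnet 4 rates hard-coded (swap in your model's prices; `monthly_batch_savings` is a hypothetical helper, not part of the SDK):

```python
def monthly_batch_savings(input_tokens_per_day: float, output_tokens_per_day: float) -> float:
    """Estimate monthly savings from batch pricing at Sonnet 4 rates ($/M tokens)."""
    input_discount = 3.00 - 1.50     # saved per million input tokens
    output_discount = 15.00 - 7.50   # saved per million output tokens
    daily_savings = (
        input_tokens_per_day / 1e6 * input_discount
        + output_tokens_per_day / 1e6 * output_discount
    )
    return daily_savings * 30  # approximate month

# Example: 10M input + 2M output tokens per day
print(monthly_batch_savings(10e6, 2e6))  # → 900.0
```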


Creating a batch

import anthropic
import json

client = anthropic.Anthropic()

# Prepare your requests (up to 10,000 per batch)
requests_data = [
    {
        "custom_id": f"extract-{i}",  # Your unique ID for tracking
        "params": {
            "model": "claude-sonnet-4-5",
            "max_tokens": 1024,
            "system": "Extract the key facts from this text as a JSON object.",
            "messages": [
                {"role": "user", "content": f"Extract from: {document}"}
            ]
        }
    }
    for i, document in enumerate(documents)
]

# Create the batch
batch = client.messages.batches.create(requests=requests_data)
print(f"Batch created: {batch.id}")
print(f"Status: {batch.processing_status}")
# Output: "in_progress"

The custom_id is your identifier for each request. Use it to match results to inputs, since results are not guaranteed to come back in submission order. It must be unique within the batch and at most 64 characters.
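Because a duplicate or over-long custom_id can get the whole batch rejected at creation time, a pre-flight check is cheap insurance. A minimal sketch (`validate_custom_ids` is a hypothetical helper, not part of the SDK):

```python
def validate_custom_ids(requests: list[dict]) -> None:
    """Raise ValueError on duplicate or over-long custom_ids before submitting."""
    seen: set[str] = set()
    for request in requests:
        custom_id = request["custom_id"]
        if len(custom_id) > 64:
            raise ValueError(f"custom_id exceeds 64 characters: {custom_id!r}")
        if custom_id in seen:
            raise ValueError(f"duplicate custom_id: {custom_id!r}")
        seen.add(custom_id)
```

Run it on requests_data right before calling client.messages.batches.create.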


Monitoring batch status

import time

def wait_for_batch(batch_id: str, poll_interval: int = 60) -> anthropic.types.MessageBatch:
    """
    Poll batch status until complete. Returns the completed batch.
    """
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        
        print(f"Status: {batch.processing_status} | "
              f"Processed: {batch.request_counts.processing} | "
              f"Complete: {batch.request_counts.succeeded + batch.request_counts.errored}")
        
        if batch.processing_status == "ended":
            return batch
        
        time.sleep(poll_interval)

# Usage
batch = wait_for_batch(batch.id)
print(f"Batch complete. Results at: {batch.results_url}")

Status values:

- in_progress — the batch is still being processed
- canceling — cancellation was requested; in-flight requests are winding down
- ended — every request has reached a terminal state (succeeded, errored, canceled, or expired)


Retrieving results

from typing import Any

def process_batch_results(batch_id: str) -> dict[str, Any]:
    """
    Retrieve and parse batch results.
    Returns dict mapping custom_id → extracted result.
    """
    results = {}
    
    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
        
        if result.result.type == "succeeded":
            message = result.result.message
            text = message.content[0].text
            
            # Parse JSON if your task returns structured output
            try:
                results[custom_id] = json.loads(text)
            except json.JSONDecodeError:
                results[custom_id] = {"text": text}
        
        elif result.result.type == "errored":
            results[custom_id] = {
                "error": result.result.error.error.type,
                "message": result.result.error.error.message,
            }
        
        elif result.result.type == "expired":
            # Request expired (24-hour limit exceeded)
            results[custom_id] = {"error": "expired"}
    
    return results

# Get results
results = process_batch_results(batch.id)
print(f"Succeeded: {sum(1 for r in results.values() if 'error' not in r)}")
print(f"Failed: {sum(1 for r in results.values() if 'error' in r)}")

Complete batch pipeline pattern

For production batch processing:

import anthropic
import json
import time
from pathlib import Path
from typing import Any, Callable

class BatchProcessor:
    def __init__(self, client: anthropic.Anthropic):
        self.client = client
    
    def process(
        self,
        items: list[dict],
        system_prompt: str,
        message_fn: Callable[[dict], str],
        model: str = "claude-sonnet-4-5",
        max_tokens: int = 1024,
        output_path: Path | None = None,
    ) -> dict[str, Any]:
        """
        Process a list of items using the Batches API.
        
        Args:
            items: list of {"id": str, "data": any}
            system_prompt: system prompt for all requests
            message_fn: function(item) -> str (the user message)
            output_path: optional path to save results JSON
        """
        # Build batch requests
        requests = [
            {
                "custom_id": item["id"],
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": message_fn(item)}],
                }
            }
            for item in items
        ]
        
        # Submit batch (max 10,000 per batch)
        if len(requests) > 10_000:
            raise ValueError(f"Batch size {len(requests)} exceeds 10,000 limit. Split into multiple batches.")
        
        batch = self.client.messages.batches.create(requests=requests)
        print(f"Submitted batch {batch.id} with {len(requests)} requests")
        
        # Wait for completion
        while True:
            batch = self.client.messages.batches.retrieve(batch.id)
            if batch.processing_status == "ended":
                break
            print(f"Waiting... {batch.request_counts.processing} remaining")
            time.sleep(60)
        
        # Collect results
        results = {}
        for result in self.client.messages.batches.results(batch.id):
            if result.result.type == "succeeded":
                results[result.custom_id] = result.result.message.content[0].text
            else:
                results[result.custom_id] = None
        
        # Save if requested
        if output_path:
            output_path.write_text(json.dumps(results, indent=2))
        
        return results


# Usage
processor = BatchProcessor(client)

documents = [
    {"id": f"doc-{i}", "data": doc_text}
    for i, doc_text in enumerate(document_list)
]

results = processor.process(
    items=documents,
    system_prompt="Extract the key entities (people, companies, dates) as JSON.",
    message_fn=lambda item: f"Extract from:\n\n{item['data']}",
    output_path=Path("extraction_results.json"),
)

print(f"Processed {len(results)} documents")

Batch size and splitting

Maximum batch size is 10,000 requests. For larger datasets:

def chunked(lst: list, size: int):
    """Split list into chunks of given size."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

def process_large_dataset(
    processor: BatchProcessor,
    items: list,
    system_prompt: str,
    message_fn,
    batch_size: int = 5_000,
) -> dict:
    """Process a large dataset by splitting it into multiple batches."""
    all_results = {}
    
    for chunk_idx, chunk in enumerate(chunked(items, batch_size)):
        print(f"Submitting batch {chunk_idx + 1} ({len(chunk)} items)")
        batch_results = processor.process(chunk, system_prompt, message_fn)
        all_results.update(batch_results)
    
    return all_results

Cancelling a batch

# Cancel if you no longer need the results (you're charged for completed requests)
client.messages.batches.cancel(batch_id)

Cancellation is async. In-progress requests at cancellation time will still be charged.
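After a cancel call, the batch moves through "canceling" to "ended", and any requests that never ran come back with result type "canceled". A sketch of cancelling and tallying the outcome (`cancel_and_tally` is a hypothetical helper):

```python
import time

def cancel_and_tally(client, batch_id: str, poll_interval: int = 10) -> dict[str, int]:
    """Cancel a batch, wait for it to reach 'ended', then count results by type."""
    client.messages.batches.cancel(batch_id)
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            break
        time.sleep(poll_interval)
    counts: dict[str, int] = {}
    for result in client.messages.batches.results(batch_id):
        counts[result.result.type] = counts.get(result.result.type, 0) + 1
    return counts  # e.g. {"succeeded": ..., "canceled": ...}
```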


Frequently asked questions

How long does the Batches API take? Anthropic documents up to 24 hours. In practice, batches typically complete in 1–4 hours for standard sizes. Very large batches (10,000 requests) may take longer.

Are there any request types that don't support batches? Tool use (function calling) is supported in batches. Streaming is not — batches are always non-streaming. Computer use is not currently supported in batches.

What happens to requests that expire? Individual requests within a batch can expire if the 24-hour limit is reached. The batch itself will have processing_status: "ended" but individual results will show type: "expired". Retry expired requests in a new batch.
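One way to handle retries is to keep the original request list, filter it down to the expired custom_ids, and resubmit. A sketch (`retry_expired` is a hypothetical helper; results is assumed to be shaped like the dict returned by process_batch_results above):

```python
def retry_expired(client, requests_data: list[dict], results: dict):
    """Resubmit only the requests whose results came back as expired."""
    expired_ids = {
        custom_id
        for custom_id, r in results.items()
        if isinstance(r, dict) and r.get("error") == "expired"
    }
    retry_requests = [r for r in requests_data if r["custom_id"] in expired_ids]
    if not retry_requests:
        return None  # nothing expired, no new batch needed
    return client.messages.batches.create(requests=retry_requests)
```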

Can I mix different models in one batch? Yes. Each request in the batch can specify a different model. You could have some requests using Haiku and others using Sonnet in the same batch.
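One use for this is routing cheap requests to a smaller model within the same batch. A sketch that picks a model by document length (the Haiku model ID is an assumption; check the current names in Anthropic's model list):

```python
def build_mixed_requests(documents: list[tuple[str, str]]) -> list[dict]:
    """Build batch requests, routing short docs to Haiku and long docs to Sonnet.

    documents: list of (doc_id, text) pairs.
    """
    requests = []
    for doc_id, text in documents:
        model = "claude-haiku-4-5" if len(text) < 2000 else "claude-sonnet-4-5"
        requests.append({
            "custom_id": doc_id,
            "params": {
                "model": model,
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize:\n\n{text}"}],
            },
        })
    return requests
```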

Is the 50% discount applied automatically? Yes. When you use the Batches API, batch pricing applies automatically. You don't need to enable or request it separately.


Take It Further

Claude API Cost Optimization Toolkit — The complete cost reduction system: batch API implementation patterns, model routing for batch workloads, the hybrid real-time/batch architecture, and the cost calculator that shows exactly how much you save.

→ Get the Cost Optimization Toolkit — $59

30-day money-back guarantee. Instant download.

AI Disclosure: Drafted with Claude Code; all Batches API patterns from Anthropic documentation as of April 2026.
