Anthropic Message Batches API: 50% Cost Reduction for Bulk Processing
The Anthropic Message Batches API processes large volumes of requests asynchronously at 50% of standard pricing. Instead of sending requests one by one and paying full price, you batch up to 10,000 requests, submit them together, and retrieve results within 24 hours (typically 1–4 hours). The trade-off is latency: you cannot use batches for real-time user interactions. Use batches for document processing, data enrichment, content generation at scale, and any task where you can tolerate multi-hour turnaround.
When to use the Batches API
Use batches when:
- You're processing a large dataset offline (document analysis, data extraction)
- The task can tolerate hours of delay (not user-facing)
- Cost reduction matters more than immediate results
- You need to process 100+ similar requests
Use real-time API when:
- Users are waiting for results
- Latency under 30 seconds is required
- Request count is under 50
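If you want to make this decision programmatically, the rules above are easy to encode as a pre-flight check. A minimal sketch (the thresholds mirror the lists above; `should_use_batch` is an illustrative helper, not part of the SDK):

```python
def should_use_batch(request_count: int, user_facing: bool, max_latency_seconds: float) -> bool:
    """Rough heuristic for choosing the Batches API over real-time calls."""
    if user_facing or max_latency_seconds < 3600:
        # Someone is waiting, or the job needs sub-hour turnaround: use real-time
        return False
    # Offline and tolerant of delay: batch once the volume justifies it
    return request_count >= 100
```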
Cost comparison (Sonnet 4 as of April 2026):
| | Standard | Batch |
|---|---|---|
| Input | $3/M tokens | $1.50/M tokens |
| Output | $15/M tokens | $7.50/M tokens |
| Latency | 1–30 seconds | 1–24 hours |
Savings scale linearly with volume: at 1 million input and 1 million output tokens per day, standard pricing costs $18/day and batch pricing $9/day, a saving of roughly $270/month.
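To estimate savings for your own workload, apply the table above directly. A quick sketch (rates hard-coded from the table; `batch_savings_per_month` is an illustrative helper):

```python
STANDARD = {"input": 3.00, "output": 15.00}  # $ per million tokens
BATCH = {"input": 1.50, "output": 7.50}      # 50% of standard

def batch_savings_per_month(input_mtok_per_day: float, output_mtok_per_day: float) -> float:
    """Monthly dollar savings from moving a daily workload to the Batches API."""
    daily_standard = input_mtok_per_day * STANDARD["input"] + output_mtok_per_day * STANDARD["output"]
    daily_batch = input_mtok_per_day * BATCH["input"] + output_mtok_per_day * BATCH["output"]
    return (daily_standard - daily_batch) * 30

print(batch_savings_per_month(1.0, 1.0))  # 270.0 -> ~$270/month
```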
Creating a batch
```python
import anthropic
import json

client = anthropic.Anthropic()

# Prepare your requests (up to 10,000 per batch).
# `documents` is your own list of input texts.
requests_data = [
    {
        "custom_id": f"extract-{i}",  # Your unique ID for tracking
        "params": {
            "model": "claude-sonnet-4-5",
            "max_tokens": 1024,
            "system": "Extract the key facts from this text as a JSON object.",
            "messages": [
                {"role": "user", "content": f"Extract from: {document}"}
            ],
        },
    }
    for i, document in enumerate(documents)
]

# Create the batch
batch = client.messages.batches.create(requests=requests_data)

print(f"Batch created: {batch.id}")
print(f"Status: {batch.processing_status}")
# Output: "in_progress"
```
The `custom_id` is your identifier for each request. Use it to match results to inputs. It must be unique within the batch and at most 64 characters.
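Since every `custom_id` in a batch must be unique, a quick pre-flight check saves a failed submission. A minimal sketch over the `requests_data` list built above (`validate_custom_ids` is an illustrative helper):

```python
def validate_custom_ids(requests_data: list[dict]) -> None:
    """Fail fast on duplicate or over-long custom_ids before submitting."""
    seen = set()
    for req in requests_data:
        cid = req["custom_id"]
        if len(cid) > 64:
            raise ValueError(f"custom_id exceeds 64 characters: {cid!r}")
        if cid in seen:
            raise ValueError(f"duplicate custom_id: {cid!r}")
        seen.add(cid)

validate_custom_ids(requests_data)
```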
Monitoring batch status
```python
import time

def wait_for_batch(batch_id: str, poll_interval: int = 60):
    """
    Poll batch status until complete. Returns the completed batch.
    """
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        print(f"Status: {batch.processing_status} | "
              f"Processing: {batch.request_counts.processing} | "
              f"Complete: {batch.request_counts.succeeded + batch.request_counts.errored}")
        if batch.processing_status == "ended":
            return batch
        time.sleep(poll_interval)

# Usage
batch = wait_for_batch(batch.id)
print(f"Batch complete. Results at: {batch.results_url}")
```
Status values:
- `in_progress` — processing (check back later)
- `canceling` — cancel in progress
- `ended` — complete (results ready)
Retrieving results
```python
from typing import Any

def process_batch_results(batch_id: str) -> dict[str, Any]:
    """
    Retrieve and parse batch results.
    Returns dict mapping custom_id → extracted result.
    """
    results = {}
    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
        if result.result.type == "succeeded":
            message = result.result.message
            text = message.content[0].text
            # Parse JSON if your task returns structured output
            try:
                results[custom_id] = json.loads(text)
            except json.JSONDecodeError:
                results[custom_id] = {"text": text}
        elif result.result.type == "errored":
            results[custom_id] = {
                "error": result.result.error.error.type,
                "message": result.result.error.error.message,
            }
        elif result.result.type == "expired":
            # Request expired (24-hour limit exceeded)
            results[custom_id] = {"error": "expired"}
        elif result.result.type == "canceled":
            # Request was still in flight when the batch was cancelled
            results[custom_id] = {"error": "canceled"}
    return results

# Get results
results = process_batch_results(batch.id)
print(f"Succeeded: {sum(1 for r in results.values() if 'error' not in r)}")
print(f"Failed: {sum(1 for r in results.values() if 'error' in r)}")
```
Complete batch pipeline pattern
For production batch processing:
```python
import anthropic
import json
import time
from pathlib import Path
from typing import Any, Callable


class BatchProcessor:
    def __init__(self, client: anthropic.Anthropic):
        self.client = client

    def process(
        self,
        items: list[dict],
        system_prompt: str,
        message_fn: Callable[[dict], str],
        model: str = "claude-sonnet-4-5",
        max_tokens: int = 1024,
        output_path: Path | None = None,
    ) -> dict[str, Any]:
        """
        Process a list of items using the Batches API.

        Args:
            items: list of {"id": str, "data": any}
            system_prompt: system prompt for all requests
            message_fn: function(item) -> str (the user message)
            output_path: optional path to save results JSON
        """
        # Build batch requests
        requests = [
            {
                "custom_id": item["id"],
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": message_fn(item)}],
                },
            }
            for item in items
        ]

        # Submit batch (max 10,000 per batch)
        if len(requests) > 10_000:
            raise ValueError(
                f"Batch size {len(requests)} exceeds 10,000 limit. "
                "Split into multiple batches."
            )
        batch = self.client.messages.batches.create(requests=requests)
        print(f"Submitted batch {batch.id} with {len(requests)} requests")

        # Wait for completion
        while True:
            batch = self.client.messages.batches.retrieve(batch.id)
            if batch.processing_status == "ended":
                break
            print(f"Waiting... {batch.request_counts.processing} remaining")
            time.sleep(60)

        # Collect results
        results = {}
        for result in self.client.messages.batches.results(batch.id):
            if result.result.type == "succeeded":
                results[result.custom_id] = result.result.message.content[0].text
            else:
                results[result.custom_id] = None

        # Save if requested
        if output_path:
            output_path.write_text(json.dumps(results, indent=2))

        return results


# Usage
processor = BatchProcessor(client)
documents = [
    {"id": f"doc-{i}", "data": doc_text}
    for i, doc_text in enumerate(document_list)
]
results = processor.process(
    items=documents,
    system_prompt="Extract the key entities (people, companies, dates) as JSON.",
    message_fn=lambda item: f"Extract from:\n\n{item['data']}",
    output_path=Path("extraction_results.json"),
)
print(f"Processed {len(results)} documents")
```
Batch size and splitting
Maximum batch size is 10,000 requests. For larger datasets:
```python
def chunked(lst: list, size: int):
    """Split list into chunks of given size."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

def process_large_dataset(
    items: list,
    system_prompt: str,
    message_fn,
    batch_size: int = 5_000,
) -> dict:
    """Process a large dataset by splitting into multiple batches."""
    all_results = {}
    for chunk_idx, chunk in enumerate(chunked(items, batch_size)):
        print(f"Submitting batch {chunk_idx + 1} ({len(chunk)} items)")
        batch_results = processor.process(chunk, system_prompt, message_fn)
        all_results.update(batch_results)
    return all_results
```
Cancelling a batch
```python
# Cancel if you no longer need the results (you're charged for completed requests)
client.messages.batches.cancel(batch_id)
```
Cancellation is asynchronous: requests already in flight when you cancel may still complete, and completed requests are charged.
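Because cancellation is asynchronous, you may want to poll until the batch ends and see how many requests were actually cut short. A sketch reusing the same `retrieve` call as above (`cancel_and_confirm` is an illustrative helper):

```python
def cancel_and_confirm(batch_id: str, poll_interval: int = 30):
    """Cancel a batch, wait for it to end, and report the final counts."""
    client.messages.batches.cancel(batch_id)
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            break
        time.sleep(poll_interval)
    counts = batch.request_counts
    print(f"succeeded={counts.succeeded} errored={counts.errored} canceled={counts.canceled}")
    return batch
```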
Frequently asked questions
How long does the Batches API take? Anthropic documents up to 24 hours. In practice, batches typically complete in 1–4 hours for standard sizes. Very large batches (10,000 requests) may take longer.
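If your pipeline has its own deadline, you can bound the polling loop instead of waiting out the full window. An illustrative variant of the earlier `wait_for_batch`:

```python
def wait_for_batch_with_deadline(batch_id: str, timeout_s: float = 6 * 3600, poll_interval: int = 60):
    """Poll until the batch ends or the deadline passes; returns the batch either way."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        batch = client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        time.sleep(poll_interval)
    # Deadline hit while still in progress; the caller decides whether to cancel
    return client.messages.batches.retrieve(batch_id)
```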
Are there any request types that don't support batches? Tool use (function calling) is supported in batches. Streaming is not — batches are always non-streaming. Computer use is not currently supported in batches.
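A batch request with tools looks the same as a real-time one, with the `tools` list inside `params`. A sketch of a single request entry (the `get_weather` tool is purely illustrative):

```python
tool_request = {
    "custom_id": "weather-1",
    "params": {
        "model": "claude-sonnet-4-5",
        "max_tokens": 1024,
        "tools": [{
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "input_schema": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        }],
        "messages": [{"role": "user", "content": "What's the weather in Paris?"}],
    },
}
# Submitted like any other batch entry:
# client.messages.batches.create(requests=[tool_request])
```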
What happens to requests that expire? Individual requests within a batch can expire if the 24-hour limit is reached. The batch itself will have `processing_status: "ended"`, but individual results will show `type: "expired"`. Retry expired requests in a new batch, as in the sketch below.
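The retry itself is mechanical: collect the expired custom_ids, match them back to your original inputs, and submit a fresh batch. A sketch assuming the original request list is still at hand (`resubmit_expired` is an illustrative helper):

```python
def resubmit_expired(batch_id: str, original_requests: list[dict]):
    """Submit a new batch containing only the requests that expired."""
    expired_ids = {
        r.custom_id
        for r in client.messages.batches.results(batch_id)
        if r.result.type == "expired"
    }
    if not expired_ids:
        return None
    retry = [req for req in original_requests if req["custom_id"] in expired_ids]
    return client.messages.batches.create(requests=retry)
```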
Can I mix different models in one batch? Yes. Each request in the batch can specify a different model. You could have some requests using Haiku and others using Sonnet in the same batch.
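For example, routing light classification to Haiku and heavier extraction to Sonnet in a single submission (model names current as of this writing):

```python
mixed_requests = [
    {"custom_id": "classify-1",
     "params": {"model": "claude-haiku-4-5", "max_tokens": 64,
                "messages": [{"role": "user", "content": "Classify this ticket: 'refund request'"}]}},
    {"custom_id": "extract-1",
     "params": {"model": "claude-sonnet-4-5", "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Extract entities from: ..."}]}},
]
batch = client.messages.batches.create(requests=mixed_requests)
```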
Is the 50% discount applied automatically? Yes. When you use the Batches API, batch pricing applies automatically. You don't need to enable or request it separately.
Related guides
- Claude API Cost Optimisation: Practical Guide — full cost reduction playbook including batches
- Claude Model Routing: When to Use Haiku, Sonnet, or Opus — combine routing with batches for maximum savings
Take It Further
Claude API Cost Optimization Toolkit — The complete cost reduction system: batch API implementation patterns, model routing for batch workloads, the hybrid real-time/batch architecture, and the cost calculator that shows exactly how much you save.
→ Get the Cost Optimization Toolkit — $59
30-day money-back guarantee. Instant download.