Claude Agent SDK for Data Pipelines: ETL, Validation, and Transformation Agents
The Claude Agent SDK fits data pipelines when the logic is too variable for rigid rules: schema drift, inconsistent source formats, validation that requires judgment, and transformation logic that adapts to data shape. This guide builds three pipeline agents: a schema validation agent that explains failures in plain English, an ETL orchestrator that routes records based on content, and a data quality agent that generates and runs its own checks.
When Claude Agents Make Sense in Data Pipelines
Use an agent when:
- Source schema changes unpredictably — the agent interprets what changed vs what broke
- Validation requires context — "is this address valid?" is different from "does this field match a regex?"
- Transformation logic needs judgment — merging records with conflicting fields
- You need readable failure reports — for non-engineers to act on
Don't use an agent when:
- Schema is stable and transforms are deterministic — use dbt, Airflow, pandas
- You need sub-second throughput — LLM calls add 0.5-2s per invocation
- Cost is a concern — at scale, LLM validation per row gets expensive fast
The sweet spot: batch validation and orchestration, not row-level transformation.
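That split can be sketched as a deterministic gate in front of the agent. Everything here (`needs_agent_review`, `route_batch`, the required-field set) is illustrative, not part of any SDK:

```python
# Batch-first pattern: run cheap deterministic checks on every record and
# only queue the exceptions for (expensive) agent review.

def needs_agent_review(record: dict) -> bool:
    """Cheap, deterministic gate — anything it can't clear goes to the agent."""
    required = {"id", "customer_id", "amount"}
    if not required.issubset(record):
        return True
    amount = record.get("amount")
    return not isinstance(amount, (int, float)) or amount < 0

def route_batch(records: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split a batch into (clean, exceptions); only exceptions cost LLM calls."""
    clean, exceptions = [], []
    for r in records:
        (exceptions if needs_agent_review(r) else clean).append(r)
    return clean, exceptions

batch = [
    {"id": "t1", "customer_id": "c1", "amount": 10.0},
    {"id": "t2", "customer_id": "c2", "amount": -5},
    {"id": "t3", "amount": 3.0},
]
clean, exceptions = route_batch(batch)
# t1 passes; t2 (negative amount) and t3 (missing customer_id) go to the agent
```

The gate stays dumb on purpose: anything it can't decide cheaply is an exception, and the agent handles only those.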
Setup
import anthropic
import json
from typing import Any
from dataclasses import dataclass
client = anthropic.Anthropic()
Agent 1: Schema Validation Agent
Validates incoming data against an expected schema, returns structured failures with plain-English explanations.
VALIDATION_TOOLS = [
    {
        "name": "validate_field",
        "description": "Validate a single field value against its schema definition",
        "input_schema": {
            "type": "object",
            "properties": {
                "field_name": {"type": "string"},
                "value": {},
                "expected_type": {"type": "string"},
                "constraints": {
                    "type": "object",
                    "description": "e.g., {min: 0, max: 100} or {enum: ['A', 'B']} or {pattern: '...'}"
                }
            },
            "required": ["field_name", "value", "expected_type"]
        }
    },
    {
        "name": "report_validation_result",
        "description": "Report the final validation result for the record",
        "input_schema": {
            "type": "object",
            "properties": {
                "is_valid": {"type": "boolean"},
                "errors": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "field": {"type": "string"},
                            "issue": {"type": "string"},
                            "action": {"type": "string", "description": "Recommended fix"}
                        }
                    }
                },
                "warnings": {
                    "type": "array",
                    "items": {"type": "string"}
                }
            },
            "required": ["is_valid", "errors"]
        }
    }
]
def execute_validation_tool(tool_name: str, tool_input: dict, record: dict) -> str:
    if tool_name == "validate_field":
        field = tool_input["field_name"]
        # Check the value from the record itself, not the model's echo,
        # so validation always runs against ground truth
        value = record.get(field)
        expected_type = tool_input["expected_type"]
        constraints = tool_input.get("constraints", {})
        issues = []
        # Type check — bool is a subclass of int in Python, so exclude it
        # explicitly; ints are accepted where a float is expected
        type_map = {"string": str, "integer": int, "float": (int, float), "boolean": bool}
        if expected_type in type_map and value is not None:
            if isinstance(value, bool) and expected_type != "boolean":
                issues.append(f"Expected {expected_type}, got bool")
            elif not isinstance(value, type_map[expected_type]):
                issues.append(f"Expected {expected_type}, got {type(value).__name__}")
        # Constraint checks
        if "min" in constraints and isinstance(value, (int, float)):
            if value < constraints["min"]:
                issues.append(f"Value {value} is below minimum {constraints['min']}")
        if "max" in constraints and isinstance(value, (int, float)):
            if value > constraints["max"]:
                issues.append(f"Value {value} exceeds maximum {constraints['max']}")
        if "enum" in constraints and value is not None and value not in constraints["enum"]:
            issues.append(f"Value '{value}' not in allowed values: {constraints['enum']}")
        if value is None and not constraints.get("nullable", False):
            issues.append("Required field is null/missing")
        return json.dumps({"field": field, "value": value, "issues": issues})
    elif tool_name == "report_validation_result":
        return json.dumps({"reported": True})
    return json.dumps({"error": f"Unknown tool: {tool_name}"})
@dataclass
class ValidationResult:
    is_valid: bool
    errors: list[dict]
    warnings: list[str]
def validate_record(record: dict, schema: dict) -> ValidationResult:
    """
    schema: {"field_name": {"type": "string", "constraints": {...}}, ...}
    """
    schema_str = json.dumps(schema, indent=2)
    record_str = json.dumps(record, indent=2)
    messages = [{
        "role": "user",
        "content": f"""Validate this data record against the schema.

Schema:
{schema_str}

Record:
{record_str}

Use validate_field for each field, then report_validation_result with a summary.
For errors, suggest what the data team should fix."""
    }]
    result = ValidationResult(is_valid=True, errors=[], warnings=[])
    while True:
        response = client.messages.create(
            model="claude-haiku-4-5",  # Fast/cheap for validation
            max_tokens=1024,
            tools=VALIDATION_TOOLS,
            messages=messages
        )
        if response.stop_reason == "end_turn":
            break
        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break
        tool_results = []
        for call in tool_calls:
            tool_output = execute_validation_tool(call.name, call.input, record)
            # Capture the final report
            if call.name == "report_validation_result":
                result.is_valid = call.input.get("is_valid", True)
                result.errors = call.input.get("errors", [])
                result.warnings = call.input.get("warnings", [])
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": tool_output
            })
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
    return result
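For whole batches, the single-record validator can be fanned out with a thread pool. The sketch below uses a stub in place of validate_record so it is self-contained; in the real pipeline you would pass validate_record itself (and keep `max_workers` low to respect API rate limits):

```python
from concurrent.futures import ThreadPoolExecutor

# Fan validation out over a batch. `validate` is any callable taking one
# record — here a deterministic stub standing in for validate_record.

def validate_batch(records: list[dict], validate, max_workers: int = 4) -> list:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with records
        return list(pool.map(validate, records))

def stub_validate(record: dict) -> bool:
    return record.get("amount", 0) >= 0

results = validate_batch([{"amount": 5}, {"amount": -1}], stub_validate)
print(results)  # [True, False]
```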
Agent 2: ETL Orchestrator
Routes and transforms records based on content — handles schema variations from multiple source systems.
ETL_TOOLS = [
    {
        "name": "classify_record",
        "description": "Classify a record by source system and data type",
        "input_schema": {
            "type": "object",
            "properties": {
                "source_system": {"type": "string"},
                "record_type": {"type": "string"},
                "confidence": {"type": "number"},
                "notes": {"type": "string"}
            },
            "required": ["source_system", "record_type", "confidence"]
        }
    },
    {
        "name": "transform_record",
        "description": "Apply transformation to normalize the record to the target schema",
        "input_schema": {
            "type": "object",
            "properties": {
                "transformed": {
                    "type": "object",
                    "description": "The normalized record in target schema format"
                },
                "dropped_fields": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Fields present in source but not in target schema"
                }
            },
            "required": ["transformed"]
        }
    },
    {
        "name": "flag_for_review",
        "description": "Flag a record for human review when automatic transformation is uncertain",
        "input_schema": {
            "type": "object",
            "properties": {
                "reason": {"type": "string"},
                "suggested_action": {"type": "string"}
            },
            "required": ["reason"]
        }
    }
]
TARGET_SCHEMA = """
Target schema (all fields):
- id: string (required)
- customer_id: string (required)
- amount: float (required, USD)
- currency: string (ISO 4217)
- created_at: ISO 8601 datetime
- status: enum[pending, completed, failed, refunded]
- metadata: object (optional, any shape)
"""
def process_record(raw_record: dict) -> dict:
    """Returns transformed record or {'flagged': True, 'reason': ...}"""
    messages = [{
        "role": "user",
        "content": f"""Process this raw data record into our target schema.

{TARGET_SCHEMA}

Raw record:
{json.dumps(raw_record, indent=2)}

Steps:
1. classify_record — identify source system and record type
2. transform_record — normalize to target schema (map field names, convert types, handle missing fields)
3. If you can't confidently transform a field, use flag_for_review instead

Common issues to handle:
- Unix timestamps → ISO 8601
- Cents (integer) → dollars (float)
- Various status strings → our enum values
- Missing customer_id → flag for review"""
    }]
    result = {}
    flagged = False
    flag_reason = ""
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=1024,
            tools=ETL_TOOLS,
            messages=messages
        )
        if response.stop_reason == "end_turn":
            break
        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break
        tool_results = []
        for call in tool_calls:
            if call.name == "transform_record":
                result = call.input.get("transformed", {})
            elif call.name == "flag_for_review":
                flagged = True
                flag_reason = call.input.get("reason", "")
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": json.dumps({"ok": True})
            })
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
    if flagged:
        return {"flagged": True, "reason": flag_reason, "original": raw_record}
    return result
Agent 3: Data Quality Agent
Generates and runs its own quality checks against a dataset — useful for profiling new data sources.
import pandas as pd
QUALITY_TOOLS = [
    {
        "name": "run_check",
        "description": "Run a data quality check on the dataset",
        "input_schema": {
            "type": "object",
            "properties": {
                "check_name": {"type": "string"},
                "check_type": {
                    "type": "string",
                    "enum": ["null_rate", "unique_rate", "value_distribution",
                             "range_check", "pattern_match", "referential_integrity"]
                },
                "column": {"type": "string"},
                "parameters": {"type": "object"}
            },
            "required": ["check_name", "check_type", "column"]
        }
    },
    {
        "name": "report_quality_summary",
        "description": "Report overall data quality findings",
        "input_schema": {
            "type": "object",
            "properties": {
                "overall_score": {"type": "number", "description": "0-100"},
                "critical_issues": {"type": "array", "items": {"type": "string"}},
                "recommendations": {"type": "array", "items": {"type": "string"}}
            },
            "required": ["overall_score", "critical_issues", "recommendations"]
        }
    }
]
def run_quality_check(check_name: str, check_type: str, column: str,
                      parameters: dict, df: pd.DataFrame) -> dict:
    if column not in df.columns:
        return {"error": f"Column '{column}' not found"}
    col = df[column]
    if check_type == "null_rate":
        null_pct = col.isnull().sum() / len(df) * 100
        return {"check": check_name, "null_rate_pct": round(null_pct, 2),
                "null_count": int(col.isnull().sum())}
    elif check_type == "unique_rate":
        unique_pct = col.nunique() / len(df) * 100
        return {"check": check_name, "unique_rate_pct": round(unique_pct, 2),
                "unique_count": int(col.nunique())}
    elif check_type == "value_distribution":
        top_values = col.value_counts().head(10).to_dict()
        return {"check": check_name, "top_values": {str(k): int(v) for k, v in top_values.items()}}
    elif check_type == "range_check":
        # is_numeric_dtype covers int32/float32 etc., not just int64/float64
        if pd.api.types.is_numeric_dtype(col):
            return {
                "check": check_name,
                "min": float(col.min()),
                "max": float(col.max()),
                "mean": float(col.mean()),
                "below_min": int((col < parameters.get("min", float("-inf"))).sum()),
                "above_max": int((col > parameters.get("max", float("inf"))).sum())
            }
        return {"error": "range_check requires numeric column"}
    return {"check": check_name, "result": "check type not implemented"}
def run_data_quality_agent(df: pd.DataFrame, context: str = "") -> dict:
    """
    context: describe what this data represents (e.g., "daily sales transactions from Stripe")
    """
    columns_info = {col: str(df[col].dtype) for col in df.columns}
    sample = df.head(3).to_dict(orient="records")
    messages = [{
        "role": "user",
        "content": f"""Run a data quality assessment on this dataset.

Context: {context or 'Unknown dataset'}
Shape: {df.shape[0]} rows x {df.shape[1]} columns
Columns and types: {json.dumps(columns_info)}
Sample (3 rows): {json.dumps(sample, default=str)}

Design and run quality checks:
1. Check null rates on all columns
2. Check uniqueness on ID-like columns
3. Check value distributions on categorical columns
4. Check numeric ranges where applicable

After running checks, use report_quality_summary with an overall score (0-100),
critical issues, and actionable recommendations."""
    }]
    quality_summary = {}
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            tools=QUALITY_TOOLS,
            messages=messages
        )
        if response.stop_reason == "end_turn":
            break
        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break
        tool_results = []
        for call in tool_calls:
            if call.name == "run_check":
                result = run_quality_check(
                    call.input["check_name"],
                    call.input["check_type"],
                    call.input["column"],
                    call.input.get("parameters", {}),
                    df
                )
            elif call.name == "report_quality_summary":
                quality_summary = call.input
                result = {"reported": True}
            else:
                result = {"error": "unknown tool"}
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": json.dumps(result, default=str)
            })
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
    return quality_summary
Usage Example
# Validate a record
schema = {
    "customer_id": {"type": "string", "constraints": {}},
    "amount": {"type": "float", "constraints": {"min": 0}},
    "status": {"type": "string", "constraints": {"enum": ["pending", "completed", "failed"]}}
}
record = {"customer_id": "cust_123", "amount": -50.0, "status": "unknown"}
result = validate_record(record, schema)
print(f"Valid: {result.is_valid}")
for err in result.errors:
    # "action" is optional in the tool schema, so don't assume it's present
    print(f"  {err['field']}: {err['issue']} → {err.get('action', 'no suggested fix')}")

# ETL transform
raw = {"txn_id": "t_001", "uid": "u_42", "cents": 4999, "ts": 1714262400, "state": "done"}
transformed = process_record(raw)
print(transformed)
# → {"id": "t_001", "customer_id": "u_42", "amount": 49.99,
#    "created_at": "2024-04-28T00:00:00Z", "status": "completed"}

# Data quality check
df = pd.read_csv("data/transactions.csv")
quality = run_data_quality_agent(df, "daily transactions from Stripe webhook")
print(f"Quality score: {quality['overall_score']}/100")
for issue in quality['critical_issues']:
    print(f"  CRITICAL: {issue}")
Frequently Asked Questions
Isn't this expensive at scale? Yes — LLM validation costs $0.003-0.015 per record depending on model and complexity. Use these agents for orchestration and exception handling, not row-level processing. Run validation on a sample first, then apply rules programmatically at scale.
How do I handle 1M+ record datasets? Sample-based quality assessment (run on 10K records, apply findings to all). Use Claude to generate the validation rules, then run those rules in pandas/SQL without LLM involvement.
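The "generate rules once, apply at scale" pattern can be sketched as follows. The RULES dict stands in for output you would ask Claude to produce from a sample; applying it is pure pandas with no LLM calls:

```python
import pandas as pd

# Illustrative rules dict — in practice, generated by the agent from a sample.
RULES = {
    "amount": {"min": 0},
    "status": {"enum": ["pending", "completed", "failed", "refunded"]},
    "customer_id": {"not_null": True},
}

def apply_rules(df: pd.DataFrame, rules: dict) -> pd.DataFrame:
    """Return the rows that violate any rule — vectorized, no LLM involved."""
    bad = pd.Series(False, index=df.index)
    for col, rule in rules.items():
        if "min" in rule:
            bad |= df[col] < rule["min"]
        if "enum" in rule:
            bad |= ~df[col].isin(rule["enum"])
        if rule.get("not_null"):
            bad |= df[col].isnull()
    return df[bad]

df = pd.DataFrame({
    "amount": [10.0, -5.0, 20.0],
    "status": ["completed", "pending", "shipped"],
    "customer_id": ["c1", "c2", None],
})
violations = apply_rules(df, RULES)
# row 1 (negative amount) and row 2 (bad status, null customer_id) are returned
```

This keeps the LLM cost fixed (one sampling pass) while the rule application scales linearly with pandas.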
Can Claude handle streaming data from Kafka/Kinesis? You can call these agents on micro-batches. Don't call the LLM per-message on high-throughput streams — batch 100-1000 records per call.
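Micro-batching a stream can be as simple as a chunking generator; `batch_size` below is a tuning knob, not an SDK parameter:

```python
from collections.abc import Iterable, Iterator

# Accumulate stream messages into fixed-size chunks so one agent call
# covers many records; the final partial batch is flushed at end of stream.

def micro_batches(records: Iterable[dict], batch_size: int = 500) -> Iterator[list[dict]]:
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final partial batch

stream = ({"offset": i} for i in range(1200))
sizes = [len(b) for b in micro_batches(stream, batch_size=500)]
print(sizes)  # [500, 500, 200]
```

In a real consumer you would also flush on a time threshold, not just a size threshold, so quiet periods don't hold records indefinitely.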
What's the right model choice?
Validation of simple schemas: claude-haiku-4-5 (fast, cheap). Complex ETL orchestration and quality assessment: claude-sonnet-4-5. Reserve Opus for pipelines where the depth of the analysis itself matters.
Related Guides
- Claude Agent SDK: Build Automation Agents — SDK fundamentals
- Claude Code for Data Science: Jupyter Workflows — Notebook integration
- Streaming vs Batch: Claude Agent SDK Patterns — Throughput optimization
Go Deeper
Agent SDK Cookbook — $49 — Full data pipeline implementation: Airflow operator for Claude agents, Kafka consumer pattern, cost tracking per pipeline stage, and schema drift alerting system.