
Claude Agent SDK for Data Pipelines: ETL, Validation, and Transformation Agents

How to use the Claude Agent SDK to build intelligent data pipeline agents: ETL orchestration with tool use, schema validation agents, and data quality checks.


The Claude Agent SDK fits data pipelines when the logic is too variable for rigid rules: schema drift, inconsistent source formats, validation that requires judgment, and transformation logic that adapts to data shape. This guide builds three pipeline agents: a schema validation agent that explains failures in plain English, an ETL orchestrator that routes records based on content, and a data quality agent that generates and runs its own checks.


When Claude Agents Make Sense in Data Pipelines

Use an agent when:

- Source schemas drift or formats vary across systems, so rigid rules keep breaking
- Validation requires judgment and a plain-English explanation of what failed and how to fix it
- Transformation logic has to adapt to the shape of each record
- You're profiling a new data source and don't yet know which checks to write

Don't use an agent when:

- You need row-level transformation at scale (cost and latency make per-row LLM calls impractical)
- Deterministic rules already cover every case (run them in pandas or SQL directly)
- You're handling a high-throughput stream message by message

The sweet spot: batch validation and orchestration, not row-level transformation.


Setup

import anthropic
import json
from dataclasses import dataclass

client = anthropic.Anthropic()

Agent 1: Schema Validation Agent

Validates incoming data against an expected schema, returns structured failures with plain-English explanations.

VALIDATION_TOOLS = [
    {
        "name": "validate_field",
        "description": "Validate a single field value against its schema definition",
        "input_schema": {
            "type": "object",
            "properties": {
                "field_name": {"type": "string"},
                "value": {},
                "expected_type": {"type": "string"},
                "constraints": {
                    "type": "object",
                    "description": "e.g., {min: 0, max: 100} or {enum: ['A', 'B']} or {pattern: '...'}"
                }
            },
            "required": ["field_name", "value", "expected_type"]
        }
    },
    {
        "name": "report_validation_result",
        "description": "Report the final validation result for the record",
        "input_schema": {
            "type": "object",
            "properties": {
                "is_valid": {"type": "boolean"},
                "errors": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "field": {"type": "string"},
                            "issue": {"type": "string"},
                            "action": {"type": "string", "description": "Recommended fix"}
                        }
                    }
                },
                "warnings": {
                    "type": "array",
                    "items": {"type": "string"}
                }
            },
            "required": ["is_valid", "errors"]
        }
    }
]


def execute_validation_tool(tool_name: str, tool_input: dict, record: dict) -> str:
    if tool_name == "validate_field":
        field = tool_input["field_name"]
        value = record.get(field)
        expected_type = tool_input["expected_type"]
        constraints = tool_input.get("constraints", {})

        issues = []

        # Type check (bool is a subclass of int in Python, so guard it explicitly;
        # ints are accepted where a float is expected)
        type_map = {"string": str, "integer": int, "float": (int, float), "boolean": bool}
        if expected_type in type_map and value is not None:
            if isinstance(value, bool) and expected_type != "boolean":
                issues.append(f"Expected {expected_type}, got bool")
            elif not isinstance(value, type_map[expected_type]):
                issues.append(f"Expected {expected_type}, got {type(value).__name__}")

        # Constraint checks
        if "min" in constraints and isinstance(value, (int, float)):
            if value < constraints["min"]:
                issues.append(f"Value {value} is below minimum {constraints['min']}")

        if "max" in constraints and isinstance(value, (int, float)):
            if value > constraints["max"]:
                issues.append(f"Value {value} exceeds maximum {constraints['max']}")

        if "enum" in constraints and value not in constraints["enum"]:
            issues.append(f"Value '{value}' not in allowed values: {constraints['enum']}")

        if value is None and not constraints.get("nullable", False):
            issues.append("Required field is null/missing")

        return json.dumps({"field": field, "value": value, "issues": issues})

    elif tool_name == "report_validation_result":
        return json.dumps({"reported": True})

    return json.dumps({"error": f"Unknown tool: {tool_name}"})


@dataclass
class ValidationResult:
    is_valid: bool
    errors: list[dict]
    warnings: list[str]


def validate_record(record: dict, schema: dict) -> ValidationResult:
    """
    schema: {"field_name": {"type": "string", "constraints": {...}}, ...}
    """
    schema_str = json.dumps(schema, indent=2)
    record_str = json.dumps(record, indent=2)

    messages = [{
        "role": "user",
        "content": f"""Validate this data record against the schema.

Schema:
{schema_str}

Record:
{record_str}

Use validate_field for each field, then report_validation_result with a summary.
For errors, suggest what the data team should fix."""
    }]

    result = ValidationResult(is_valid=True, errors=[], warnings=[])

    while True:
        response = client.messages.create(
            model="claude-haiku-4-5",  # Fast/cheap for validation
            max_tokens=1024,
            tools=VALIDATION_TOOLS,
            messages=messages
        )

        if response.stop_reason == "end_turn":
            break

        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break

        tool_results = []
        for call in tool_calls:
            tool_output = execute_validation_tool(call.name, call.input, record)

            # Capture the final report
            if call.name == "report_validation_result":
                result.is_valid = call.input.get("is_valid", True)
                result.errors = call.input.get("errors", [])
                result.warnings = call.input.get("warnings", [])

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": tool_output
            })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    return result
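
The same tool-use loop is reused by all three agents in this guide, and as written it is unbounded: a misbehaving run could keep calling tools indefinitely. A hedged sketch of a generic, bounded version of the loop (the helper name, signature, and 10-turn cap are arbitrary choices, not part of the SDK):

def run_agent_loop(messages, tools, handle_tool_call,
                   model="claude-haiku-4-5", max_turns=10):
    """Generic bounded tool-use loop. handle_tool_call(name, input) -> str."""
    for _ in range(max_turns):
        response = client.messages.create(
            model=model, max_tokens=1024, tools=tools, messages=messages
        )
        if response.stop_reason == "end_turn":
            return
        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            return
        tool_results = [{
            "type": "tool_result",
            "tool_use_id": call.id,
            "content": handle_tool_call(call.name, call.input)
        } for call in tool_calls]
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
    raise RuntimeError(f"Agent did not finish within {max_turns} turns")

Any of the agents here (validate_record, process_record, run_data_quality_agent) could be rewritten on top of a wrapper like this if you want a hard cap on API calls per record.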

Agent 2: ETL Orchestrator

Routes and transforms records based on content — handles schema variations from multiple source systems.

ETL_TOOLS = [
    {
        "name": "classify_record",
        "description": "Classify a record by source system and data type",
        "input_schema": {
            "type": "object",
            "properties": {
                "source_system": {"type": "string"},
                "record_type": {"type": "string"},
                "confidence": {"type": "number"},
                "notes": {"type": "string"}
            },
            "required": ["source_system", "record_type", "confidence"]
        }
    },
    {
        "name": "transform_record",
        "description": "Apply transformation to normalize the record to the target schema",
        "input_schema": {
            "type": "object",
            "properties": {
                "transformed": {
                    "type": "object",
                    "description": "The normalized record in target schema format"
                },
                "dropped_fields": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Fields present in source but not in target schema"
                }
            },
            "required": ["transformed"]
        }
    },
    {
        "name": "flag_for_review",
        "description": "Flag a record for human review when automatic transformation is uncertain",
        "input_schema": {
            "type": "object",
            "properties": {
                "reason": {"type": "string"},
                "suggested_action": {"type": "string"}
            },
            "required": ["reason"]
        }
    }
]


TARGET_SCHEMA = """
Target schema (all fields):
- id: string (required)
- customer_id: string (required)  
- amount: float (required, USD)
- currency: string (ISO 4217)
- created_at: ISO 8601 datetime
- status: enum[pending, completed, failed, refunded]
- metadata: object (optional, any shape)
"""


def process_record(raw_record: dict) -> dict:
    """Returns transformed record or {'flagged': True, 'reason': ...}"""

    messages = [{
        "role": "user",
        "content": f"""Process this raw data record into our target schema.

{TARGET_SCHEMA}

Raw record:
{json.dumps(raw_record, indent=2)}

Steps:
1. classify_record — identify source system and record type
2. transform_record — normalize to target schema (map field names, convert types, handle missing fields)
3. If you can't confidently transform a field, use flag_for_review instead

Common issues to handle:
- Unix timestamps → ISO 8601
- Cents (integer) → dollars (float)  
- Various status strings → our enum values
- Missing customer_id → flag for review"""
    }]

    result = {}
    flagged = False
    flag_reason = ""

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=1024,
            tools=ETL_TOOLS,
            messages=messages
        )

        if response.stop_reason == "end_turn":
            break

        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break

        tool_results = []
        for call in tool_calls:
            if call.name == "transform_record":
                result = call.input.get("transformed", {})
            elif call.name == "flag_for_review":
                flagged = True
                flag_reason = call.input.get("reason", "")

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": json.dumps({"ok": True})
            })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    if flagged:
        return {"flagged": True, "reason": flag_reason, "original": raw_record}

    return result
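
In a pipeline, flagged records should be split off rather than loaded with the rest of the batch. A minimal batch wrapper sketch (the review-queue handling here is a placeholder; swap in whatever sink your pipeline actually uses):

def process_batch(raw_records: list[dict]) -> tuple[list[dict], list[dict]]:
    """Returns (transformed_records, records_needing_human_review)."""
    transformed, needs_review = [], []
    for raw in raw_records:
        out = process_record(raw)
        if out.get("flagged"):
            needs_review.append(out)  # carries the reason plus the original record
        else:
            transformed.append(out)
    return transformed, needs_review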

Agent 3: Data Quality Agent

Generates and runs its own quality checks against a dataset — useful for profiling new data sources.

import pandas as pd

QUALITY_TOOLS = [
    {
        "name": "run_check",
        "description": "Run a data quality check on the dataset",
        "input_schema": {
            "type": "object",
            "properties": {
                "check_name": {"type": "string"},
                "check_type": {
                    "type": "string",
                    "enum": ["null_rate", "unique_rate", "value_distribution",
                             "range_check", "pattern_match", "referential_integrity"]
                },
                "column": {"type": "string"},
                "parameters": {"type": "object"}
            },
            "required": ["check_name", "check_type", "column"]
        }
    },
    {
        "name": "report_quality_summary",
        "description": "Report overall data quality findings",
        "input_schema": {
            "type": "object",
            "properties": {
                "overall_score": {"type": "number", "description": "0-100"},
                "critical_issues": {"type": "array", "items": {"type": "string"}},
                "recommendations": {"type": "array", "items": {"type": "string"}}
            },
            "required": ["overall_score", "critical_issues", "recommendations"]
        }
    }
]


def run_quality_check(check_name: str, check_type: str, column: str,
                      parameters: dict, df: pd.DataFrame) -> dict:
    if column not in df.columns:
        return {"error": f"Column '{column}' not found"}

    col = df[column]

    if check_type == "null_rate":
        null_pct = col.isnull().sum() / len(df) * 100
        return {"check": check_name, "null_rate_pct": round(null_pct, 2),
                "null_count": int(col.isnull().sum())}

    elif check_type == "unique_rate":
        unique_pct = col.nunique() / len(df) * 100
        return {"check": check_name, "unique_rate_pct": round(unique_pct, 2),
                "unique_count": int(col.nunique())}

    elif check_type == "value_distribution":
        top_values = col.value_counts().head(10).to_dict()
        return {"check": check_name, "top_values": {str(k): int(v) for k, v in top_values.items()}}

    elif check_type == "range_check":
        if pd.api.types.is_numeric_dtype(col):  # covers int32/float32 and nullable numeric dtypes too
            return {
                "check": check_name,
                "min": float(col.min()),
                "max": float(col.max()),
                "mean": float(col.mean()),
                "below_min": int((col < parameters.get("min", float("-inf"))).sum()),
                "above_max": int((col > parameters.get("max", float("inf"))).sum())
            }
        return {"error": "range_check requires numeric column"}

    return {"check": check_name, "result": "check type not implemented"}


def run_data_quality_agent(df: pd.DataFrame, context: str = "") -> dict:
    """
    context: describe what this data represents (e.g., "daily sales transactions from Stripe")
    """
    columns_info = {col: str(df[col].dtype) for col in df.columns}
    sample = df.head(3).to_dict(orient="records")

    messages = [{
        "role": "user",
        "content": f"""Run a data quality assessment on this dataset.

Context: {context or 'Unknown dataset'}
Shape: {df.shape[0]} rows x {df.shape[1]} columns
Columns and types: {json.dumps(columns_info)}
Sample (3 rows): {json.dumps(sample, default=str)}

Design and run quality checks:
1. Check null rates on all columns
2. Check uniqueness on ID-like columns
3. Check value distributions on categorical columns
4. Check numeric ranges where applicable

After running checks, use report_quality_summary with an overall score (0-100),
critical issues, and actionable recommendations."""
    }]

    quality_summary = {}

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            tools=QUALITY_TOOLS,
            messages=messages
        )

        if response.stop_reason == "end_turn":
            break

        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break

        tool_results = []
        for call in tool_calls:
            if call.name == "run_check":
                result = run_quality_check(
                    call.input["check_name"],
                    call.input["check_type"],
                    call.input["column"],
                    call.input.get("parameters", {}),
                    df
                )
            elif call.name == "report_quality_summary":
                quality_summary = call.input
                result = {"reported": True}
            else:
                result = {"error": "unknown tool"}

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": json.dumps(result, default=str)
            })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    return quality_summary

Usage Example

# Validate a record
schema = {
    "customer_id": {"type": "string", "constraints": {}},
    "amount": {"type": "float", "constraints": {"min": 0}},
    "status": {"type": "string", "constraints": {"enum": ["pending", "completed", "failed"]}}
}

record = {"customer_id": "cust_123", "amount": -50.0, "status": "unknown"}
result = validate_record(record, schema)
print(f"Valid: {result.is_valid}")
for err in result.errors:
    print(f"  {err['field']}: {err['issue']} → {err['action']}")

# ETL transform
raw = {"txn_id": "t_001", "uid": "u_42", "cents": 4999, "ts": 1714262400, "state": "done"}
transformed = process_record(raw)
print(transformed)
# → {"id": "t_001", "customer_id": "u_42", "amount": 49.99, 
#    "created_at": "2024-04-28T00:00:00Z", "status": "completed"}

# Data quality check
df = pd.read_csv("data/transactions.csv")
quality = run_data_quality_agent(df, "daily transactions from Stripe webhook")
print(f"Quality score: {quality['overall_score']}/100")
for issue in quality['critical_issues']:
    print(f"  CRITICAL: {issue}")

Frequently Asked Questions

Isn't this expensive at scale? Yes — LLM validation costs $0.003-0.015 per record depending on model and complexity. Use these agents for orchestration and exception handling, not row-level processing. Run validation on a sample first, then apply rules programmatically at scale.
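
If you want to verify those numbers for your own schemas, the API response includes token usage you can turn into a per-record cost. A minimal sketch; the two prices are placeholders, not current rates, so substitute your own:

# Placeholder $/million-token prices; replace with the current rates for your model.
PRICE_IN_PER_MTOK = 1.00
PRICE_OUT_PER_MTOK = 5.00

def call_cost(response) -> float:
    """Approximate cost of a single messages.create call in USD."""
    return (response.usage.input_tokens * PRICE_IN_PER_MTOK
            + response.usage.output_tokens * PRICE_OUT_PER_MTOK) / 1_000_000

Sum call_cost over every call an agent makes for one record to get its true per-record cost.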

How do I handle 1M+ record datasets? Use sample-based quality assessment: run the agent on ~10K records, then apply the findings to the full dataset. Use Claude to generate the validation rules, then run those rules in pandas/SQL without LLM involvement (a sketch of this pattern follows).
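
A sketch of that sample-then-rules pattern, assuming the schema fields map one-to-one onto DataFrame columns (adapt the deterministic pass to your actual constraints):

def validate_at_scale(df: pd.DataFrame, schema: dict, sample_size: int = 100) -> pd.DataFrame:
    # 1. Agent pass on a small sample: surfaces judgment-level issues with explanations
    for record in df.sample(min(sample_size, len(df))).to_dict(orient="records"):
        result = validate_record(record, schema)
        if not result.is_valid:
            print(result.errors)  # route to your reporting instead of printing

    # 2. Deterministic pass on every row, no LLM involved
    mask = pd.Series(True, index=df.index)
    for field, spec in schema.items():
        constraints = spec.get("constraints", {})
        if "min" in constraints:
            mask &= df[field] >= constraints["min"]
        if "max" in constraints:
            mask &= df[field] <= constraints["max"]
        if "enum" in constraints:
            mask &= df[field].isin(constraints["enum"])
    return df[mask]  # rows that pass the programmatic checks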

Can Claude handle streaming data from Kafka/Kinesis? You can call these agents on micro-batches. Don't call the LLM per-message on high-throughput streams — batch 100-1000 records per call.
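
A sketch of the micro-batch pattern; consume_messages is a hypothetical stand-in for your Kafka/Kinesis consumer loop, and the 500-record batch size is just a starting point inside the 100-1000 range above:

BATCH_SIZE = 500

def run_stream(consume_messages):
    buffer = []
    for message in consume_messages():  # yields record dicts from your consumer
        buffer.append(message)
        if len(buffer) >= BATCH_SIZE:
            batch_df = pd.DataFrame(buffer)
            summary = run_data_quality_agent(batch_df, "micro-batch from stream")
            # act on summary here (alerting, dead-letter routing) before continuing
            buffer = []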

What's the right model choice? Simple schema validation: claude-haiku-4-5 (fast, cheap). Complex ETL orchestration and quality assessment: claude-sonnet-4-5. Reserve Opus for pipelines where the quality of the analysis itself matters most.


Go Deeper

Agent SDK Cookbook — $49 — Full data pipeline implementation: Airflow operator for Claude agents, Kafka consumer pattern, cost tracking per pipeline stage, and schema drift alerting system.

→ Get the Agent SDK Cookbook — $49

30-day money-back guarantee. Instant download.

AI Disclosure: Written with Claude Code.
