Claude Agents for DevOps: Monitoring, Alerting, and Automated Remediation

How to build Claude-powered DevOps agents that analyze monitoring alerts, explain incidents in plain English, generate runbooks on the fly, and execute safe remediation behind explicit approval gates.

A Claude DevOps agent bridges the gap between raw monitoring alerts and actionable response — it reads metrics, interprets what's happening, generates a plain-English explanation, and proposes (or executes) remediation steps. The key architectural constraint: the agent always stops before destructive actions and requests approval. This guide builds an incident analysis agent, an alert triage agent, and a safe remediation agent with explicit approval gates.


What Claude Adds to DevOps Tooling

Existing monitoring tools (Datadog, Grafana, PagerDuty) are good at detecting anomalies. They're poor at explaining why an alert fired in plain English, correlating metrics, logs, and deployment history into a single root-cause hypothesis, and proposing concrete, risk-rated remediation steps.

Claude agents fill these gaps without replacing your monitoring stack.


Architecture

Alert fires → Agent reads metrics/logs → Analysis → 
Explanation (Slack/PagerDuty) → Proposed remediation → 
Approval gate → Safe execution → Post-incident summary

Destructive actions (restarts, rollbacks, scale-downs) always have an approval gate. Read-only actions (metric queries, log tails, config reads) are automatic.
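
The read-only vs. destructive split can be encoded as a small policy check that fails closed. A minimal sketch (the action names are illustrative):

```python
# Illustrative policy: read-only actions run automatically; everything else
# (including anything unrecognized) requires human approval.
READ_ONLY_ACTIONS = {"query_metrics", "tail_logs", "get_deployment_history", "read_config"}
DESTRUCTIVE_ACTIONS = {"restart_service", "rollback_deploy", "scale_down"}

def needs_approval(action: str) -> bool:
    """Fail closed: unknown actions are treated as destructive."""
    return action not in READ_ONLY_ACTIONS
```

For example, `needs_approval("tail_logs")` returns `False`, while `needs_approval("rollback_deploy")` and any unrecognized action return `True`.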


Setup

import anthropic
import json
import subprocess

client = anthropic.Anthropic()

Tool Definitions

DEVOPS_TOOLS = [
    {
        "name": "query_metrics",
        "description": "Query time-series metrics from monitoring system",
        "input_schema": {
            "type": "object",
            "properties": {
                "metric": {"type": "string", "description": "e.g., 'cpu_usage', 'memory_usage', 'http_error_rate'"},
                "service": {"type": "string"},
                "time_range": {"type": "string", "description": "e.g., '30m', '1h', '24h'"},
                "aggregation": {"type": "string", "enum": ["avg", "max", "min", "p95", "p99"]}
            },
            "required": ["metric", "service", "time_range"]
        }
    },
    {
        "name": "tail_logs",
        "description": "Get recent log lines for a service",
        "input_schema": {
            "type": "object",
            "properties": {
                "service": {"type": "string"},
                "lines": {"type": "integer", "default": 50},
                "filter": {"type": "string", "description": "grep-style filter pattern (optional)"}
            },
            "required": ["service"]
        }
    },
    {
        "name": "get_deployment_history",
        "description": "Get recent deployments for a service",
        "input_schema": {
            "type": "object",
            "properties": {
                "service": {"type": "string"},
                "limit": {"type": "integer", "default": 5}
            },
            "required": ["service"]
        }
    },
    {
        "name": "propose_remediation",
        "description": "Propose remediation steps. REQUIRES human approval before execution.",
        "input_schema": {
            "type": "object",
            "properties": {
                "diagnosis": {"type": "string", "description": "What's wrong and why"},
                "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "action": {"type": "string"},
                            "command": {"type": "string", "description": "Actual command to run (if applicable)"},
                            "risk": {"type": "string", "enum": ["safe", "moderate", "destructive"]},
                            "reversible": {"type": "boolean"}
                        }
                    }
                }
            },
            "required": ["diagnosis", "severity", "steps"]
        }
    }
]

Tool Execution

# Simulate metric/log queries — replace with real Datadog/Prometheus/CloudWatch calls
def execute_devops_tool(tool_name: str, tool_input: dict) -> str:
    if tool_name == "query_metrics":
        # In production: call Datadog API, Prometheus query_range, etc.
        metric = tool_input["metric"]
        service = tool_input["service"]
        time_range = tool_input["time_range"]

        # Simulated response — replace with real API call
        simulated_data = {
            "metric": metric,
            "service": service,
            "time_range": time_range,
            "current_value": 87.3 if "cpu" in metric else 145.2,
            "baseline_avg": 42.1 if "cpu" in metric else 98.0,
            "alert_threshold": 80 if "cpu" in metric else 200,
            "trend": "increasing",
            "spike_started": "14:32 UTC"
        }
        return json.dumps(simulated_data)

    elif tool_name == "tail_logs":
        service = tool_input["service"]
        # In production: kubectl logs, CloudWatch Logs, Loki query, etc.
        simulated_logs = f"""[14:30:12] INFO Deploying {service} v2.4.1
[14:31:45] INFO Deploy complete
[14:32:01] WARN High memory usage detected: 87% 
[14:32:15] ERROR OOMKilled: container exceeded memory limit
[14:32:16] INFO Container restarting (attempt 1/3)
[14:32:45] ERROR Connection pool exhausted: max=50 current=50
[14:33:01] WARN Slow query detected: 4200ms (threshold: 2000ms)"""
        return json.dumps({"service": service, "logs": simulated_logs})

    elif tool_name == "get_deployment_history":
        service = tool_input["service"]
        return json.dumps({
            "service": service,
            "deployments": [
                {"version": "v2.4.1", "deployed_at": "14:30 UTC", "deployed_by": "ci/cd"},
                {"version": "v2.4.0", "deployed_at": "09:15 UTC", "deployed_by": "alex"},
                {"version": "v2.3.9", "deployed_at": "2026-04-27 16:00 UTC", "deployed_by": "ci/cd"}
            ]
        })

    elif tool_name == "propose_remediation":
        # Just capture — don't execute. Return immediately.
        return json.dumps({"proposal_received": True})

    return json.dumps({"error": f"Unknown tool: {tool_name}"})

Incident Analysis Agent

def analyze_incident(alert: dict) -> dict:
    """
    alert: {
        "service": str,
        "alert_name": str, 
        "triggered_at": str,
        "details": str
    }
    """
    messages = [{
        "role": "user",
        "content": f"""Analyze this production alert:

Service: {alert['service']}
Alert: {alert['alert_name']}
Triggered at: {alert['triggered_at']}
Details: {alert['details']}

Steps:
1. Query the relevant metrics to understand what's happening
2. Check recent logs for errors
3. Check deployment history (did a recent deploy cause this?)
4. Correlate the signals into a root cause hypothesis
5. Propose remediation steps with risk levels

Be specific about what you observe, not what you assume."""
    }]

    analysis_text = ""
    remediation_proposal = None

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            tools=DEVOPS_TOOLS,
            messages=messages
        )

        if response.stop_reason == "end_turn":
            # Extract the final analysis from text
            for block in response.content:
                if hasattr(block, "text"):
                    analysis_text = block.text
            break

        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break

        tool_results = []
        for call in tool_calls:
            result = execute_devops_tool(call.name, call.input)

            if call.name == "propose_remediation":
                remediation_proposal = call.input

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": result
            })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    return {
        "alert": alert,
        "analysis": analysis_text if "analysis_text" in locals() else "",
        "remediation": remediation_proposal,
        "requires_approval": remediation_proposal is not None
    }

Approval Gate and Execution

def execute_remediation_with_approval(proposal: dict, auto_approve_safe: bool = False) -> dict:
    """
    Execute remediation steps, with approval gate for destructive actions.
    auto_approve_safe: automatically run 'safe' steps without human confirmation
    """
    steps = proposal.get("steps", [])
    results = []

    for step in steps:
        action = step["action"]
        command = step.get("command", "")
        risk = step.get("risk", "safe")
        reversible = step.get("reversible", True)

        # Auto-approve safe actions
        if risk == "safe" and auto_approve_safe:
            approved = True
            print(f"[AUTO] Executing safe action: {action}")
        else:
            # Human approval required
            print(f"\n{'='*50}")
            print(f"APPROVAL REQUIRED")
            print(f"Action: {action}")
            if command:
                print(f"Command: {command}")
            print(f"Risk: {risk} | Reversible: {reversible}")
            answer = input("Approve? (yes/no): ").strip().lower()
            approved = answer == "yes"

        if approved:
            if command:
                try:
                    # SAFETY: only run pre-vetted commands
                    # In production, use an allowlist of safe commands
                    safe_prefixes = ["kubectl get", "kubectl describe", "docker ps",
                                     "kubectl rollout", "fly scale", "fly deploy --image"]
                    is_safe_command = any(command.startswith(p) for p in safe_prefixes)

                    if is_safe_command:
                        # Note: naive whitespace split — use shlex.split() if
                        # approved commands may contain quoted arguments
                        result = subprocess.run(
                            command.split(), capture_output=True, text=True, timeout=30
                        )
                        output = result.stdout or result.stderr
                    else:
                        output = f"[BLOCKED] Command not in allowlist: {command}"

                    results.append({"action": action, "status": "executed", "output": output})
                except subprocess.TimeoutExpired:
                    results.append({"action": action, "status": "timeout"})
            else:
                results.append({"action": action, "status": "acknowledged"})
        else:
            results.append({"action": action, "status": "skipped_by_operator"})

    return {"steps_executed": results}

Slack/PagerDuty Notification

NOTIFICATION_TEMPLATE = """
🚨 *Incident Alert — {service}*

*Alert:* {alert_name}
*Triggered:* {triggered_at}
*Severity:* {severity}

*Diagnosis:*
{diagnosis}

*Proposed Remediation:*
{steps_summary}

*Action required:* Operator approval needed for {destructive_count} step(s).
"""


def format_notification(incident_result: dict) -> str:
    alert = incident_result["alert"]
    proposal = incident_result.get("remediation", {})

    if not proposal:
        return f"Alert: {alert['alert_name']} on {alert['service']} — no remediation proposed."

    steps = proposal.get("steps", [])
    steps_summary = "\n".join([
        f"  {i+1}. [{s.get('risk', 'safe').upper()}] {s['action']}"
        for i, s in enumerate(steps)
    ])
    destructive_count = sum(1 for s in steps if s.get("risk") in ["moderate", "destructive"])

    return NOTIFICATION_TEMPLATE.format(
        service=alert["service"],
        alert_name=alert["alert_name"],
        triggered_at=alert["triggered_at"],
        severity=proposal.get("severity", "unknown"),
        diagnosis=proposal.get("diagnosis", "Analysis in progress"),
        steps_summary=steps_summary,
        destructive_count=destructive_count
    )

Usage

alert = {
    "service": "api-service",
    "alert_name": "HighMemoryUsage",
    "triggered_at": "2026-04-28 14:33 UTC",
    "details": "Memory usage at 87%, up from 42% baseline. Started ~14:32."
}

result = analyze_incident(alert)

# Send to Slack
notification = format_notification(result)
print(notification)

# Execute with approval
if result["requires_approval"] and result["remediation"]:
    execute_results = execute_remediation_with_approval(
        result["remediation"],
        auto_approve_safe=True  # Auto-run safe read operations
    )
    print(execute_results)
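
The `print(notification)` placeholder can be swapped for a real delivery channel. One option is a Slack incoming webhook; the sketch below uses only the standard library, and `SLACK_WEBHOOK_URL` is a placeholder you'd replace with your own webhook:

```python
import json
import urllib.request

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX"  # placeholder — use your webhook

def build_slack_payload(text: str) -> bytes:
    """Build the JSON body Slack incoming webhooks expect."""
    return json.dumps({"text": text}).encode("utf-8")

def post_to_slack(text: str, webhook_url: str = SLACK_WEBHOOK_URL) -> int:
    """POST a message to a Slack incoming webhook; returns the HTTP status."""
    req = urllib.request.Request(
        webhook_url,
        data=build_slack_payload(text),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status
```

In the usage flow above, `post_to_slack(notification)` would replace the print statement.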

Frequently Asked Questions

How do I connect to real monitoring systems? Replace the simulated execute_devops_tool responses with real API calls: Datadog's /api/v1/query, Prometheus's /api/v1/query_range, AWS CloudWatch's GetMetricStatistics, or Loki's log query API.
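
As one example, the query_metrics branch could be backed by Prometheus's range-query endpoint. A sketch assuming a Prometheus server reachable at `PROM_URL` (adjust the URL, step, and auth for your environment):

```python
import json
import time
import urllib.parse
import urllib.request

PROM_URL = "http://prometheus:9090"  # assumption: your Prometheus base URL

def parse_time_range(time_range: str) -> int:
    """Convert tool-style ranges like '30m', '1h', '24h' into seconds."""
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    return int(time_range[:-1]) * units[time_range[-1]]

def query_prometheus_range(promql: str, time_range: str = "30m", step: str = "60s") -> list:
    """Call Prometheus /api/v1/query_range over the given window."""
    end = time.time()
    params = urllib.parse.urlencode({
        "query": promql,
        "start": end - parse_time_range(time_range),
        "end": end,
        "step": step,
    })
    with urllib.request.urlopen(f"{PROM_URL}/api/v1/query_range?{params}", timeout=10) as resp:
        body = json.load(resp)
    if body.get("status") != "success":
        raise RuntimeError(f"Prometheus returned an error: {body}")
    return body["data"]["result"]
```

The returned series can then be summarized (current value, baseline, trend) into the same JSON shape the simulated tool emits.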

Is it safe to let Claude suggest kubectl rollout restart? The proposal-then-approval pattern keeps humans in the loop for any action with side effects. The agent never executes commands directly — it proposes, you approve. The command allowlist in execute_remediation_with_approval adds a second safety layer.

Can this replace PagerDuty? No — Claude analyzes alerts from PagerDuty, it doesn't replace the alerting system. The agent is the tier between "alert fires" and "human looks at dashboard."

What about multi-service incidents? Run analyze_incident for each affected service in parallel (asyncio), then synthesize the results in a second call to identify the common root cause.
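
That fan-out can be sketched with asyncio.to_thread, which keeps the blocking SDK calls off the event loop (`analyze_fn` is any synchronous analyzer with the analyze_incident signature):

```python
import asyncio

async def analyze_incidents_parallel(alerts: list[dict], analyze_fn) -> list[dict]:
    """Analyze one alert per affected service concurrently.

    Results come back in the same order as the input alerts, ready to be
    passed to a second synthesis call that identifies the common root cause.
    """
    tasks = [asyncio.to_thread(analyze_fn, alert) for alert in alerts]
    return await asyncio.gather(*tasks)
```

Usage: `results = asyncio.run(analyze_incidents_parallel(alerts, analyze_incident))`, then feed the combined analyses into one more `client.messages.create` call asking for the shared root cause.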


Go Deeper

Agent SDK Cookbook — $49 — Full DevOps agent implementation: PagerDuty webhook receiver, Slack approval bot with interactive buttons, post-incident report generator, and runbook auto-updater.

→ Get the Agent SDK Cookbook — $49

30-day money-back guarantee. Instant download.

AI Disclosure: Written with Claude Code.
