Claude Agents for DevOps: Monitoring, Alerting, and Automated Remediation
A Claude DevOps agent bridges the gap between raw monitoring alerts and actionable response — it reads metrics, interprets what's happening, generates a plain-English explanation, and proposes (or executes) remediation steps. The key architectural constraint: the agent always stops before destructive actions and requests approval. This guide builds an incident analysis agent, an alert triage agent, and a safe remediation agent with explicit approval gates.
What Claude Adds to DevOps Tooling
Existing monitoring tools (Datadog, Grafana, PagerDuty) are good at detecting anomalies. They're poor at:
- Explaining what's happening in context ("this memory spike correlates with the 14:30 deploy")
- Correlating across signals (CPU + latency + error rate → single root cause hypothesis)
- Generating runbooks for novel incidents that aren't in the playbook
- Communicating to stakeholders who don't read dashboards
Claude agents fill these gaps without replacing your monitoring stack.
Architecture
Alert fires → Agent reads metrics/logs → Analysis →
Explanation (Slack/PagerDuty) → Proposed remediation →
Approval gate → Safe execution → Post-incident summary
Destructive actions (restarts, rollbacks, scale-downs) always have an approval gate. Read-only actions (metric queries, log tails, config reads) are automatic.
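This read/write split can be encoded as a one-line policy check. A minimal sketch (the `needs_approval` helper is illustrative, not part of any SDK; the action names match the tools defined later in this guide):

```python
# Illustrative policy: read-only tool calls run automatically,
# anything with side effects pauses for operator sign-off.
READ_ONLY_ACTIONS = {"query_metrics", "tail_logs", "get_deployment_history"}

def needs_approval(action: str) -> bool:
    """True when the action must wait for a human before executing."""
    return action not in READ_ONLY_ACTIONS
```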
Setup
import anthropic
import json
import subprocess
client = anthropic.Anthropic()
Tool Definitions
DEVOPS_TOOLS = [
    {
        "name": "query_metrics",
        "description": "Query time-series metrics from monitoring system",
        "input_schema": {
            "type": "object",
            "properties": {
                "metric": {"type": "string", "description": "e.g., 'cpu_usage', 'memory_usage', 'http_error_rate'"},
                "service": {"type": "string"},
                "time_range": {"type": "string", "description": "e.g., '30m', '1h', '24h'"},
                "aggregation": {"type": "string", "enum": ["avg", "max", "min", "p95", "p99"]}
            },
            "required": ["metric", "service", "time_range"]
        }
    },
    {
        "name": "tail_logs",
        "description": "Get recent log lines for a service",
        "input_schema": {
            "type": "object",
            "properties": {
                "service": {"type": "string"},
                "lines": {"type": "integer", "default": 50},
                "filter": {"type": "string", "description": "grep-style filter pattern (optional)"}
            },
            "required": ["service"]
        }
    },
    {
        "name": "get_deployment_history",
        "description": "Get recent deployments for a service",
        "input_schema": {
            "type": "object",
            "properties": {
                "service": {"type": "string"},
                "limit": {"type": "integer", "default": 5}
            },
            "required": ["service"]
        }
    },
    {
        "name": "propose_remediation",
        "description": "Propose remediation steps. REQUIRES human approval before execution.",
        "input_schema": {
            "type": "object",
            "properties": {
                "diagnosis": {"type": "string", "description": "What's wrong and why"},
                "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "action": {"type": "string"},
                            "command": {"type": "string", "description": "Actual command to run (if applicable)"},
                            "risk": {"type": "string", "enum": ["safe", "moderate", "destructive"]},
                            "reversible": {"type": "boolean"}
                        }
                    }
                }
            },
            "required": ["diagnosis", "severity", "steps"]
        }
    }
]
Tool Execution
# Simulate metric/log queries — replace with real Datadog/Prometheus/CloudWatch calls
def execute_devops_tool(tool_name: str, tool_input: dict) -> str:
    if tool_name == "query_metrics":
        # In production: call Datadog API, Prometheus query_range, etc.
        metric = tool_input["metric"]
        service = tool_input["service"]
        time_range = tool_input["time_range"]
        # Simulated response — replace with real API call
        simulated_data = {
            "metric": metric,
            "service": service,
            "time_range": time_range,
            "current_value": 87.3 if "cpu" in metric else 145.2,
            "baseline_avg": 42.1 if "cpu" in metric else 98.0,
            "alert_threshold": 80 if "cpu" in metric else 200,
            "trend": "increasing",
            "spike_started": "14:32 UTC"
        }
        return json.dumps(simulated_data)

    elif tool_name == "tail_logs":
        service = tool_input["service"]
        # In production: kubectl logs, CloudWatch Logs, Loki query, etc.
        simulated_logs = f"""[14:30:12] INFO Deploying {service} v2.4.1
[14:31:45] INFO Deploy complete
[14:32:01] WARN High memory usage detected: 87%
[14:32:15] ERROR OOMKilled: container exceeded memory limit
[14:32:16] INFO Container restarting (attempt 1/3)
[14:32:45] ERROR Connection pool exhausted: max=50 current=50
[14:33:01] WARN Slow query detected: 4200ms (threshold: 2000ms)"""
        return json.dumps({"service": service, "logs": simulated_logs})

    elif tool_name == "get_deployment_history":
        service = tool_input["service"]
        return json.dumps({
            "service": service,
            "deployments": [
                {"version": "v2.4.1", "deployed_at": "14:30 UTC", "deployed_by": "ci/cd"},
                {"version": "v2.4.0", "deployed_at": "09:15 UTC", "deployed_by": "alex"},
                {"version": "v2.3.9", "deployed_at": "2026-04-27 16:00 UTC", "deployed_by": "ci/cd"}
            ]
        })

    elif tool_name == "propose_remediation":
        # Just capture — don't execute. Return immediately.
        return json.dumps({"proposal_received": True})

    return json.dumps({"error": f"Unknown tool: {tool_name}"})
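When swapping the simulation for a real backend, the query_metrics branch mostly reduces to building a request against Prometheus's /api/v1/query_range endpoint. A sketch under stated assumptions: the PROMETHEUS_URL value and the service label in the PromQL expression are placeholders for your environment.

```python
import time
import urllib.parse

# Placeholder for your environment.
PROMETHEUS_URL = "http://prometheus:9090"

# Suffix multipliers for the '30m' / '1h' / '24h' style ranges used above.
RANGE_SECONDS = {"m": 60, "h": 3600, "d": 86400}

def build_range_query(metric: str, service: str, time_range: str, step: str = "60s") -> str:
    """Build a Prometheus /api/v1/query_range URL covering the last `time_range`."""
    seconds = int(time_range[:-1]) * RANGE_SECONDS[time_range[-1]]
    end = int(time.time())
    params = urllib.parse.urlencode({
        "query": f'{metric}{{service="{service}"}}',  # PromQL: match by service label
        "start": end - seconds,
        "end": end,
        "step": step,
    })
    return f"{PROMETHEUS_URL}/api/v1/query_range?{params}"
```

Fetch the URL with urllib or requests; the JSON response carries the time series under data.result, which you can summarize into the same shape the simulated tool returns.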
Incident Analysis Agent
def analyze_incident(alert: dict) -> dict:
    """
    alert: {
        "service": str,
        "alert_name": str,
        "triggered_at": str,
        "details": str
    }
    """
    messages = [{
        "role": "user",
        "content": f"""Analyze this production alert:
Service: {alert['service']}
Alert: {alert['alert_name']}
Triggered at: {alert['triggered_at']}
Details: {alert['details']}
Steps:
1. Query the relevant metrics to understand what's happening
2. Check recent logs for errors
3. Check deployment history (did a recent deploy cause this?)
4. Correlate the signals into a root cause hypothesis
5. Propose remediation steps with risk levels
Be specific about what you observe, not what you assume."""
    }]

    remediation_proposal = None
    analysis_text = ""
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            tools=DEVOPS_TOOLS,
            messages=messages
        )
        if response.stop_reason == "end_turn":
            # Extract the final analysis from the text blocks
            for block in response.content:
                if block.type == "text":
                    analysis_text = block.text
                    break
            break
        tool_calls = [b for b in response.content if b.type == "tool_use"]
        if not tool_calls:
            break
        tool_results = []
        for call in tool_calls:
            result = execute_devops_tool(call.name, call.input)
            if call.name == "propose_remediation":
                remediation_proposal = call.input
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": result
            })
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    return {
        "alert": alert,
        "analysis": analysis_text,
        "remediation": remediation_proposal,
        "requires_approval": remediation_proposal is not None
    }
Approval Gate and Execution
def execute_remediation_with_approval(proposal: dict, auto_approve_safe: bool = False) -> dict:
    """
    Execute remediation steps, with approval gate for destructive actions.
    auto_approve_safe: automatically run 'safe' steps without human confirmation
    """
    steps = proposal.get("steps", [])
    results = []
    for step in steps:
        action = step["action"]
        command = step.get("command", "")
        risk = step.get("risk", "safe")
        reversible = step.get("reversible", True)

        # Auto-approve safe actions
        if risk == "safe" and auto_approve_safe:
            approved = True
            print(f"[AUTO] Executing safe action: {action}")
        else:
            # Human approval required
            print(f"\n{'='*50}")
            print("APPROVAL REQUIRED")
            print(f"Action: {action}")
            if command:
                print(f"Command: {command}")
            print(f"Risk: {risk} | Reversible: {reversible}")
            answer = input("Approve? (yes/no): ").strip().lower()
            approved = answer == "yes"

        if approved:
            if command:
                try:
                    # SAFETY: only run pre-vetted commands
                    # In production, use an allowlist of safe commands
                    safe_prefixes = ["kubectl get", "kubectl describe", "docker ps",
                                     "kubectl rollout", "fly scale", "fly deploy --image"]
                    is_safe_command = any(command.startswith(p) for p in safe_prefixes)
                    if is_safe_command:
                        result = subprocess.run(
                            command.split(), capture_output=True, text=True, timeout=30
                        )
                        output = result.stdout or result.stderr
                    else:
                        output = f"[BLOCKED] Command not in allowlist: {command}"
                    results.append({"action": action, "status": "executed", "output": output})
                except subprocess.TimeoutExpired:
                    results.append({"action": action, "status": "timeout"})
            else:
                results.append({"action": action, "status": "acknowledged"})
        else:
            results.append({"action": action, "status": "skipped_by_operator"})
    return {"steps_executed": results}
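One caveat on the prefix check: raw string prefixes are brittle, since a command like "kubectl getaway" slips past the "kubectl get" prefix. A token-level variant closes that gap (the ALLOWED pairs here are examples, not a recommendation for your cluster):

```python
import shlex

# Allowlisted (program, subcommand) pairs — illustrative examples only.
ALLOWED = {("kubectl", "get"), ("kubectl", "describe"), ("docker", "ps")}

def is_allowed(command: str) -> bool:
    """Token-level allowlist: compare the first two tokens, not a raw prefix."""
    tokens = shlex.split(command)
    return len(tokens) >= 2 and (tokens[0], tokens[1]) in ALLOWED
```

shlex.split also handles quoted arguments correctly, unlike the naive command.split() above.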
Slack/PagerDuty Notification
NOTIFICATION_TEMPLATE = """
🚨 *Incident Alert — {service}*
*Alert:* {alert_name}
*Triggered:* {triggered_at}
*Severity:* {severity}
*Diagnosis:*
{diagnosis}
*Proposed Remediation:*
{steps_summary}
*Action required:* Operator approval needed for {destructive_count} step(s).
"""
def format_notification(incident_result: dict) -> str:
    alert = incident_result["alert"]
    proposal = incident_result.get("remediation", {})
    if not proposal:
        return f"Alert: {alert['alert_name']} on {alert['service']} — no remediation proposed."
    steps = proposal.get("steps", [])
    steps_summary = "\n".join([
        f" {i+1}. [{s['risk'].upper()}] {s['action']}"
        for i, s in enumerate(steps)
    ])
    destructive_count = sum(1 for s in steps if s["risk"] in ["moderate", "destructive"])
    return NOTIFICATION_TEMPLATE.format(
        service=alert["service"],
        alert_name=alert["alert_name"],
        triggered_at=alert["triggered_at"],
        severity=proposal.get("severity", "unknown"),
        diagnosis=proposal.get("diagnosis", "Analysis in progress"),
        steps_summary=steps_summary,
        destructive_count=destructive_count
    )
Usage
alert = {
    "service": "api-service",
    "alert_name": "HighMemoryUsage",
    "triggered_at": "2026-04-28 14:33 UTC",
    "details": "Memory usage at 87%, up from 42% baseline. Started ~14:32."
}

result = analyze_incident(alert)

# Send to Slack
notification = format_notification(result)
print(notification)

# Execute with approval
if result["requires_approval"] and result["remediation"]:
    execute_results = execute_remediation_with_approval(
        result["remediation"],
        auto_approve_safe=True  # Auto-run safe read operations
    )
    print(execute_results)
Frequently Asked Questions
How do I connect to real monitoring systems?
Replace the simulated execute_devops_tool responses with real API calls: Datadog's /api/v1/query, Prometheus's /api/v1/query_range, AWS CloudWatch's GetMetricStatistics, or Loki's log query API.
Is it safe to let Claude suggest kubectl rollout restart?
The proposal-then-approval pattern keeps humans in the loop for any action with side effects. The agent never executes commands directly — it proposes, you approve. The command allowlist in execute_remediation_with_approval adds a second safety layer.
Can this replace PagerDuty?
No — Claude analyzes alerts from PagerDuty; it doesn't replace the alerting system. The agent is the tier between "alert fires" and "human looks at dashboard."
What about multi-service incidents?
Run analyze_incident for each affected service in parallel (asyncio), then synthesize the results in a second call to identify the common root cause.
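That fan-out can be sketched with asyncio.to_thread, since the Messages API call inside analyze_incident is synchronous. The analyze function is passed in as a parameter so the helper stays generic:

```python
import asyncio

async def analyze_many(alerts: list[dict], analyze) -> list[dict]:
    """Run a synchronous per-alert analysis function concurrently in worker threads."""
    return await asyncio.gather(
        *(asyncio.to_thread(analyze, alert) for alert in alerts)
    )
```

asyncio.gather preserves input order, so results line up with the alerts list when you synthesize them in the follow-up call.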
Related Guides
- Claude Agent SDK: Build Automation Agents — SDK fundamentals
- Building a Content Generation Agent — Another agent pipeline
- How to Test Claude Agents — Testing agent behavior
Go Deeper
Agent SDK Cookbook — $49 — Full DevOps agent implementation: PagerDuty webhook receiver, Slack approval bot with interactive buttons, post-incident report generator, and runbook auto-updater.
→ Get the Agent SDK Cookbook — $49
30-day money-back guarantee. Instant download.