|
|
"""Claude-powered agents used in the deployment readiness workflow.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import asyncio |
|
|
import os |
|
|
from dataclasses import asdict |
|
|
from typing import Dict, List, Optional |
|
|
|
|
|
import anthropic |
|
|
|
|
|
from enhanced_mcp_client import EnhancedMCPClient |
|
|
from schemas import ( |
|
|
ChecklistItem, |
|
|
DocumentationBundle, |
|
|
EvidencePacket, |
|
|
ReadinessPlan, |
|
|
ReadinessRequest, |
|
|
ReviewFinding, |
|
|
ReviewReport, |
|
|
) |
|
|
from sponsor_llms import SponsorLLMClient |
|
|
|
|
|
# Claude model identifier; the CLAUDE_MODEL environment variable overrides the default.
MODEL_ID = os.environ.get("CLAUDE_MODEL", "claude-3-5-sonnet-20241022")

# Value passed as ``max_tokens`` on each request; CLAUDE_MAX_TOKENS overrides it.
# The conversion runs at import time, so a non-integer value raises ValueError.
DEFAULT_MAX_TOKENS = int(os.environ.get("CLAUDE_MAX_TOKENS", "1500"))
|
|
|
|
|
|
|
|
class ClaudeAgent:
    """Base helper that wraps Anthropic's Messages API with an offline fallback.

    When ``ANTHROPIC_API_KEY`` is not set, no client is created and
    :meth:`_call_claude` returns a placeholder string instead of calling the
    API.
    """

    def __init__(self, name: str, system_prompt: str):
        """Store the agent identity and create a client when a key is configured.

        Args:
            name: Label used in the offline placeholder response.
            system_prompt: System prompt sent with every request.
        """
        self.name = name
        self.system_prompt = system_prompt
        api_key = os.getenv("ANTHROPIC_API_KEY")
        # Remains None when no key is configured ("offline mode").
        self.client: Optional[anthropic.Anthropic] = None
        if api_key:
            self.client = anthropic.Anthropic(api_key=api_key)

    @staticmethod
    def _extract_text(response) -> str:
        """Return the stripped text carried by *response*'s content blocks.

        Blocks without a string ``text`` attribute are skipped, and a missing
        or empty ``content`` yields ``""``. This avoids the IndexError /
        AttributeError that indexing ``content[0].text`` directly would raise
        in those cases. For a response holding a single text block the result
        is that block's text, stripped.
        """
        blocks = getattr(response, "content", None) or []
        texts = (getattr(block, "text", None) for block in blocks)
        return "".join(text for text in texts if isinstance(text, str)).strip()

    def _call_claude(self, user_prompt: str) -> str:
        """Send *user_prompt* to the configured model and return its text reply.

        Args:
            user_prompt: Content of the single user message.

        Returns:
            The stripped response text, or an ``[offline-mode]`` placeholder
            (echoing the first 180 characters of the prompt) when no client
            is configured.

        Raises:
            Exceptions raised by the client call are not caught here.
        """
        if not self.client:
            return (
                f"[offline-mode] {self.name} would respond to: {user_prompt[:180]}..."
            )

        response = self.client.messages.create(
            model=MODEL_ID,
            max_tokens=DEFAULT_MAX_TOKENS,
            temperature=0.2,
            system=self.system_prompt,
            messages=[{"role": "user", "content": user_prompt}],
        )
        return self._extract_text(response)
|
|
|
|
|
|
|
|
class PlannerAgent(ClaudeAgent):
    """Agent that asks Claude for a release-readiness plan and checklist."""

    def __init__(self) -> None:
        """Configure the agent with the release-engineer system prompt."""
        super().__init__(
            name="Planner",
            system_prompt=(
                "You are a release engineer. Return JSON with a summary and a list of"
                " checklist items (title, description, category, owners, status)."
                " Categories should cover tests, infra, observability, docs, risk mitigation."
            ),
        )

    def run(self, request: ReadinessRequest) -> ReadinessPlan:
        """Build a ``ReadinessPlan`` from the model's reply to *request*.

        The reply is parsed as JSON. ``summary`` falls back to the first 200
        characters of the raw reply, and ``items`` falls back to an empty
        checklist. Model output is not trusted to match the requested shape:
        a non-object reply, a null or non-list ``items`` value, and entries
        that are neither objects nor strings are tolerated rather than
        raising.

        Args:
            request: Supplies ``project_name``, ``release_goal``,
                ``code_summary``, ``infra_notes`` and ``stakeholders``.

        Returns:
            A ``ReadinessPlan`` with the summary and the parsed checklist items.
        """
        prompt = (
            "Build a release readiness plan for the following data:\n"
            f"Project: {request.project_name}\n"
            f"Goal: {request.release_goal}\n"
            f"Code summary: {request.code_summary}\n"
            f"Infra notes: {request.infra_notes or 'n/a'}\n"
            f"Stakeholders: {', '.join(request.stakeholders or ['eng'])}"
        )
        raw = self._call_claude(prompt)

        plan_dict = _safe_json(raw, fallback={})
        if not isinstance(plan_dict, dict):
            # Valid JSON that is not an object (e.g. a bare list) has no fields.
            plan_dict = {}
        summary = plan_dict.get("summary", raw[:200])

        items_payload = plan_dict.get("items")
        if not isinstance(items_payload, list):
            # Covers a missing key, an explicit null, and a scalar value.
            items_payload = []

        items: List[ChecklistItem] = []
        for entry in items_payload:
            if isinstance(entry, str):
                # Keep a bare-string entry as a title instead of dropping it.
                entry = {"title": entry}
            if not isinstance(entry, dict):
                continue
            items.append(
                ChecklistItem(
                    title=entry.get("title", "Untitled"),
                    description=entry.get("description", ""),
                    category=entry.get("category", "general"),
                    owners=entry.get("owners", []),
                    status=entry.get("status", "todo"),
                )
            )
        return ReadinessPlan(summary=summary, items=items)
|
|
|
|
|
|
|
|
class EvidenceAgent(ClaudeAgent):
    """Agent that combines MCP tool signals with Claude's evidence synthesis."""

    # Seconds to wait for MCP signals when they are gathered on a worker thread.
    MCP_TIMEOUT_SECONDS = 5

    def __init__(self) -> None:
        """Configure the SRE system prompt and create the MCP client."""
        super().__init__(
            name="Evidence",
            system_prompt=(
                "You operate like a DevOps SRE. When given a plan, produce three lists:"
                " findings (signals that support shipping), gaps (missing data), and"
                " signals (calls you would make to MCP tools or logs). Output JSON."
            ),
        )
        self.mcp_client = EnhancedMCPClient()

    def _gather_mcp_signals(self, plan: ReadinessPlan, project_name: str) -> List:
        """Run the MCP client's async signal gathering from synchronous code.

        Any failure, including a timeout, is reported as a single
        ``"MCP signal gathering: ..."`` entry rather than raised.
        """
        try:
            target = project_name or "project"
            titles = [item.title for item in plan.items]

            def _run_in_fresh_loop():
                # The coroutine is created in the thread that runs it.
                return asyncio.run(
                    self.mcp_client.gather_deployment_signals(target, titles)
                )

            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread, so one can be driven here.
                signals = _run_in_fresh_loop()
            else:
                # A loop is already running in this thread and cannot be
                # re-entered, so the coroutine runs on a worker thread.
                import concurrent.futures

                executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
                try:
                    signals = executor.submit(_run_in_fresh_loop).result(
                        timeout=self.MCP_TIMEOUT_SECONDS
                    )
                finally:
                    # wait=False: leaving a ``with`` block would join the
                    # worker and block past the timeout.
                    executor.shutdown(wait=False)
        except Exception as e:
            return [f"MCP signal gathering: {str(e)[:100]}"]
        return list(signals) if signals else []

    def run(self, plan: ReadinessPlan, project_name: str = "") -> EvidencePacket:
        """Gather MCP signals, ask Claude to synthesize evidence, and merge both.

        Args:
            plan: Plan whose ``summary`` and ``items`` are put in the prompt.
            project_name: Name passed to the MCP client; ``"project"`` is
                used when it is empty.

        Returns:
            An ``EvidencePacket``. ``findings`` falls back to the first 200
            characters of the raw reply, ``gaps`` to an empty list, and
            ``signals`` is the MCP signals followed by any list of signals
            in the reply.
        """
        mcp_signals = self._gather_mcp_signals(plan, project_name)

        prompt = (
            "Given this deployment plan, synthesize evidence:"
            f"\n{plan.summary}\nItems: {[_safe_truncate(asdict(item)) for item in plan.items]}"
            f"\n\nMCP Tool Signals: {', '.join(str(signal) for signal in mcp_signals)}"
        )
        raw = self._call_claude(prompt)

        payload = _safe_json(raw, fallback={})
        if not isinstance(payload, dict):
            # Valid JSON that is not an object has no fields to read.
            payload = {}

        reply_signals = payload.get("signals")
        if not isinstance(reply_signals, list):
            # A null or scalar value cannot be concatenated onto the list.
            reply_signals = []

        return EvidencePacket(
            findings=payload.get("findings", [raw[:200]]),
            gaps=payload.get("gaps", []),
            signals=mcp_signals + reply_signals,
        )
|
|
|
|
|
|
|
|
class DocumentationAgent(ClaudeAgent):
    """Agent that drafts changelog, README and announcement text via Claude."""

    def __init__(self) -> None:
        """Configure the agent with the technical-writer system prompt."""
        writer_instructions = (
            "You are a technical writer. Create JSON with changelog_entry,"
            " readme_snippet, and announcement_draft. Be concise but specific."
        )
        super().__init__(name="Documentation", system_prompt=writer_instructions)

    def run(self, request: ReadinessRequest, evidence: EvidencePacket) -> DocumentationBundle:
        """Ask Claude for deployment communications and wrap them in a bundle.

        ``changelog_entry`` falls back to the first 200 characters of the raw
        reply; the other two fields fall back to empty strings.
        """
        prompt = (
            f"Author deployment communications. Project: {request.project_name}."
            f" Goal: {request.release_goal}."
            f" Use this evidence: {evidence.findings}."
        )
        raw = self._call_claude(prompt)
        parsed = _safe_json(raw, fallback={})

        bundle_fields = {
            "changelog_entry": parsed.get("changelog_entry", raw[:200]),
            "readme_snippet": parsed.get("readme_snippet", ""),
            "announcement_draft": parsed.get("announcement_draft", ""),
        }
        return DocumentationBundle(**bundle_fields)
|
|
|
|
|
|
|
|
class SynthesisAgent:
    """Cross-validates evidence through sponsor LLMs (Gemini/OpenAI)."""

    def __init__(self) -> None:
        """Create the sponsor LLM client used by :meth:`run`."""
        self.sponsor_client = SponsorLLMClient()

    def run(
        self,
        evidence: EvidencePacket,
        plan_summary: str,
        preferred_llms: Optional[List[str]] = None,
    ) -> Dict[str, str]:
        """Pass the leading evidence entries to the sponsor client for synthesis.

        Findings come first, then signals. Only the first five combined
        entries are sent, joined with newlines.
        """
        combined = evidence.findings + evidence.signals
        leading_entries = combined[:5]
        evidence_text = "\n".join(leading_entries)
        return self.sponsor_client.cross_validate_evidence(
            evidence_text,
            plan_summary,
            preferred_llms,
        )
|
|
|
|
|
|
|
|
class ReviewerAgent(ClaudeAgent):
    """Agent that asks Claude for a release decision on the assembled package."""

    def __init__(self) -> None:
        """Configure the agent with the release-board system prompt."""
        super().__init__(
            name="Reviewer",
            system_prompt=(
                "You chair a release board. Compare plans, evidence, and docs."
                " Respond with JSON: decision (approve/block/needs_info), confidence"
                " 0-1, findings (severity+note)."
            ),
        )

    def run(
        self,
        plan: ReadinessPlan,
        evidence: EvidencePacket,
        docs: DocumentationBundle,
        sponsor_synthesis: Optional[Dict[str, str]] = None,
    ) -> ReviewReport:
        """Review the release package and return the parsed ``ReviewReport``.

        Model output is not trusted to match the requested shape. A
        non-object reply, a null or non-list ``findings`` value, and a
        missing or non-numeric ``confidence`` fall back to defaults
        (``needs_info``, ``0.4``, no findings) instead of raising.

        Args:
            plan: Its ``summary`` is placed in the prompt.
            evidence: Its ``findings`` and ``gaps`` are placed in the prompt.
            docs: Its ``changelog_entry`` is placed in the prompt.
            sponsor_synthesis: Optional synthesis appended to the prompt when
                truthy.

        Returns:
            A ``ReviewReport`` with the decision, confidence and findings.
        """
        synthesis_context = ""
        if sponsor_synthesis:
            synthesis_context = f"\nSponsor LLM Synthesis: {sponsor_synthesis}"

        prompt = (
            "Review release package. Plan: {plan}. Evidence: {evidence}. Docs: {docs}."
            "{synthesis}"
        ).format(
            plan=plan.summary,
            evidence=evidence.findings + evidence.gaps,
            docs=docs.changelog_entry,
            synthesis=synthesis_context,
        )
        raw = self._call_claude(prompt)

        payload = _safe_json(raw, fallback={})
        if not isinstance(payload, dict):
            # Valid JSON that is not an object has no fields to read.
            payload = {}

        findings_payload = payload.get("findings")
        if not isinstance(findings_payload, list):
            # Covers a missing key, an explicit null, and a scalar value.
            findings_payload = []

        findings: List[ReviewFinding] = []
        for entry in findings_payload:
            if isinstance(entry, str):
                # Keep a bare-string finding as a note instead of dropping it.
                entry = {"note": entry}
            if not isinstance(entry, dict):
                continue
            findings.append(
                ReviewFinding(
                    severity=entry.get("severity", "medium"),
                    note=entry.get("note", ""),
                )
            )

        try:
            confidence = float(payload.get("confidence", 0.4))
        except (TypeError, ValueError):
            # e.g. null or a word such as "high": use the same default as a
            # missing key.
            confidence = 0.4

        return ReviewReport(
            decision=payload.get("decision", "needs_info"),
            confidence=confidence,
            findings=findings,
        )
|
|
|
|
|
|
|
|
def _safe_json(text: str, fallback: Dict) -> Dict:
    """Parse *text* as a JSON object, returning *fallback* when that fails.

    *fallback* is returned when *text* is not valid JSON, is not a type
    ``json.loads`` accepts (for example ``None``), or decodes to something
    other than an object. Callers use ``.get`` on the result, so a list or
    scalar is not a usable value. If *text* as a whole is not valid JSON but
    contains a Markdown code fence (optionally tagged ``json``), the fenced
    content is tried as well.

    Args:
        text: Raw text to parse.
        fallback: Value returned when no JSON object can be obtained.

    Returns:
        The decoded ``dict``, or *fallback*.
    """
    import json
    import re

    parsed = None
    try:
        parsed = json.loads(text)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
        if isinstance(text, str):
            fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
            if fenced:
                try:
                    parsed = json.loads(fenced.group(1))
                except ValueError:
                    parsed = None

    return parsed if isinstance(parsed, dict) else fallback
|
|
|
|
|
|
|
|
def _safe_truncate(value: Dict, limit: int = 240) -> str:
    """Return ``str(value)``, cut to *limit* characters plus an ellipsis if longer."""
    rendered = str(value)
    if len(rendered) > limit:
        return rendered[:limit] + "…"
    return rendered
|
|
|