"""Deterministic multi-agent orchestration for the readiness copilot.""" from __future__ import annotations import asyncio from dataclasses import asdict, field from typing import Dict, Optional from agents import ( DocumentationAgent, EvidenceAgent, PlannerAgent, ReviewerAgent, SynthesisAgent, ) from deployment_agent import DeploymentAgent from docs_agent import DocumentationLookupAgent from error_handler import PartialResult, safe_execute, safe_execute_async from progress_tracker import AgentStatus, PipelineProgress from schemas import ( DeploymentActions, DocumentationReferences, ReadinessRequest, ReadinessResponse, ) class ReadinessOrchestrator: """Runs the enhanced pipeline with Context7 docs and GitHub deployment.""" def __init__(self) -> None: self.planner = PlannerAgent() self.evidence = EvidenceAgent() self.synthesis = SynthesisAgent() self.documentation = DocumentationAgent() self.reviewer = ReviewerAgent() self.docs_lookup = DocumentationLookupAgent() self.deployment = DeploymentAgent() def run(self, request: ReadinessRequest) -> ReadinessResponse: plan = self.planner.run(request) evidence = self.evidence.run(plan, project_name=request.project_name) sponsor_synthesis = self.synthesis.run(evidence, plan.summary) docs = self.documentation.run(request, evidence) review = self.reviewer.run(plan, evidence, docs, sponsor_synthesis) # Run async operations docs_refs = asyncio.run(self.docs_lookup.lookup_deployment_docs(request, plan)) deployment_config = asyncio.run(self.deployment.prepare_deployment(request, plan)) return ReadinessResponse( plan=plan, evidence=evidence, documentation=docs, review=review, docs_references=DocumentationReferences(**docs_refs), deployment=DeploymentActions(**deployment_config), ) def run_dict(self, payload: Dict, progress: Optional[PipelineProgress] = None) -> Dict: """Convenience wrapper for UI usage with plain dicts and progress tracking.""" if progress is None: progress = PipelineProgress() request = ReadinessRequest(**payload) partial = PartialResult() # Planner progress.update_agent("Planner", AgentStatus.RUNNING, "Analyzing project context...") plan = safe_execute( self.planner.run, request, default=None, error_message="Planner agent failed" ) if plan: progress.update_agent("Planner", AgentStatus.COMPLETED, "Plan generated", 1.0) partial.add_result("plan", plan) else: progress.update_agent("Planner", AgentStatus.FAILED, "Failed to generate plan", 0.0, "Planner returned None") partial.add_error("plan", "Planner agent failed") plan = None # Will need fallback # Evidence if plan: progress.update_agent("Evidence", AgentStatus.RUNNING, "Gathering deployment signals...") evidence = safe_execute( self.evidence.run, plan, request.project_name, default=None, error_message="Evidence agent failed" ) if evidence: progress.update_agent("Evidence", AgentStatus.COMPLETED, "Evidence gathered", 1.0) partial.add_result("evidence", evidence) else: progress.update_agent("Evidence", AgentStatus.FAILED, "Failed to gather evidence", 0.0) partial.add_error("evidence", "Evidence agent failed") evidence = None else: evidence = None progress.update_agent("Evidence", AgentStatus.SKIPPED, "Skipped (no plan)", 0.0) # Synthesis if evidence and plan: progress.update_agent("Synthesis", AgentStatus.RUNNING, "Cross-validating evidence...") sponsor_synthesis = safe_execute( self.synthesis.run, evidence, plan.summary, request.sponsor_llms, default={}, error_message="Synthesis agent failed" ) progress.update_agent("Synthesis", AgentStatus.COMPLETED, "Synthesis complete", 1.0) partial.add_result("sponsor_synthesis", sponsor_synthesis) else: sponsor_synthesis = {} progress.update_agent("Synthesis", AgentStatus.SKIPPED, "Skipped (no evidence)", 0.0) # Documentation if evidence: progress.update_agent("Documentation", AgentStatus.RUNNING, "Generating deployment docs...") docs = safe_execute( self.documentation.run, request, evidence, default=None, error_message="Documentation agent failed" ) if docs: progress.update_agent("Documentation", AgentStatus.COMPLETED, "Documentation generated", 1.0) partial.add_result("documentation", docs) else: progress.update_agent("Documentation", AgentStatus.FAILED, "Failed to generate docs", 0.0) partial.add_error("documentation", "Documentation agent failed") docs = None else: docs = None progress.update_agent("Documentation", AgentStatus.SKIPPED, "Skipped (no evidence)", 0.0) # Reviewer if plan and evidence and docs: progress.update_agent("Reviewer", AgentStatus.RUNNING, "Performing risk assessment...") review = safe_execute( self.reviewer.run, plan, evidence, docs, sponsor_synthesis, default=None, error_message="Reviewer agent failed" ) if review: progress.update_agent("Reviewer", AgentStatus.COMPLETED, "Review complete", 1.0) partial.add_result("review", review) else: progress.update_agent("Reviewer", AgentStatus.FAILED, "Failed to review", 0.0) partial.add_error("review", "Reviewer agent failed") review = None else: review = None progress.update_agent("Reviewer", AgentStatus.SKIPPED, "Skipped (missing inputs)", 0.0) # Docs Lookup (async) progress.update_agent("Docs Lookup", AgentStatus.RUNNING, "Looking up framework/platform docs...") docs_refs = asyncio.run( safe_execute_async( self.docs_lookup.lookup_deployment_docs, request, plan if plan else None, default={"framework": None, "platform": None, "lookups": []}, error_message="Documentation lookup failed" ) ) if docs_refs and docs_refs.get("lookups"): progress.update_agent("Docs Lookup", AgentStatus.COMPLETED, "Documentation found", 1.0) partial.add_result("docs_references", docs_refs) else: progress.update_agent("Docs Lookup", AgentStatus.COMPLETED, "No docs found (framework may be unknown)", 1.0) partial.add_result("docs_references", docs_refs or {"framework": None, "platform": None, "lookups": []}) # Deployment (async) progress.update_agent("Deployment", AgentStatus.RUNNING, "Preparing deployment actions...") deployment_config = asyncio.run( safe_execute_async( self.deployment.prepare_deployment, request, plan if plan else None, default={"repo": None, "branch": "main", "ready": False, "actions": []}, error_message="Deployment preparation failed" ) ) if deployment_config and deployment_config.get("ready"): progress.update_agent("Deployment", AgentStatus.COMPLETED, "Deployment ready", 1.0) partial.add_result("deployment", deployment_config) else: progress.update_agent("Deployment", AgentStatus.COMPLETED, "Deployment not configured", 1.0) partial.add_result("deployment", deployment_config or {"repo": None, "branch": "main", "ready": False, "actions": []}) # Build response response = ReadinessResponse( plan=plan or partial.results.get("plan"), evidence=evidence or partial.results.get("evidence"), documentation=docs or partial.results.get("documentation"), review=review or partial.results.get("review"), docs_references=DocumentationReferences(**docs_refs) if docs_refs else None, deployment=DeploymentActions(**deployment_config) if deployment_config else None, ) result = asdict(response) result["sponsor_synthesis"] = sponsor_synthesis result["progress"] = progress.to_dict() result["partial_results"] = partial.to_dict() return result async def execute_deployment(self, payload: Dict) -> Dict: """Execute deployment actions via GitHub.""" request = ReadinessRequest(**payload) plan = self.planner.run(request) deployment_config = await self.deployment.prepare_deployment(request, plan) execution_results = await self.deployment.execute_deployment(deployment_config) return execution_results