deploy-ready-copilot / orchestrator.py
HIMANSHUKUMARJHA's picture
Add sponsor LLM controls and secrets guidance
cd249cc
"""Deterministic multi-agent orchestration for the readiness copilot."""
from __future__ import annotations
import asyncio
from dataclasses import asdict, field
from typing import Dict, Optional
from agents import (
DocumentationAgent,
EvidenceAgent,
PlannerAgent,
ReviewerAgent,
SynthesisAgent,
)
from deployment_agent import DeploymentAgent
from docs_agent import DocumentationLookupAgent
from error_handler import PartialResult, safe_execute, safe_execute_async
from progress_tracker import AgentStatus, PipelineProgress
from schemas import (
DeploymentActions,
DocumentationReferences,
ReadinessRequest,
ReadinessResponse,
)
class ReadinessOrchestrator:
    """Runs the enhanced pipeline with Context7 docs and GitHub deployment.

    The pipeline stages run in a fixed order: planner, evidence, synthesis,
    documentation, reviewer, documentation lookup and deployment preparation.
    ``run`` is the strict, typed entry point; ``run_dict`` is the
    fault-tolerant entry point that reports progress and partial results.
    """

    def __init__(self) -> None:
        """Create one instance of every agent used by the pipeline."""
        self.planner = PlannerAgent()
        self.evidence = EvidenceAgent()
        self.synthesis = SynthesisAgent()
        self.documentation = DocumentationAgent()
        self.reviewer = ReviewerAgent()
        self.docs_lookup = DocumentationLookupAgent()
        self.deployment = DeploymentAgent()

    @staticmethod
    def _empty_docs_refs() -> Dict:
        """Return a fresh fallback payload for the documentation lookup stage."""
        return {"framework": None, "platform": None, "lookups": []}

    @staticmethod
    def _empty_deployment() -> Dict:
        """Return a fresh fallback payload for the deployment preparation stage."""
        return {"repo": None, "branch": "main", "ready": False, "actions": []}

    def run(self, request: ReadinessRequest) -> ReadinessResponse:
        """Run every stage once and return a typed response.

        No error isolation is applied here: an exception raised by any agent
        propagates to the caller. The two async agents are driven with
        ``asyncio.run``, so this method must not be called from code that is
        already running inside an event loop.

        Args:
            request: The readiness request to evaluate.

        Returns:
            A ``ReadinessResponse`` built from the outputs of all stages.
        """
        plan = self.planner.run(request)
        evidence = self.evidence.run(plan, project_name=request.project_name)
        # Forward the sponsor LLM selection exactly as run_dict does, so both
        # entry points honour the same request field.
        sponsor_synthesis = self.synthesis.run(
            evidence,
            plan.summary,
            request.sponsor_llms,
        )
        docs = self.documentation.run(request, evidence)
        review = self.reviewer.run(plan, evidence, docs, sponsor_synthesis)

        # Run async operations
        docs_refs = asyncio.run(
            self.docs_lookup.lookup_deployment_docs(request, plan)
        )
        deployment_config = asyncio.run(
            self.deployment.prepare_deployment(request, plan)
        )

        return ReadinessResponse(
            plan=plan,
            evidence=evidence,
            documentation=docs,
            review=review,
            docs_references=DocumentationReferences(**docs_refs),
            deployment=DeploymentActions(**deployment_config),
        )

    def _run_stage(
        self,
        progress: PipelineProgress,
        partial: PartialResult,
        *,
        name: str,
        key: str,
        running_message: str,
        success_message: str,
        failure_message: str,
        error_message: str,
        func,
        args: tuple,
        failure_detail: Optional[str] = None,
    ):
        """Run one synchronous stage through ``safe_execute`` and record the outcome.

        Args:
            progress: Tracker that receives RUNNING / COMPLETED / FAILED updates.
            partial: Collector that receives the stage result or its error.
            name: Stage name shown in the progress tracker.
            key: Key under which the result or error is stored in ``partial``.
            running_message: Progress text while the stage runs.
            success_message: Progress text when the stage yields a truthy result.
            failure_message: Progress text when the stage yields a falsy result.
            error_message: Text handed to ``safe_execute`` and stored as the
                error in ``partial`` on failure.
            func: The agent callable to invoke.
            args: Positional arguments forwarded to ``func``.
            failure_detail: Optional extra argument appended to the FAILED
                progress update.

        Returns:
            The stage result when it is truthy, otherwise ``None``.
        """
        progress.update_agent(name, AgentStatus.RUNNING, running_message)
        # NOTE(review): safe_execute is assumed to return `default` when the
        # callable fails -- confirm against error_handler.
        outcome = safe_execute(
            func,
            *args,
            default=None,
            error_message=error_message,
        )
        if outcome:
            progress.update_agent(name, AgentStatus.COMPLETED, success_message, 1.0)
            partial.add_result(key, outcome)
            return outcome

        if failure_detail is None:
            progress.update_agent(name, AgentStatus.FAILED, failure_message, 0.0)
        else:
            progress.update_agent(
                name, AgentStatus.FAILED, failure_message, 0.0, failure_detail
            )
        partial.add_error(key, error_message)
        return None

    def run_dict(self, payload: Dict, progress: Optional[PipelineProgress] = None) -> Dict:
        """Convenience wrapper for UI usage with plain dicts and progress tracking.

        Each stage is executed through ``safe_execute`` / ``safe_execute_async``
        so that one failing agent does not abort the pipeline. Stages whose
        inputs are missing are marked as skipped.

        Args:
            payload: Keyword arguments used to build a ``ReadinessRequest``.
            progress: Optional tracker to update; a new ``PipelineProgress``
                is created when omitted.

        Returns:
            The ``ReadinessResponse`` converted with ``asdict``, extended with
            the keys ``"sponsor_synthesis"``, ``"progress"`` and
            ``"partial_results"``.
        """
        if progress is None:
            progress = PipelineProgress()

        request = ReadinessRequest(**payload)
        partial = PartialResult()

        # Planner
        plan = self._run_stage(
            progress,
            partial,
            name="Planner",
            key="plan",
            running_message="Analyzing project context...",
            success_message="Plan generated",
            failure_message="Failed to generate plan",
            error_message="Planner agent failed",
            func=self.planner.run,
            args=(request,),
            failure_detail="Planner returned None",
        )

        # Evidence
        if plan:
            evidence = self._run_stage(
                progress,
                partial,
                name="Evidence",
                key="evidence",
                running_message="Gathering deployment signals...",
                success_message="Evidence gathered",
                failure_message="Failed to gather evidence",
                error_message="Evidence agent failed",
                func=self.evidence.run,
                args=(plan, request.project_name),
            )
        else:
            evidence = None
            progress.update_agent("Evidence", AgentStatus.SKIPPED, "Skipped (no plan)", 0.0)

        # Synthesis
        if evidence and plan:
            progress.update_agent("Synthesis", AgentStatus.RUNNING, "Cross-validating evidence...")
            # default=None (not {}) so that a failure can be told apart from a
            # legitimately empty synthesis and reported as FAILED.
            sponsor_synthesis = safe_execute(
                self.synthesis.run,
                evidence,
                plan.summary,
                request.sponsor_llms,
                default=None,
                error_message="Synthesis agent failed",
            )
            if sponsor_synthesis is None:
                progress.update_agent("Synthesis", AgentStatus.FAILED, "Failed to synthesize evidence", 0.0)
                partial.add_error("sponsor_synthesis", "Synthesis agent failed")
                # Downstream consumers still receive an empty mapping.
                sponsor_synthesis = {}
            else:
                progress.update_agent("Synthesis", AgentStatus.COMPLETED, "Synthesis complete", 1.0)
                partial.add_result("sponsor_synthesis", sponsor_synthesis)
        else:
            sponsor_synthesis = {}
            progress.update_agent("Synthesis", AgentStatus.SKIPPED, "Skipped (no evidence)", 0.0)

        # Documentation
        if evidence:
            docs = self._run_stage(
                progress,
                partial,
                name="Documentation",
                key="documentation",
                running_message="Generating deployment docs...",
                success_message="Documentation generated",
                failure_message="Failed to generate docs",
                error_message="Documentation agent failed",
                func=self.documentation.run,
                args=(request, evidence),
            )
        else:
            docs = None
            progress.update_agent("Documentation", AgentStatus.SKIPPED, "Skipped (no evidence)", 0.0)

        # Reviewer
        if plan and evidence and docs:
            review = self._run_stage(
                progress,
                partial,
                name="Reviewer",
                key="review",
                running_message="Performing risk assessment...",
                success_message="Review complete",
                failure_message="Failed to review",
                error_message="Reviewer agent failed",
                func=self.reviewer.run,
                args=(plan, evidence, docs, sponsor_synthesis),
            )
        else:
            review = None
            progress.update_agent("Reviewer", AgentStatus.SKIPPED, "Skipped (missing inputs)", 0.0)

        # Docs Lookup (async) -- runs even without a plan; `plan` is None then.
        progress.update_agent("Docs Lookup", AgentStatus.RUNNING, "Looking up framework/platform docs...")
        docs_refs = asyncio.run(
            safe_execute_async(
                self.docs_lookup.lookup_deployment_docs,
                request,
                plan,
                default=self._empty_docs_refs(),
                error_message="Documentation lookup failed",
            )
        )
        if docs_refs and docs_refs.get("lookups"):
            docs_message = "Documentation found"
        else:
            docs_message = "No docs found (framework may be unknown)"
        progress.update_agent("Docs Lookup", AgentStatus.COMPLETED, docs_message, 1.0)
        partial.add_result("docs_references", docs_refs or self._empty_docs_refs())

        # Deployment (async) -- runs even without a plan; `plan` is None then.
        progress.update_agent("Deployment", AgentStatus.RUNNING, "Preparing deployment actions...")
        deployment_config = asyncio.run(
            safe_execute_async(
                self.deployment.prepare_deployment,
                request,
                plan,
                default=self._empty_deployment(),
                error_message="Deployment preparation failed",
            )
        )
        if deployment_config and deployment_config.get("ready"):
            deployment_message = "Deployment ready"
        else:
            deployment_message = "Deployment not configured"
        progress.update_agent("Deployment", AgentStatus.COMPLETED, deployment_message, 1.0)
        partial.add_result("deployment", deployment_config or self._empty_deployment())

        # Build response. Failed or skipped stages contribute None.
        response = ReadinessResponse(
            plan=plan,
            evidence=evidence,
            documentation=docs,
            review=review,
            docs_references=DocumentationReferences(**docs_refs) if docs_refs else None,
            deployment=DeploymentActions(**deployment_config) if deployment_config else None,
        )

        result = asdict(response)
        result["sponsor_synthesis"] = sponsor_synthesis
        result["progress"] = progress.to_dict()
        result["partial_results"] = partial.to_dict()
        return result

    async def execute_deployment(self, payload: Dict) -> Dict:
        """Execute deployment actions via GitHub.

        Builds a ``ReadinessRequest`` from ``payload``, generates a plan,
        prepares the deployment configuration and hands it to the deployment
        agent for execution. No error isolation is applied; exceptions
        propagate to the caller.

        Args:
            payload: Keyword arguments used to build a ``ReadinessRequest``.

        Returns:
            Whatever the deployment agent's ``execute_deployment`` returns.
        """
        request = ReadinessRequest(**payload)
        plan = self.planner.run(request)
        deployment_config = await self.deployment.prepare_deployment(request, plan)
        return await self.deployment.execute_deployment(deployment_config)