Spaces:
Sleeping
Sleeping
"""
Crew setup for NBA data analysis workflow.
"""
import os
import shutil
from typing import Optional

from crewai import Crew, Process

from agents import create_engineer_agent, create_analyst_agent, create_storyteller_agent
from tasks import create_data_engineering_task, create_data_analysis_task, create_custom_analysis_task, create_storyteller_task
def create_crew() -> Crew:
    """
    Build the default crew: a data engineering task followed by a data analysis task.

    Returns:
        Crew: Crew holding the engineer and analyst agents and their two
        tasks, configured with ``Process.sequential`` and verbose output.
    """
    engineer = create_engineer_agent()
    analyst = create_analyst_agent()

    engineering_task = create_data_engineering_task(engineer)
    # The engineering task is handed to the analysis-task factory as its
    # second argument (presumably so the analysis can build on its output).
    analysis_task = create_data_analysis_task(analyst, engineering_task)

    crew_settings = {
        "agents": [engineer, analyst],
        "tasks": [engineering_task, analysis_task],
        "process": Process.sequential,
        "verbose": True,
    }
    return Crew(**crew_settings)
def create_crew_with_custom_task(user_query: str, csv_path: Optional[str] = None) -> Crew:
    """
    Create a CrewAI crew with engineering task, custom analyst task, and storyteller task.

    The three tasks are registered in that order and the crew uses
    ``Process.sequential``.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Optional path to the CSV file. ``None`` is forwarded
            unchanged to the agent and task factories (documented as
            "uses default from config").

    Returns:
        Crew: Configured CrewAI crew ready for execution.
    """
    # Create agents; csv_path is forwarded to the engineer and analyst factories.
    engineer_agent = create_engineer_agent(csv_path)
    analyst_agent = create_analyst_agent(csv_path)
    storyteller_agent = create_storyteller_agent()

    # Fixed engineering task.
    data_engineering_task = create_data_engineering_task(engineer_agent, csv_path)

    # Custom analyst task built from the user's query. The third argument is
    # None, so it is not linked to the engineering task (presumably that slot
    # is the upstream dependency -- confirm in tasks.py). The crew is still
    # sequential, so this task starts after the engineering task.
    custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path)

    # Storyteller task is built from the analyst task.
    storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task)

    return Crew(
        agents=[engineer_agent, analyst_agent, storyteller_agent],
        tasks=[data_engineering_task, custom_analysis_task, storyteller_task],
        process=Process.sequential,
        verbose=True,
    )
def create_flow_crew(user_query: str, csv_path: str) -> Crew:
    """
    Create a single crew whose Engineer and Analyst tasks run in parallel and
    whose results are merged by the Storyteller at the end.

    This satisfies the assignment requirement: "Parallelize tasks via a Flow;
    merge results at the end."

    ``Process.sequential`` does NOT parallelize independent tasks on its own.
    Tasks only overlap when they are flagged ``async_execution=True``: CrewAI
    starts consecutive async tasks without waiting and joins them before the
    next synchronous task runs. The engineering and analysis tasks are
    therefore flagged async here, while the storyteller task stays synchronous.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Path to the uploaded CSV file.

    Returns:
        Crew: Single crew with two concurrent tasks followed by the storyteller task.
    """
    # Create all agents.
    engineer_agent = create_engineer_agent(csv_path)
    analyst_agent = create_analyst_agent(csv_path)
    storyteller_agent = create_storyteller_agent()

    # Engineer task - independent.
    data_engineering_task = create_data_engineering_task(engineer_agent, csv_path)
    # Analyst task - independent (None is passed in place of an upstream task).
    custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path)

    # Flag the two independent tasks so CrewAI actually runs them concurrently.
    # NOTE(review): assumes the factories return crewai Task objects -- confirm.
    for independent_task in (data_engineering_task, custom_analysis_task):
        independent_task.async_execution = True

    # Storyteller task is built from the analyst task. It must stay
    # synchronous: it is the join point, and a crew may end with at most one
    # async task.
    storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task)

    return Crew(
        agents=[engineer_agent, analyst_agent, storyteller_agent],
        tasks=[data_engineering_task, custom_analysis_task, storyteller_task],
        process=Process.sequential,  # async-flagged tasks overlap; the rest run in order
        verbose=True,
    )
def create_analysis_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Build a crew containing just the Analyst and Storyteller (no Engineer).

    Intended for follow-up questions when engineer results are already
    available.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Path to the uploaded CSV file.

    Returns:
        Crew: Sequential crew with the analyst task followed by the
        storyteller task.
    """
    analyst = create_analyst_agent(csv_path)
    storyteller = create_storyteller_agent()

    # Analyst task driven by the user's query; None is passed in the third slot.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)
    # Storyteller task is built from the analyst task.
    story_task = create_storyteller_task(storyteller, analysis_task)

    members = [analyst, storyteller]
    pipeline = [analysis_task, story_task]
    return Crew(agents=members, tasks=pipeline, process=Process.sequential, verbose=True)
def create_analyst_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Build a crew containing only the Analyst (no Engineer, no Storyteller).

    Intended for specific user questions where only analysis is needed.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Path to the uploaded CSV file.

    Returns:
        Crew: Sequential crew with a single analyst task.
    """
    analyst = create_analyst_agent(csv_path)
    # Single task driven by the user's query; None is passed in the third slot.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    crew_settings = dict(
        agents=[analyst],
        tasks=[analysis_task],
        process=Process.sequential,
        verbose=True,
    )
    return Crew(**crew_settings)