NBA_Analysis / crew.py
shekkari21's picture
Add NBA analysis project files
ddabbe4
"""
Crew setup for NBA data analysis workflow.
"""
import os
import shutil
from typing import Optional

from crewai import Crew, Process

from agents import create_engineer_agent, create_analyst_agent, create_storyteller_agent
from tasks import create_data_engineering_task, create_data_analysis_task, create_custom_analysis_task, create_storyteller_task
def create_crew() -> Crew:
    """
    Build the default two-stage crew: data engineering followed by analysis.

    Both agent factories are called without arguments, so whatever defaults
    they define are used.

    Returns:
        Crew: A verbose crew configured with Process.sequential, listing the
        engineering task before the analysis task.
    """
    # Agents come first because every task factory takes its agent as input.
    engineer = create_engineer_agent()
    analyst = create_analyst_agent()

    # The analysis task factory also receives the engineering task
    # (presumably as upstream context; confirm in the tasks module).
    engineering_task = create_data_engineering_task(engineer)
    analysis_task = create_data_analysis_task(analyst, engineering_task)

    crew_agents = [engineer, analyst]
    crew_tasks = [engineering_task, analysis_task]
    return Crew(
        agents=crew_agents,
        tasks=crew_tasks,
        process=Process.sequential,
        verbose=True,
    )
def create_crew_with_custom_task(user_query: str, csv_path: Optional[str] = None) -> Crew:
    """
    Create a CrewAI crew with an engineering task, a custom analyst task, and
    a storyteller task.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Optional path to the CSV file. When None, the value is
            forwarded unchanged to the agent and task factories, which are
            expected to fall back to the default from config.

    Returns:
        Crew: A verbose crew configured with Process.sequential, listing the
        engineering, custom analysis, and storyteller tasks in that order.
    """
    # Agents receive csv_path so their tools can target the right file.
    engineer_agent = create_engineer_agent(csv_path)
    analyst_agent = create_analyst_agent(csv_path)
    storyteller_agent = create_storyteller_agent()

    # Fixed engineering task.
    data_engineering_task = create_data_engineering_task(engineer_agent, csv_path)

    # Analyst task built from the user's query. The third positional argument
    # is None, so no engineering task is handed to the factory.
    # NOTE(review): the crew below uses Process.sequential, so tasks are still
    # listed to run one after another; any real parallelism would have to be
    # configured on the tasks themselves (e.g. async_execution) -- confirm in
    # the tasks module.
    custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path)

    # Storyteller task is built from the analyst task so it can use its output.
    storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task)

    return Crew(
        agents=[engineer_agent, analyst_agent, storyteller_agent],
        tasks=[data_engineering_task, custom_analysis_task, storyteller_task],
        process=Process.sequential,
        verbose=True,
    )
def create_flow_crew(user_query: str, csv_path: str) -> Crew:
    """
    Assemble one crew holding the engineer, analyst, and storyteller tasks.

    The engineer and analyst tasks are created without being linked to each
    other (the analyst factory is given None in place of an upstream task);
    the storyteller task is built from the analyst task. This is intended to
    meet the assignment requirement: "Parallelize tasks via a Flow; merge
    results at the end."

    NOTE(review): the crew is configured with Process.sequential. Whether the
    two independent tasks actually run concurrently depends on how the task
    factories configure them (e.g. async_execution) -- confirm in the tasks
    module and the CrewAI process documentation.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Path to the uploaded CSV file

    Returns:
        Crew: Single verbose crew with the three tasks listed in the order
        engineer, analyst, storyteller
    """
    # Agents, in the same order the tasks will be listed.
    engineer = create_engineer_agent(csv_path)
    analyst = create_analyst_agent(csv_path)
    storyteller = create_storyteller_agent()

    # Independent tasks: neither factory call receives the other task.
    engineering_task = create_data_engineering_task(engineer, csv_path)
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    # The storyteller is built on top of the analyst's task.
    story_task = create_storyteller_task(storyteller, analysis_task)

    crew_agents = [engineer, analyst, storyteller]
    crew_tasks = [engineering_task, analysis_task, story_task]
    return Crew(
        agents=crew_agents,
        tasks=crew_tasks,
        process=Process.sequential,
        verbose=True,
    )
def create_analysis_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Assemble a crew containing just the Analyst and Storyteller (no Engineer).

    Intended for follow-up questions when engineer results already exist and
    only a new analysis plus narrative is required.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Path to the uploaded CSV file

    Returns:
        Crew: Verbose Process.sequential crew listing the analyst task and
        then the storyteller task
    """
    analyst = create_analyst_agent(csv_path)
    storyteller = create_storyteller_agent()

    # Analyst task is driven by the user's query; None is passed where an
    # upstream task could otherwise be supplied.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    # Storyteller task is built from the analyst task.
    story_task = create_storyteller_task(storyteller, analysis_task)

    crew_agents = [analyst, storyteller]
    crew_tasks = [analysis_task, story_task]
    return Crew(
        agents=crew_agents,
        tasks=crew_tasks,
        process=Process.sequential,
        verbose=True,
    )
def create_analyst_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Assemble a single-agent crew containing only the Analyst.

    No Engineer or Storyteller is created; intended for specific user
    questions where analysis alone is sufficient.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Path to the uploaded CSV file

    Returns:
        Crew: Verbose Process.sequential crew with one analyst task
    """
    analyst = create_analyst_agent(csv_path)

    # None is passed where an upstream task could otherwise be supplied.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    crew_agents = [analyst]
    crew_tasks = [analysis_task]
    return Crew(
        agents=crew_agents,
        tasks=crew_tasks,
        process=Process.sequential,
        verbose=True,
    )