Spaces:
Running
Running
| from typing import List, Dict, Any, Optional | |
| import uuid | |
| import threading | |
| from datetime import datetime | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.social_network import NetworkTopology | |
| from tinytroupe.environment.social_tiny_world import SocialTinyWorld, SimulationResult | |
| from tinytroupe.agent.social_types import Content | |
| from tinytroupe.ml_models import EngagementPredictor | |
| from tinytroupe.content_generation import ContentVariantGenerator | |
| from tinytroupe.network_generator import NetworkGenerator | |
class SimulationConfig:
    """Settings that describe a simulation to be created.

    Holds the simulation name, the requested persona count, the network
    type and an optional ``user_id``.
    """

    def __init__(
        self,
        name: str,
        persona_count: int = 10,
        network_type: str = "scale_free",
        **kwargs,
    ):
        """Store the configuration values on the instance.

        Args:
            name: Name of the simulation.
            persona_count: Number of personas requested (default 10).
            network_type: Network layout name (default ``"scale_free"``).
            **kwargs: Extra options. Only ``user_id`` is read; any other
                key is ignored.
        """
        # ``user_id`` is the only extra option consulted; None when absent.
        owner = kwargs.get("user_id", None)

        self.name, self.persona_count, self.network_type = (
            name,
            persona_count,
            network_type,
        )
        self.user_id = owner
class Simulation:
    """In-memory record of one simulation and its mutable run state."""

    def __init__(
        self,
        id: str,
        config: SimulationConfig,
        world: SocialTinyWorld,
        personas: List[TinyPerson],
        network: NetworkTopology,
    ):
        """Bind the supplied components and initialise the run state.

        Args:
            id: Identifier of the simulation.
            config: Configuration the simulation was created from.
            world: World object the simulation runs in.
            personas: Personas taking part in the simulation.
            network: Network topology connecting the personas.
        """
        # Components handed in by the caller.
        self.id = id
        self.config = config
        self.world = world
        self.personas = personas
        self.network = network

        # Run state: starts "ready" with no progress, no result and no chat.
        self.status = "ready"
        self.progress = 0.0
        self.last_result: Optional[SimulationResult] = None
        self.chat_history: List[Dict[str, Any]] = []

        # Creation time (naive local time, as returned by datetime.now()).
        self.created_at = datetime.now()
class SimulationManager:
    """Manages simulation lifecycle and execution.

    Simulations and saved focus groups are held in plain dictionaries on the
    instance (``simulations`` keyed by simulation id, ``focus_groups`` keyed
    by group name); nothing is persisted by this class.
    """

    # How many personas (taken from the front of the list) are used as the
    # initial viewers when content is spread through a simulation.
    SEED_VIEWER_COUNT = 5

    def __init__(self):
        """Create an empty manager with its predictor and variant generator."""
        self.simulations: Dict[str, Simulation] = {}
        self.focus_groups: Dict[str, List[TinyPerson]] = {}
        self.predictor = EngagementPredictor()
        self.variant_generator = ContentVariantGenerator()

    def create_simulation(
        self,
        config: SimulationConfig,
        focus_group_name: Optional[str] = None,
    ) -> Simulation:
        """Build a simulation (personas, network, world) and register it.

        Args:
            config: Configuration for the new simulation.
            focus_group_name: Name of a saved focus group whose personas
                should be reused. When omitted, or when no group of that name
                is saved, new personas are generated with ``TinyPersonFactory``.

        Returns:
            The newly created simulation, stored under a fresh UUID4 id.
        """
        if focus_group_name and focus_group_name in self.focus_groups:
            # Copy so the simulation does not alias the saved group's list.
            personas = list(self.focus_groups[focus_group_name])
        else:
            from tinytroupe.factory.tiny_person_factory import TinyPersonFactory

            factory = TinyPersonFactory(
                context=config.name,
                total_population_size=config.persona_count,
            )
            personas = factory.generate_people(number_of_people=config.persona_count)

        # Generate network.
        # NOTE(review): the size passed here is config.persona_count even when
        # a focus group with a different number of personas is reused --
        # confirm that NetworkGenerator tolerates the mismatch.
        net_gen = NetworkGenerator(personas)
        if config.network_type == "scale_free":
            network = net_gen.generate_scale_free_network(config.persona_count, 2)
        else:
            network = net_gen.generate_small_world_network(config.persona_count, 4, 0.1)

        # Create world and place every persona in it.
        world = SocialTinyWorld(config.name, network=network)
        for persona in personas:
            world.add_agent(persona)

        sim_id = str(uuid.uuid4())
        simulation = Simulation(sim_id, config, world, personas, network)
        self.simulations[sim_id] = simulation
        return simulation

    def run_simulation(
        self,
        simulation_id: str,
        content: Content,
        mode: str = "full",
        background: bool = False,
    ) -> Optional[SimulationResult]:
        """Run a registered simulation for the given content.

        Args:
            simulation_id: Id of a simulation held by this manager.
            content: Content to spread through the simulation's world.
            mode: Accepted for API compatibility; currently not used.
            background: When true, run on a new thread and return at once.

        Returns:
            The simulation result, or ``None`` when ``background`` is true
            (the result is then available as ``simulation.last_result`` once
            the status becomes "completed").

        Raises:
            ValueError: If ``simulation_id`` is not registered.
        """
        simulation = self.simulations.get(simulation_id)
        if simulation is None:
            raise ValueError(f"Simulation {simulation_id} not found.")

        if not background:
            return self._run_simulation_task(simulation, content)

        worker = threading.Thread(
            target=self._run_simulation_task,
            args=(simulation, content),
        )
        worker.start()
        return None

    def _run_simulation_task(self, simulation: Simulation, content: Content) -> SimulationResult:
        """Execute one content-spread run and record its outcome.

        Sets the status to "running", delegates to
        ``simulation.world.simulate_content_spread`` seeded with the names of
        the first ``SEED_VIEWER_COUNT`` personas, then stores the result and
        marks the simulation "completed". If the spread raises, the status is
        set to "failed" and the exception is re-raised.
        """
        simulation.status = "running"
        simulation.progress = 0.1
        initial_viewers = [p.name for p in simulation.personas[: self.SEED_VIEWER_COUNT]]

        # In a real async scenario, simulate_content_spread would update progress
        try:
            result = simulation.world.simulate_content_spread(content, initial_viewers)
        except Exception:
            # Without this a failed (especially background) run would stay
            # "running" forever from the point of view of anyone polling.
            simulation.status = "failed"
            raise

        # Publish the result before the status so that a poller that sees
        # "completed" always finds last_result populated.
        simulation.last_result = result
        simulation.progress = 1.0
        simulation.status = "completed"
        return result

    def send_chat_message(self, simulation_id: str, sender: str, message: str) -> Dict[str, Any]:
        """Append a chat message and, for "User" messages, a persona reply.

        Args:
            simulation_id: Id of a simulation held by this manager.
            sender: Name of the sender; the exact string "User" triggers a
                reply from a randomly chosen persona.
            message: Message text.

        Returns:
            The stored message dict of the sender (not the persona reply).

        Raises:
            ValueError: If ``simulation_id`` is not registered.
        """
        sim = self.get_simulation(simulation_id)
        if not sim:
            raise ValueError(f"Simulation {simulation_id} not found.")

        msg = {
            "sender": sender,
            "message": message,
            "timestamp": datetime.now().isoformat(),
        }
        sim.chat_history.append(msg)

        # Trigger persona responses if it's a "User" message. With no personas
        # there is nobody to answer (random.choice would raise IndexError).
        if sender == "User" and sim.personas:
            # For now, pick a random persona to respond
            import random

            responder = random.choice(sim.personas)
            # In a real implementation, the persona would "think" and "act"
            response_text = f"As a {responder._persona.get('occupation')}, I think: {message[:10]}... sounds interesting!"
            sim.chat_history.append(
                {
                    "sender": responder.name,
                    "message": response_text,
                    "timestamp": datetime.now().isoformat(),
                }
            )
        return msg

    def get_chat_history(self, simulation_id: str) -> List[Dict[str, Any]]:
        """Return the chat history list of a simulation, or ``[]`` if unknown."""
        sim = self.get_simulation(simulation_id)
        if not sim:
            return []
        return sim.chat_history

    def get_simulation(self, simulation_id: str, user_id: Optional[str] = None) -> Optional[Simulation]:
        """Look up a simulation by id.

        Args:
            simulation_id: Id to look up.
            user_id: Optional requesting user. When given, and the
                simulation's config records a different ``user_id``, the
                simulation is treated as not found. Simulations whose config
                has no ``user_id`` are returned to any user.

        Returns:
            The simulation, or ``None`` when it does not exist or belongs to
            another user.
        """
        sim = self.simulations.get(simulation_id)
        if sim is None or user_id is None:
            return sim

        owner = getattr(sim.config, "user_id", None)
        if owner is not None and owner != user_id:
            return None
        return sim

    def list_simulations(self) -> List[Dict[str, Any]]:
        """Return a summary dict (id, name, status, size, creation time) per simulation."""
        return [
            {
                "id": sim.id,
                "name": sim.config.name,
                "status": sim.status,
                "persona_count": len(sim.personas),
                "created_at": sim.created_at.isoformat(),
            }
            for sim in self.simulations.values()
        ]

    def get_persona(self, simulation_id: str, persona_name: str) -> Optional[Dict[str, Any]]:
        """Return the ``_persona`` dict of the first persona with that name.

        Returns ``None`` when the simulation or the persona is not found.
        """
        sim = self.get_simulation(simulation_id)
        if not sim:
            return None
        return next((p._persona for p in sim.personas if p.name == persona_name), None)

    def list_personas(self, simulation_id: str) -> List[Dict[str, Any]]:
        """Return the ``_persona`` dicts of a simulation, or ``[]`` if unknown."""
        sim = self.get_simulation(simulation_id)
        if not sim:
            return []
        return [p._persona for p in sim.personas]

    def save_focus_group(self, name: str, personas: List[TinyPerson]):
        """Store ``personas`` under ``name``, replacing any group of that name."""
        self.focus_groups[name] = personas

    def list_focus_groups(self) -> List[str]:
        """Return the names of all saved focus groups."""
        return list(self.focus_groups)

    def get_focus_group(self, name: str) -> Optional[List[TinyPerson]]:
        """Return the personas saved under ``name``, or ``None`` if absent."""
        return self.focus_groups.get(name)

    def delete_simulation(self, simulation_id: str) -> bool:
        """Remove a simulation; return whether one was actually removed."""
        if simulation_id not in self.simulations:
            return False
        del self.simulations[simulation_id]
        return True

    def export_simulation(self, simulation_id: str) -> Optional[Dict[str, Any]]:
        """Return a dict snapshot of a simulation, or ``None`` if unknown.

        The snapshot contains the id, selected config fields, status, ISO
        creation time, every persona's ``_persona`` dict and the value of
        ``network.get_metrics()``.
        """
        sim = self.get_simulation(simulation_id)
        if not sim:
            return None
        return {
            "id": sim.id,
            "config": {
                "name": sim.config.name,
                "persona_count": sim.config.persona_count,
                "network_type": sim.config.network_type,
            },
            "status": sim.status,
            "created_at": sim.created_at.isoformat(),
            "personas": [p._persona for p in sim.personas],
            "network": sim.network.get_metrics(),
        }