Spaces:
Sleeping
Sleeping
| """ | |
| Campaign Agent for managing D&D campaigns | |
| """ | |
| import os | |
| import sqlite3 | |
| import json | |
| from datetime import datetime | |
| from typing import Optional, List | |
| from src.models.campaign import Campaign, CampaignEvent, CampaignTheme, EventType | |
| from src.models.character import Character | |
| from src.models.session_notes import SessionNotes | |
| from src.utils.ai_client import get_ai_client | |
| class CampaignAgent: | |
| """Agent for managing D&D campaigns""" | |
def __init__(self, db_path: str = "data/campaigns.db"):
    """Set up storage path, AI client, and database schema for the agent.

    Args:
        db_path: Path of the SQLite database file that holds campaign data.
    """
    self.db_path = db_path
    self.ai_client = get_ai_client()

    # Create the database file and its tables up front so that later
    # queries can rely on them existing.
    self._ensure_database()
def _ensure_database(self):
    """Create the database directory, file, and tables if they are missing.

    Three tables are created, each with ``CREATE TABLE IF NOT EXISTS`` so the
    call can be repeated safely: ``campaigns``, ``campaign_events`` and
    ``session_notes``.
    """
    # A bare filename such as "campaigns.db" has an empty directory part and
    # os.makedirs("") raises, so only create the parent when there is one.
    db_dir = os.path.dirname(self.db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()

        # Campaigns table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS campaigns (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                theme TEXT NOT NULL,
                setting TEXT NOT NULL,
                world_name TEXT,
                starting_location TEXT,
                summary TEXT NOT NULL,
                current_arc TEXT,
                level_range TEXT,
                main_conflict TEXT NOT NULL,
                key_factions TEXT,
                major_villains TEXT,
                central_mysteries TEXT,
                character_ids TEXT,
                party_size INTEGER,
                current_session INTEGER,
                total_sessions INTEGER,
                game_master TEXT,
                is_active BOOLEAN,
                homebrew_rules TEXT,
                notes TEXT,
                created_at TEXT,
                updated_at TEXT,
                last_session_date TEXT
            )
        """)

        # Campaign events table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS campaign_events (
                id TEXT PRIMARY KEY,
                campaign_id TEXT NOT NULL,
                session_number INTEGER NOT NULL,
                event_type TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT NOT NULL,
                characters_involved TEXT,
                npcs_involved TEXT,
                locations TEXT,
                consequences TEXT,
                items_gained TEXT,
                items_lost TEXT,
                experience_awarded INTEGER,
                timestamp TEXT,
                importance INTEGER,
                tags TEXT,
                gm_notes TEXT,
                player_visible BOOLEAN,
                FOREIGN KEY (campaign_id) REFERENCES campaigns(id)
            )
        """)

        # Session notes table (one row per campaign/session pair)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS session_notes (
                id TEXT PRIMARY KEY,
                campaign_id TEXT NOT NULL,
                session_number INTEGER NOT NULL,
                notes TEXT NOT NULL,
                uploaded_at TEXT NOT NULL,
                file_name TEXT,
                file_type TEXT,
                FOREIGN KEY (campaign_id) REFERENCES campaigns(id),
                UNIQUE(campaign_id, session_number)
            )
        """)

        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
def create_campaign(
    self,
    name: str,
    theme: str,
    setting: str,
    summary: str,
    main_conflict: str,
    game_master: str = "",
    world_name: str = "",
    starting_location: str = "",
    level_range: str = "1-5",
    party_size: int = 4
) -> Campaign:
    """Build a new campaign, persist it, and return it.

    The campaign id is derived from ``name``: lower-cased, spaces turned into
    hyphens, apostrophes removed. New campaigns start at session 1 with zero
    total sessions and are marked active.

    Args:
        name: Display name; also the source of the campaign id.
        theme: Value passed to ``CampaignTheme(...)``.
        setting: Setting description.
        summary: Campaign summary.
        main_conflict: Central conflict description.
        game_master: Game master name.
        world_name: Name of the world.
        starting_location: Where the adventure begins.
        level_range: Level range label.
        party_size: Number of party members.

    Returns:
        The newly created ``Campaign``.
    """
    slug = name.lower().replace(" ", "-").replace("'", "")

    fields = dict(
        id=slug,
        name=name,
        theme=CampaignTheme(theme),
        setting=setting,
        world_name=world_name,
        starting_location=starting_location,
        summary=summary,
        main_conflict=main_conflict,
        level_range=level_range,
        party_size=party_size,
        game_master=game_master,
        current_session=1,
        total_sessions=0,
        is_active=True,
    )
    new_campaign = Campaign(**fields)

    # Persist before handing the object back to the caller.
    self.save_campaign(new_campaign)
    return new_campaign
def save_campaign(self, campaign: Campaign):
    """Insert or overwrite a campaign row in the ``campaigns`` table.

    List-like fields (factions, villains, mysteries, character ids, homebrew
    rules) are stored as JSON text; timestamps are stored as ISO-8601 text.

    Args:
        campaign: The campaign to persist. Its ``id`` is the primary key, so
            an existing row with the same id is replaced.
    """
    def as_iso(value):
        # Timestamps may be datetime objects or already-serialized strings;
        # only the former need converting.
        return value.isoformat() if isinstance(value, datetime) else value

    row = (
        campaign.id,
        campaign.name,
        campaign.theme.value,
        campaign.setting,
        campaign.world_name,
        campaign.starting_location,
        campaign.summary,
        campaign.current_arc,
        campaign.level_range,
        campaign.main_conflict,
        json.dumps(campaign.key_factions),
        json.dumps(campaign.major_villains),
        json.dumps(campaign.central_mysteries),
        json.dumps(campaign.character_ids),
        campaign.party_size,
        campaign.current_session,
        campaign.total_sessions,
        campaign.game_master,
        campaign.is_active,
        json.dumps(campaign.homebrew_rules),
        campaign.notes,
        as_iso(campaign.created_at),
        as_iso(campaign.updated_at),
        # Same guard as the other timestamps: a string value used to crash
        # here on ``.isoformat()``. Falsy values are stored as NULL.
        as_iso(campaign.last_session_date) if campaign.last_session_date else None,
    )

    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO campaigns (
                id, name, theme, setting, world_name, starting_location,
                summary, current_arc, level_range, main_conflict,
                key_factions, major_villains, central_mysteries,
                character_ids, party_size, current_session, total_sessions,
                game_master, is_active, homebrew_rules, notes,
                created_at, updated_at, last_session_date
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row)
        conn.commit()
    finally:
        # Close the connection even when the write fails.
        conn.close()
def load_campaign(self, campaign_id: str) -> Optional[Campaign]:
    """Load one campaign by id.

    Args:
        campaign_id: Primary key of the campaign row.

    Returns:
        The ``Campaign`` built from the stored row, or ``None`` when no row
        has that id. JSON-encoded list columns are decoded; an empty or NULL
        value becomes an empty list.
    """
    columns = (
        'id', 'name', 'theme', 'setting', 'world_name', 'starting_location',
        'summary', 'current_arc', 'level_range', 'main_conflict',
        'key_factions', 'major_villains', 'central_mysteries',
        'character_ids', 'party_size', 'current_session', 'total_sessions',
        'game_master', 'is_active', 'homebrew_rules', 'notes',
        'created_at', 'updated_at', 'last_session_date',
    )
    json_columns = {
        'key_factions', 'major_villains', 'central_mysteries',
        'character_ids', 'homebrew_rules',
    }

    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()
        # Name the columns explicitly (they are constants, not user input) so
        # the value/name pairing below does not depend on the table's
        # physical column order.
        cursor.execute(
            f"SELECT {', '.join(columns)} FROM campaigns WHERE id = ?",
            (campaign_id,),
        )
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return None

    data = {}
    for col, value in zip(columns, row):
        if col in json_columns:
            data[col] = json.loads(value) if value else []
        else:
            data[col] = value
    return Campaign(**data)
def list_campaigns(self, active_only: bool = False) -> List[Campaign]:
    """Return stored campaigns, optionally only the active ones.

    Args:
        active_only: When true, only rows with ``is_active = 1`` are listed.

    Returns:
        One loaded ``Campaign`` per matching row. Ids that can no longer be
        loaded (``load_campaign`` returns ``None``) are left out so the
        result matches the declared return type.
    """
    query = "SELECT id FROM campaigns"
    if active_only:
        query += " WHERE is_active = 1"

    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        campaign_ids = [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()

    loaded = (self.load_campaign(cid) for cid in campaign_ids)
    return [campaign for campaign in loaded if campaign is not None]
def delete_campaign(self, campaign_id: str) -> bool:
    """Delete a campaign together with its session notes and events.

    Args:
        campaign_id: Id of the campaign to remove.

    Returns:
        ``True`` when a row was removed from ``campaigns``, ``False`` when no
        campaign had that id.
    """
    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()
        # Remove dependent rows first, then the campaign itself.
        cursor.execute("DELETE FROM session_notes WHERE campaign_id = ?", (campaign_id,))
        cursor.execute("DELETE FROM campaign_events WHERE campaign_id = ?", (campaign_id,))
        cursor.execute("DELETE FROM campaigns WHERE id = ?", (campaign_id,))
        # rowcount reflects the last statement, i.e. the campaigns delete.
        deleted = cursor.rowcount > 0
        conn.commit()
    finally:
        # Close the connection even when a delete fails.
        conn.close()
    return deleted
def add_character_to_campaign(self, campaign_id: str, character_id: str) -> bool:
    """Attach a character to a stored campaign and persist the change.

    Args:
        campaign_id: Id of the campaign to update.
        character_id: Id of the character to add.

    Returns:
        ``True`` after the campaign was updated and saved, ``False`` when the
        campaign could not be loaded.
    """
    target = self.load_campaign(campaign_id)
    if target:
        target.add_character(character_id)
        self.save_campaign(target)
        return True
    return False
def remove_character_from_campaign(self, campaign_id: str, character_id: str) -> bool:
    """Detach a character from a stored campaign and persist the change.

    Args:
        campaign_id: Id of the campaign to update.
        character_id: Id of the character to remove.

    Returns:
        ``True`` after the campaign was updated and saved, ``False`` when the
        campaign could not be loaded.
    """
    target = self.load_campaign(campaign_id)
    if target:
        target.remove_character(character_id)
        self.save_campaign(target)
        return True
    return False
def start_new_session(self, campaign_id: str) -> bool:
    """Advance a stored campaign to a new session and persist the change.

    Args:
        campaign_id: Id of the campaign to update.

    Returns:
        ``True`` after the campaign was updated and saved, ``False`` when the
        campaign could not be loaded.
    """
    target = self.load_campaign(campaign_id)
    if target:
        target.start_new_session()
        self.save_campaign(target)
        return True
    return False
def add_event(
    self,
    campaign_id: str,
    event_type: str,
    title: str,
    description: str,
    session_number: Optional[int] = None,
    characters_involved: Optional[List[str]] = None,
    npcs_involved: Optional[List[str]] = None,
    locations: Optional[List[str]] = None,
    importance: int = 3
) -> Optional[CampaignEvent]:
    """Record an event for a campaign and update the campaign itself.

    Args:
        campaign_id: Id of the campaign the event belongs to.
        event_type: Value passed to ``EventType(...)``.
        title: Event title.
        description: Event description.
        session_number: Session the event belongs to; defaults to the
            campaign's ``current_session`` when ``None``.
        characters_involved: Characters involved; defaults to an empty list.
        npcs_involved: NPCs involved; defaults to an empty list.
        locations: Locations involved; defaults to an empty list.
        importance: Importance rating stored with the event.

    Returns:
        The stored ``CampaignEvent``, or ``None`` when the campaign could not
        be loaded.
    """
    campaign = self.load_campaign(campaign_id)
    if not campaign:
        return None

    # Use current session if not specified
    if session_number is None:
        session_number = campaign.current_session

    # The id combines the campaign id with the creation timestamp.
    event_id = f"{campaign_id}-event-{datetime.now().timestamp()}"
    event = CampaignEvent(
        id=event_id,
        campaign_id=campaign_id,
        session_number=session_number,
        event_type=EventType(event_type),
        title=title,
        description=description,
        characters_involved=characters_involved or [],
        npcs_involved=npcs_involved or [],
        locations=locations or [],
        importance=importance
    )

    row = (
        event.id,
        event.campaign_id,
        event.session_number,
        event.event_type.value,
        event.title,
        event.description,
        json.dumps(event.characters_involved),
        json.dumps(event.npcs_involved),
        json.dumps(event.locations),
        json.dumps(event.consequences),
        json.dumps(event.items_gained),
        json.dumps(event.items_lost),
        event.experience_awarded,
        event.timestamp.isoformat(),
        event.importance,
        json.dumps(event.tags),
        event.gm_notes,
        event.player_visible,
    )

    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO campaign_events (
                id, campaign_id, session_number, event_type, title, description,
                characters_involved, npcs_involved, locations, consequences,
                items_gained, items_lost, experience_awarded, timestamp,
                importance, tags, gm_notes, player_visible
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row)
        conn.commit()
    finally:
        # Close the connection even when the insert fails.
        conn.close()

    # Let the campaign record the event, then persist the campaign.
    campaign.add_event(event)
    self.save_campaign(campaign)
    return event
def get_campaign_events(self, campaign_id: str, session_number: Optional[int] = None) -> List[CampaignEvent]:
    """Return a campaign's events ordered by timestamp.

    Args:
        campaign_id: Id of the campaign whose events are wanted.
        session_number: When given (including ``0``), only events of that
            session are returned; ``None`` returns events of every session.

    Returns:
        A list of ``CampaignEvent`` objects; empty when nothing matches.
        JSON-encoded list columns are decoded, with empty or NULL values
        becoming empty lists.
    """
    columns = (
        'id', 'campaign_id', 'session_number', 'event_type', 'title',
        'description', 'characters_involved', 'npcs_involved', 'locations',
        'consequences', 'items_gained', 'items_lost', 'experience_awarded',
        'timestamp', 'importance', 'tags', 'gm_notes', 'player_visible',
    )
    json_columns = {
        'characters_involved', 'npcs_involved', 'locations', 'consequences',
        'items_gained', 'items_lost', 'tags',
    }

    # Columns are named explicitly (constants, not user input) so the
    # name/value pairing below does not depend on physical column order.
    query = f"SELECT {', '.join(columns)} FROM campaign_events WHERE campaign_id = ?"
    params = [campaign_id]
    # Compare against None: a plain truthiness test would silently drop the
    # filter for session number 0.
    if session_number is not None:
        query += " AND session_number = ?"
        params.append(session_number)
    query += " ORDER BY timestamp"

    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        rows = cursor.fetchall()
    finally:
        conn.close()

    events = []
    for row in rows:
        event_data = {}
        for col, value in zip(columns, row):
            if col in json_columns:
                value = json.loads(value) if value else []
            event_data[col] = value
        events.append(CampaignEvent(**event_data))
    return events
def export_campaign_summary(self, campaign_id: str) -> str:
    """Render a stored campaign as markdown.

    Args:
        campaign_id: Id of the campaign to export.

    Returns:
        The campaign's ``to_markdown()`` output, or the text
        ``"Campaign not found"`` when the campaign could not be loaded.
    """
    found = self.load_campaign(campaign_id)
    return found.to_markdown() if found else "Campaign not found"
def synthesize_campaign_from_characters(
    self,
    characters: List[Character],
    game_master: str = "",
    additional_notes: str = ""
) -> Campaign:
    """
    Synthesize a campaign tailored to the provided characters using AI.

    Builds a party summary from each character's level, race, class,
    alignment and background, asks the AI client for a full campaign
    document, parses it, and stores the result as a new campaign with all
    characters attached. When key list sections are missing from the first
    answer, one focused follow-up request is made for just those sections.

    Args:
        characters: Party members; must contain at least one character.
        game_master: Game master name stored on the campaign.
        additional_notes: Optional extra guidance included in the prompt.

    Returns:
        The saved ``Campaign``.

    Raises:
        ValueError: If ``characters`` is empty.
    """
    # Fail early with a clear message instead of the opaque error that
    # min()/max() would raise on an empty level list further down.
    if not characters:
        raise ValueError("At least one character is required to synthesize a campaign")

    # Build character analysis
    party_analysis = []
    for char in characters:
        party_analysis.append(f"""
- **{char.name}** (Level {char.level} {char.race.value} {char.character_class.value})
- Alignment: {char.alignment.value}
- Background: {char.background.background_type}
- Backstory: {char.background.backstory[:200]}...
- Personality: {char.background.personality_traits[:150]}...""")
    party_summary = "\n".join(party_analysis)

    # Calculate party level range
    levels = [char.level for char in characters]
    min_level = min(levels)
    max_level = max(levels)
    avg_level = sum(levels) // len(levels)
    level_range = f"{min_level}-{max_level}" if min_level != max_level else f"{min_level}"

    # Determine appropriate challenge level
    if avg_level <= 3:
        tier = "Tier 1 (Local Heroes)"
        scope = "local region or small kingdom"
    elif avg_level <= 10:
        tier = "Tier 2 (Heroes of the Realm)"
        scope = "kingdom or large region"
    elif avg_level <= 16:
        tier = "Tier 3 (Masters of the Realm)"
        scope = "continent or multiple kingdoms"
    else:
        tier = "Tier 4 (Masters of the World)"
        scope = "entire world or planar"

    # Create AI prompt for campaign synthesis
    prompt = f"""You are an expert Dungeon Master creating a COMPLETE campaign guide for D&D 5e.
**Party Composition ({len(characters)} characters, Level {level_range}):**
{party_summary}
**Campaign Tier:** {tier}
**Appropriate Scope:** {scope}
{f"**Additional DM Notes:** {additional_notes}" if additional_notes else ""}
Create a DETAILED campaign that weaves all these characters together. Include:
1. **Character Connections**: How do these characters know each other or become connected?
2. **Personal Stakes**: What does each character have to lose/gain?
3. **Villain Details**: Full profiles with motivations and how they threaten each PC
4. **World Details**: Specific locations, politics, and cultural elements
5. **Adventure Hooks**: 3-4 specific scenarios that pull characters in
6. **First Session Outline**: Concrete opening scenario
7. **Story Progression**: Where the campaign leads over 5-10 sessions
Generate a comprehensive campaign document with these sections:
**CAMPAIGN_NAME:** (Epic, memorable name - 2-5 words)
**THEME:** (Choose ONE: High Fantasy, Dark Fantasy, Urban Fantasy, Political Intrigue, Horror, Exploration, Dungeon Crawl, or Custom)
**WORLD_NAME:** (Name of the world/realm)
**STARTING_LOCATION:** (Town, city, or region where adventure begins)
**SETTING:** (3-4 sentences about the world, its current political state, and atmosphere)
**SUMMARY:** (4-5 sentences explaining the campaign hook and what drives the story)
**CONFLICT:** (2-3 sentences describing the central threat and why it matters)
**FACTIONS:** (List 3-4 factions with brief descriptions, format: "Faction Name - description")
**VILLAINS:** (2-3 detailed villain profiles with:
- Name and role
- Motivation (what they want and why)
- Connection to party (which PCs they threaten and how)
- Methods (how they operate)
Format: "Villain Name - Role | Motivation | Connections | Methods")
**CHARACTER_CONNECTIONS:** REQUIRED - For EACH party member listed above, explain how they fit (Format: "Name: connection details | Name2: connection details | ...")
**ADVENTURE_HOOKS:** REQUIRED - List 3-4 specific hooks separated by pipes (Format: "Hook 1 details | Hook 2 details | Hook 3 details")
**FIRST_SESSION:** REQUIRED - Detailed opening scene (Format: "Scene description")
**SESSION_OUTLINES:** REQUIRED - Sessions 2-5 separated by pipes (Format: "Session 2 details | Session 3 details | Session 4 details | Session 5 details")
**MYSTERIES:** REQUIRED - 2-3 mysteries separated by pipes (Format: "Mystery 1 | Mystery 2 | Mystery 3")
**KEY_NPCS:** REQUIRED - 3-4 NPCs separated by pipes (Format: "NPC Name - role and connection | NPC2 Name - role and connection | ...")
**LOCATIONS:** REQUIRED - 3-4 locations separated by pipes (Format: "Location Name - description | Location2 Name - description | ...")
**STORY_ARC:** (First major story arc with clear beginning, middle, and climax)
Format your response EXACTLY as follows (DO NOT SKIP ANY FIELDS):
---
CAMPAIGN_NAME: [name]
THEME: [theme]
WORLD_NAME: [world]
STARTING_LOCATION: [location]
SETTING: [setting details]
SUMMARY: [campaign summary]
CONFLICT: [main conflict]
FACTIONS: [faction1 - desc | faction2 - desc | faction3 - desc]
VILLAINS: [villain1 details | villain2 details]
CHARACTER_CONNECTIONS: [char1: how they fit | char2: how they fit | char3: how they fit | char4: how they fit]
ADVENTURE_HOOKS: [hook1 description in 2-3 sentences | hook2 description | hook3 description | hook4 description]
FIRST_SESSION: [opening scene details]
SESSION_OUTLINES: [session 2 outline | session 3 outline | session 4 outline | session 5 outline]
MYSTERIES: [mystery1 details | mystery2 details | mystery3 details]
KEY_NPCS: [NPC Name - their role and connection to party | NPC2 Name - role and connection | NPC3 Name - role | NPC4 Name - role]
LOCATIONS: [Location Name - description of place | Location2 Name - description | Location3 Name - description | Location4 Name - description]
STORY_ARC: [arc details]
---
IMPORTANT: Every field listed above MUST have content. Use pipe separators (|) between items in list fields."""

    # Get and parse the AI response
    response = self.ai_client.generate_creative(prompt)
    parsed = self._parse_campaign_synthesis(response)

    # List sections that must be present, mapped to the label used when
    # asking the AI to supply them again.
    required_sections = {
        'character_connections': 'CHARACTER_CONNECTIONS',
        'adventure_hooks': 'ADVENTURE_HOOKS',
        'key_npcs': 'KEY_NPCS',
        'locations': 'LOCATIONS',
        'session_outlines': 'SESSION_OUTLINES',
    }
    missing_fields = [
        label for key, label in required_sections.items() if not parsed.get(key)
    ]

    if missing_fields:
        # Make one focused call for just the missing sections
        fields_list = ' '.join(['**' + field + ':**' for field in missing_fields])
        format_template = '\n'.join([field + ': [details | more details | etc]' for field in missing_fields])
        followup_prompt = f"""Campaign: {parsed.get('name', 'Campaign')}
Setting: {parsed.get('setting', '')}
Party:
{party_summary}
Generate ONLY these missing campaign elements. Use pipe (|) separators between items:
{fields_list}
Format:
---
{format_template}
---"""
        followup_response = self.ai_client.generate_creative(followup_prompt)
        followup_parsed = self._parse_campaign_synthesis(followup_response)

        # Merge whatever the follow-up managed to provide
        for key in required_sections:
            if followup_parsed.get(key):
                parsed[key] = followup_parsed[key]

    # Create campaign (falls back to generic text for anything unparsed).
    # NOTE(review): the parsed theme goes straight to CampaignTheme(...) in
    # create_campaign; confirm how an unexpected theme string should be handled.
    campaign = self.create_campaign(
        name=parsed.get('name', 'Untitled Campaign'),
        theme=parsed.get('theme', 'High Fantasy'),
        setting=parsed.get('setting', 'A fantasy realm awaits...'),
        summary=parsed.get('summary', 'An epic adventure begins...'),
        main_conflict=parsed.get('conflict', 'Evil threatens the land...'),
        game_master=game_master,
        world_name=parsed.get('world_name', 'The Realm'),
        starting_location=parsed.get('starting_location', 'The Crossroads'),
        level_range=level_range,
        party_size=len(characters)
    )

    # Load the created campaign and add additional details
    campaign = self.load_campaign(campaign.id)

    # Add factions, villains, mysteries, and the opening story arc
    if 'factions' in parsed:
        campaign.key_factions = parsed['factions']
    if 'villains' in parsed:
        campaign.major_villains = parsed['villains']
    if 'mysteries' in parsed:
        campaign.central_mysteries = parsed['mysteries']
    if 'story_arc' in parsed:
        campaign.current_arc = parsed['story_arc']

    # Everything else (hooks, NPCs, outlines, ...) goes into the notes field
    campaign.notes = self._build_campaign_notes(parsed)

    # Add all characters to the campaign
    for char in characters:
        campaign.add_character(char.id)

    # Save updated campaign
    self.save_campaign(campaign)
    return campaign
def _build_campaign_notes(self, parsed: dict) -> str:
    """Render the extra parsed campaign sections as markdown notes.

    Sections are emitted in a fixed order and only when their key is present
    in ``parsed``: character connections, adventure hooks, first session,
    session outlines (numbered from 2), key NPCs, and locations.

    Args:
        parsed: Parsed campaign data keyed by section name.

    Returns:
        The concatenated markdown text; empty when no section is present.
    """
    def bullets(items):
        return [f"- {item}\n" for item in items]

    chunks = []

    if 'character_connections' in parsed:
        chunks += ["## Character Connections\n", *bullets(parsed['character_connections']), "\n"]

    if 'adventure_hooks' in parsed:
        numbered = [f"{pos}. {hook}\n" for pos, hook in enumerate(parsed['adventure_hooks'], 1)]
        chunks += ["## Adventure Hooks\n", *numbered, "\n"]

    if 'first_session' in parsed:
        chunks += ["## First Session Opening\n", f"{parsed['first_session']}\n\n"]

    if 'session_outlines' in parsed:
        # Outlines start at session 2; session 1 is the opening above.
        sessions = [
            f"**Session {num}:** {outline}\n"
            for num, outline in enumerate(parsed['session_outlines'], 2)
        ]
        chunks += ["## Session Progression\n", *sessions, "\n"]

    if 'key_npcs' in parsed:
        chunks += ["## Key NPCs\n", *bullets(parsed['key_npcs']), "\n"]

    if 'locations' in parsed:
        chunks += ["## Key Locations\n", *bullets(parsed['locations']), "\n"]

    return "".join(chunks)
def _parse_campaign_synthesis(self, ai_response: str) -> dict:
    """Parse the ``KEY: value`` lines of a campaign-synthesis AI response.

    The section between the first pair of ``---`` markers is used when it has
    content; otherwise the whole response is scanned. Keys are lower-cased,
    spaces become underscores, surrounding markdown bold markers are dropped,
    and ``campaign_`` is removed (so ``CAMPAIGN_NAME`` becomes ``name``).
    List-type fields are split on ``|`` into lists of trimmed, non-empty
    items; every other field is kept as a trimmed string.

    Args:
        ai_response: Raw text returned by the AI client.

    Returns:
        Mapping of normalized field name to string or list of strings.
    """
    list_fields = {
        'factions', 'villains', 'character_connections', 'adventure_hooks',
        'session_outlines', 'mysteries', 'key_npcs', 'locations',
    }

    # Prefer the delimited section, but fall back to the full text when the
    # markers are absent or enclose nothing (e.g. a single trailing '---').
    segments = ai_response.split('---')
    if len(segments) >= 2 and segments[1].strip():
        content = segments[1]
    else:
        content = ai_response

    parsed = {}
    for line in content.strip().split('\n'):
        raw_key, separator, raw_value = line.partition(':')
        if not separator:
            continue

        # Tolerate "**KEY:** value" / "**KEY**: value" and "Key Name: value".
        key = raw_key.strip().strip('*').strip().lower()
        key = key.replace(' ', '_').replace('campaign_', '')

        value = raw_value.strip()
        if value.startswith('**'):
            # Closing half of a "**KEY:**" bold marker.
            value = value[2:].strip()

        if key in list_fields:
            parsed[key] = [item.strip() for item in value.split('|') if item.strip()]
        else:
            parsed[key] = value
    return parsed
def auto_generate_next_session(self, campaign_id: str) -> dict:
    """
    Autonomously generate the next session based on campaign progress.

    This is a LOW-RISK autonomous feature that:
    - Analyzes campaign state and previous sessions
    - Generates structured session content
    - Does NOT modify the campaign directly
    - Returns session data for user review/approval

    Args:
        campaign_id: Campaign identifier

    Returns:
        dict with the parsed session details plus metadata keys
        (``session_number``, ``campaign_id``, ``generated_at``,
        ``auto_generated``, ``used_session_notes``, ``notes_count``), or
        ``{"error": "Campaign not found"}`` when the campaign cannot be
        loaded.
    """
    campaign = self.load_campaign(campaign_id)
    if not campaign:
        return {"error": "Campaign not found"}

    # Get session context
    events = self.get_campaign_events(campaign_id)
    # NOTE(review): events are created with an ``event_type`` field elsewhere
    # in this class; confirm CampaignEvent also exposes ``.type`` and that
    # EventType defines SESSION.
    session_events = [e for e in events if e.type == EventType.SESSION]
    current_session_num = len(session_events) + 1
    last_session = session_events[-1] if session_events else None

    # Get session notes from previous sessions (last 5 sessions for better context)
    # NOTE(review): the slicing below presumes get_session_notes returns the
    # most recent session first - confirm against that method.
    all_session_notes = self.get_session_notes(campaign_id)
    recent_notes = all_session_notes[:5] if all_session_notes else []

    # Build session notes context
    notes_context = ""
    continuity_requirements = []
    if recent_notes:
        note_parts = ["\n**PREVIOUS SESSION NOTES:**\n\n"]
        for note in reversed(recent_notes):  # Chronological order
            note_parts.append(f"**Session {note.session_number} Notes:**\n")
            # Limit each session's notes to 2000 characters
            truncated_notes = note.notes[:2000]
            if len(note.notes) > 2000:
                truncated_notes += "\n... (notes truncated)"
            note_parts.append(f"{truncated_notes}\n\n")
        notes_context = "".join(note_parts)

        # Extract continuity requirements from the most recent session
        last_note = recent_notes[0]
        continuity_requirements = self._extract_continuity_points(last_note.notes)

    # Build last session context: uploaded notes win over the event summary
    if notes_context:
        last_session_info = notes_context
    elif last_session:
        last_session_info = f"**Last Session Summary:**\n{last_session.description}"
    else:
        last_session_info = "**Last Session Summary:**\nThis is the first session - use the campaign first session opening from the notes."

    # Build continuity section
    continuity_section = ""
    if continuity_requirements:
        continuity_section = "\n**🎯 CONTINUITY REQUIREMENTS (You MUST address these):**\n"
        for i, req in enumerate(continuity_requirements, 1):
            continuity_section += f"{i}. {req}\n"
        # Must be an f-string: without the prefix the literal placeholder
        # text was sent to the model instead of the session number.
        continuity_section += f"\n**MANDATORY:** Reference AT LEAST 2-3 of the above elements in Session {current_session_num}.\n"

    # Build context for AI
    prompt = f"""You are an expert Dungeon Master planning Session {current_session_num} of "{campaign.name}".
**CAMPAIGN OVERVIEW:**
• **Theme:** {campaign.theme}
• **Setting:** {campaign.setting}
• **Current Story Arc:** {campaign.current_arc}
• **Party Size:** {campaign.party_size} adventurers
• **Main Conflict:** {campaign.main_conflict}
• **Key Factions:** {', '.join(campaign.key_factions) if campaign.key_factions else 'None established yet'}
• **Major Villains:** {', '.join(campaign.major_villains) if campaign.major_villains else 'None revealed yet'}
{last_session_info}
**Campaign DM Notes:**
{campaign.notes[:1000] if campaign.notes else 'No additional notes'}
{continuity_section}
---
**YOUR MISSION:** Create Session {current_session_num} that feels like a natural, organic continuation of the story.
**CORE PRINCIPLES:**
1. **Cause and Effect:** Player choices from previous sessions MUST have visible consequences
2. **NPC Memory:** NPCs remember what players did - allies help, enemies retaliate, neutrals react
3. **Narrative Threads:** Pick up unresolved hooks from previous sessions - don't let them disappear
4. **Callbacks:** Reference earlier events to show the campaign has continuity and memory
5. **Player Agency:** Respond to HOW players solved problems, not just THAT they solved them
6. **Tone Consistency:** Match the established campaign tone - don't suddenly shift genre
7. **Escalation:** Stakes should feel appropriate to campaign progression
**SESSION DESIGN CHECKLIST:**
✓ Does this session respond to player choices from last time?
✓ Have I included at least one recurring NPC?
✓ Do consequences feel earned and logical?
✓ Will players recognize this flows from their actions?
✓ Have I advanced an active plot thread?
Include:
1. **SESSION_TITLE:** A compelling title for this session (3-6 words)
2. **OPENING_SCENE:** The opening narration/scene (2-3 paragraphs)
3. **KEY_ENCOUNTERS:** 2-4 encounters (combat, social, exploration) separated by pipes (|)
4. **NPCS_FEATURED:** NPCs that appear in this session, separated by pipes (|)
5. **LOCATIONS:** Locations visited in this session, separated by pipes (|)
6. **PLOT_DEVELOPMENTS:** Major plot points that advance this session, separated by pipes (|)
7. **POTENTIAL_OUTCOMES:** 2-3 possible ways the session could end, separated by pipes (|)
8. **REWARDS:** Treasure, XP, or story rewards, separated by pipes (|)
9. **CLIFFHANGER:** Optional cliffhanger for next session (1-2 sentences)
Format your response EXACTLY as:
---
SESSION_TITLE: [title]
OPENING_SCENE: [opening narration in 2-3 paragraphs]
KEY_ENCOUNTERS: [encounter 1 description | encounter 2 description | encounter 3 description]
NPCS_FEATURED: [NPC name and role | NPC2 name and role | NPC3 name and role]
LOCATIONS: [location 1 | location 2 | location 3]
PLOT_DEVELOPMENTS: [development 1 | development 2 | development 3]
POTENTIAL_OUTCOMES: [outcome 1 | outcome 2 | outcome 3]
REWARDS: [reward 1 | reward 2 | reward 3]
CLIFFHANGER: [cliffhanger sentence]
---
"""

    # Generate and parse session content
    ai_response = self.ai_client.generate_creative(prompt)
    session_data = self._parse_session_generation(ai_response)

    # Add metadata
    session_data['session_number'] = current_session_num
    session_data['campaign_id'] = campaign_id
    session_data['generated_at'] = datetime.now().isoformat()
    session_data['auto_generated'] = True
    session_data['used_session_notes'] = len(recent_notes) > 0
    session_data['notes_count'] = len(recent_notes)
    return session_data
| def _extract_continuity_points(self, session_notes: str) -> list[str]: | |
| """ | |
| Extract key continuity points from session notes that should be | |
| referenced in the next session. | |
| Returns: | |
| List of continuity requirements | |
| """ | |
| continuity = [] | |
| # Extract NPCs mentioned (simple pattern matching) | |
| import re | |
| # Look for proper nouns (capitalized words that appear multiple times) | |
| words = session_notes.split() | |
| capitalized = [w.strip('.,!?:;') for w in words if w and w[0].isupper() and len(w) > 2] | |
| npc_candidates = [w for w in capitalized if capitalized.count(w) >= 2] | |
| if npc_candidates: | |
| unique_npcs = list(set(npc_candidates))[:5] # Top 5 | |
| continuity.append(f"NPCs to remember: {', '.join(unique_npcs)}") | |
| # Look for unresolved elements (keywords indicating future hooks) | |
| unresolved_keywords = ['next time', 'later', 'tomorrow', 'unfinished', 'interrupted', | |
| 'escaped', 'fled', 'promised', 'agreed to', 'will return', | |
| 'warned', 'threatened', 'mysterious'] | |
| unresolved_sentences = [] | |
| for sentence in session_notes.split('.'): | |
| if any(keyword in sentence.lower() for keyword in unresolved_keywords): | |
| unresolved_sentences.append(sentence.strip()) | |
| if unresolved_sentences: | |
| continuity.append(f"Unresolved hooks: {' | '.join(unresolved_sentences[:3])}") | |
| # Look for locations mentioned | |
| location_keywords = ['traveled to', 'arrived at', 'went to', 'heading to', 'in the', 'at the'] | |
| locations = [] | |
| for sentence in session_notes.split('.'): | |
| for keyword in location_keywords: | |
| if keyword in sentence.lower(): | |
| locations.append(sentence.strip()) | |
| break | |
| if locations: | |
| continuity.append(f"Recent locations: {' | '.join(locations[:3])}") | |
| # Look for player decisions or consequences | |
| decision_keywords = ['decided to', 'chose to', 'agreed to', 'refused to', 'killed', 'saved', 'helped'] | |
| decisions = [] | |
| for sentence in session_notes.split('.'): | |
| if any(keyword in sentence.lower() for keyword in decision_keywords): | |
| decisions.append(sentence.strip()) | |
| if decisions: | |
| continuity.append(f"Player decisions needing consequences: {' | '.join(decisions[:3])}") | |
| return continuity | |
| def _parse_session_generation(self, ai_response: str) -> dict: | |
| """Parse AI response for session generation""" | |
| parsed = {} | |
| # Extract content between --- markers | |
| if '---' in ai_response: | |
| parts = ai_response.split('---') | |
| content = parts[1] if len(parts) >= 2 else ai_response | |
| else: | |
| content = ai_response | |
| # Parse each field | |
| lines = content.strip().split('\n') | |
| for line in lines: | |
| if ':' in line: | |
| key, value = line.split(':', 1) | |
| key = key.strip().lower().replace(' ', '_') | |
| value = value.strip() | |
| # Handle pipe-separated fields | |
| if key in ['key_encounters', 'npcs_featured', 'locations', | |
| 'plot_developments', 'potential_outcomes', 'rewards']: | |
| parsed[key] = [item.strip() for item in value.split('|') if item.strip()] | |
| else: | |
| parsed[key] = value | |
| return parsed | |
def save_session_notes(
    self,
    campaign_id: str,
    session_number: int,
    notes: str,
    file_name: Optional[str] = None,
    file_type: Optional[str] = None
) -> SessionNotes:
    """
    Save session notes for a campaign.

    The record id is derived from the campaign id and session number, and
    the row is written with INSERT OR REPLACE, so saving notes again for
    the same session overwrites the previous row.

    Args:
        campaign_id: Campaign identifier
        session_number: Session number
        notes: Freeform session notes content
        file_name: Optional original filename if uploaded
        file_type: Optional file extension (.txt, .md, .docx, .pdf)

    Returns:
        SessionNotes object that was written to the database
    """
    # Create session notes object
    session_notes = SessionNotes(
        id=f"{campaign_id}-session-{session_number}",
        campaign_id=campaign_id,
        session_number=session_number,
        notes=notes,
        uploaded_at=datetime.now(),
        file_name=file_name,
        file_type=file_type
    )

    # Save to database (upsert - replace if exists)
    conn = sqlite3.connect(self.db_path)
    try:
        conn.execute("""
            INSERT OR REPLACE INTO session_notes
            (id, campaign_id, session_number, notes, uploaded_at, file_name, file_type)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            session_notes.id,
            session_notes.campaign_id,
            session_notes.session_number,
            session_notes.notes,
            session_notes.uploaded_at.isoformat(),
            session_notes.file_name,
            session_notes.file_type
        ))
        conn.commit()
    finally:
        # Always release the connection, even when the insert or commit
        # raises; otherwise the handle stays open on error.
        conn.close()

    return session_notes
def get_session_notes(
    self,
    campaign_id: str,
    session_number: Optional[int] = None
) -> List[SessionNotes]:
    """
    Get session notes for a campaign.

    Args:
        campaign_id: Campaign identifier
        session_number: Optional specific session number (if None, returns all)

    Returns:
        List of SessionNotes objects. When no session number is given the
        list is ordered by session number DESC; an empty list means no
        matching notes were found.
    """
    query = """
        SELECT id, campaign_id, session_number, notes, uploaded_at, file_name, file_type
        FROM session_notes
        WHERE campaign_id = ?
    """
    params = [campaign_id]
    if session_number is not None:
        query += " AND session_number = ?"
        params.append(session_number)
    else:
        query += " ORDER BY session_number DESC"

    conn = sqlite3.connect(self.db_path)
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    # Convert to SessionNotes objects; uploaded_at is stored as ISO text.
    return [
        SessionNotes(
            id=row[0],
            campaign_id=row[1],
            session_number=row[2],
            notes=row[3],
            uploaded_at=datetime.fromisoformat(row[4]),
            file_name=row[5],
            file_type=row[6]
        )
        for row in rows
    ]
def delete_session_notes(self, campaign_id: str, session_number: int) -> bool:
    """
    Delete session notes for a specific session.

    Args:
        campaign_id: Campaign identifier
        session_number: Session number

    Returns:
        True if deleted, False if not found
    """
    conn = sqlite3.connect(self.db_path)
    try:
        cursor = conn.execute("""
            DELETE FROM session_notes
            WHERE campaign_id = ? AND session_number = ?
        """, (campaign_id, session_number))
        # rowcount reports how many rows the DELETE removed.
        deleted = cursor.rowcount > 0
        conn.commit()
    finally:
        # Close the connection even if the delete or commit raises.
        conn.close()
    return deleted