DnD_Campaign_Manager / src /agents /campaign_agent.py
official.ghost.logic
Phase 1: Improve session generation continuity
a79cfc6
"""
Campaign Agent for managing D&D campaigns
"""
import os
import sqlite3
import json
from datetime import datetime
from typing import Optional, List
from src.models.campaign import Campaign, CampaignEvent, CampaignTheme, EventType
from src.models.character import Character
from src.models.session_notes import SessionNotes
from src.utils.ai_client import get_ai_client
class CampaignAgent:
"""Agent for managing D&D campaigns"""
def __init__(self, db_path: str = "data/campaigns.db"):
"""Initialize campaign agent"""
self.db_path = db_path
self.ai_client = get_ai_client()
self._ensure_database()
def _ensure_database(self):
"""Ensure database and tables exist"""
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Campaigns table
cursor.execute("""
CREATE TABLE IF NOT EXISTS campaigns (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
theme TEXT NOT NULL,
setting TEXT NOT NULL,
world_name TEXT,
starting_location TEXT,
summary TEXT NOT NULL,
current_arc TEXT,
level_range TEXT,
main_conflict TEXT NOT NULL,
key_factions TEXT,
major_villains TEXT,
central_mysteries TEXT,
character_ids TEXT,
party_size INTEGER,
current_session INTEGER,
total_sessions INTEGER,
game_master TEXT,
is_active BOOLEAN,
homebrew_rules TEXT,
notes TEXT,
created_at TEXT,
updated_at TEXT,
last_session_date TEXT
)
""")
# Campaign events table
cursor.execute("""
CREATE TABLE IF NOT EXISTS campaign_events (
id TEXT PRIMARY KEY,
campaign_id TEXT NOT NULL,
session_number INTEGER NOT NULL,
event_type TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT NOT NULL,
characters_involved TEXT,
npcs_involved TEXT,
locations TEXT,
consequences TEXT,
items_gained TEXT,
items_lost TEXT,
experience_awarded INTEGER,
timestamp TEXT,
importance INTEGER,
tags TEXT,
gm_notes TEXT,
player_visible BOOLEAN,
FOREIGN KEY (campaign_id) REFERENCES campaigns(id)
)
""")
# Session notes table
cursor.execute("""
CREATE TABLE IF NOT EXISTS session_notes (
id TEXT PRIMARY KEY,
campaign_id TEXT NOT NULL,
session_number INTEGER NOT NULL,
notes TEXT NOT NULL,
uploaded_at TEXT NOT NULL,
file_name TEXT,
file_type TEXT,
FOREIGN KEY (campaign_id) REFERENCES campaigns(id),
UNIQUE(campaign_id, session_number)
)
""")
conn.commit()
conn.close()
def create_campaign(
self,
name: str,
theme: str,
setting: str,
summary: str,
main_conflict: str,
game_master: str = "",
world_name: str = "",
starting_location: str = "",
level_range: str = "1-5",
party_size: int = 4
) -> Campaign:
"""Create a new campaign"""
# Generate campaign ID
campaign_id = name.lower().replace(" ", "-").replace("'", "")
# Create campaign
campaign = Campaign(
id=campaign_id,
name=name,
theme=CampaignTheme(theme),
setting=setting,
world_name=world_name,
starting_location=starting_location,
summary=summary,
main_conflict=main_conflict,
level_range=level_range,
party_size=party_size,
game_master=game_master,
current_session=1,
total_sessions=0,
is_active=True
)
# Save to database
self.save_campaign(campaign)
return campaign
def save_campaign(self, campaign: Campaign):
"""Save campaign to database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO campaigns (
id, name, theme, setting, world_name, starting_location,
summary, current_arc, level_range, main_conflict,
key_factions, major_villains, central_mysteries,
character_ids, party_size, current_session, total_sessions,
game_master, is_active, homebrew_rules, notes,
created_at, updated_at, last_session_date
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
campaign.id,
campaign.name,
campaign.theme.value,
campaign.setting,
campaign.world_name,
campaign.starting_location,
campaign.summary,
campaign.current_arc,
campaign.level_range,
campaign.main_conflict,
json.dumps(campaign.key_factions),
json.dumps(campaign.major_villains),
json.dumps(campaign.central_mysteries),
json.dumps(campaign.character_ids),
campaign.party_size,
campaign.current_session,
campaign.total_sessions,
campaign.game_master,
campaign.is_active,
json.dumps(campaign.homebrew_rules),
campaign.notes,
campaign.created_at.isoformat() if isinstance(campaign.created_at, datetime) else campaign.created_at,
campaign.updated_at.isoformat() if isinstance(campaign.updated_at, datetime) else campaign.updated_at,
campaign.last_session_date.isoformat() if campaign.last_session_date else None
))
conn.commit()
conn.close()
def load_campaign(self, campaign_id: str) -> Optional[Campaign]:
"""Load campaign from database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM campaigns WHERE id = ?", (campaign_id,))
row = cursor.fetchone()
conn.close()
if not row:
return None
# Convert row to dict
columns = [
'id', 'name', 'theme', 'setting', 'world_name', 'starting_location',
'summary', 'current_arc', 'level_range', 'main_conflict',
'key_factions', 'major_villains', 'central_mysteries',
'character_ids', 'party_size', 'current_session', 'total_sessions',
'game_master', 'is_active', 'homebrew_rules', 'notes',
'created_at', 'updated_at', 'last_session_date'
]
data = {}
for i, col in enumerate(columns):
value = row[i]
# Parse JSON fields
if col in ['key_factions', 'major_villains', 'central_mysteries', 'character_ids', 'homebrew_rules']:
data[col] = json.loads(value) if value else []
else:
data[col] = value
return Campaign(**data)
def list_campaigns(self, active_only: bool = False) -> List[Campaign]:
"""List all campaigns"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if active_only:
cursor.execute("SELECT id FROM campaigns WHERE is_active = 1")
else:
cursor.execute("SELECT id FROM campaigns")
campaign_ids = [row[0] for row in cursor.fetchall()]
conn.close()
return [self.load_campaign(cid) for cid in campaign_ids]
def delete_campaign(self, campaign_id: str) -> bool:
"""Delete a campaign and all associated data"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Delete session notes first
cursor.execute("DELETE FROM session_notes WHERE campaign_id = ?", (campaign_id,))
# Delete events
cursor.execute("DELETE FROM campaign_events WHERE campaign_id = ?", (campaign_id,))
# Delete campaign
cursor.execute("DELETE FROM campaigns WHERE id = ?", (campaign_id,))
deleted = cursor.rowcount > 0
conn.commit()
conn.close()
return deleted
def add_character_to_campaign(self, campaign_id: str, character_id: str) -> bool:
"""Add a character to a campaign"""
campaign = self.load_campaign(campaign_id)
if not campaign:
return False
campaign.add_character(character_id)
self.save_campaign(campaign)
return True
def remove_character_from_campaign(self, campaign_id: str, character_id: str) -> bool:
"""Remove a character from a campaign"""
campaign = self.load_campaign(campaign_id)
if not campaign:
return False
campaign.remove_character(character_id)
self.save_campaign(campaign)
return True
def start_new_session(self, campaign_id: str) -> bool:
"""Start a new session for a campaign"""
campaign = self.load_campaign(campaign_id)
if not campaign:
return False
campaign.start_new_session()
self.save_campaign(campaign)
return True
def add_event(
self,
campaign_id: str,
event_type: str,
title: str,
description: str,
session_number: Optional[int] = None,
characters_involved: Optional[List[str]] = None,
npcs_involved: Optional[List[str]] = None,
locations: Optional[List[str]] = None,
importance: int = 3
) -> Optional[CampaignEvent]:
"""Add an event to a campaign"""
campaign = self.load_campaign(campaign_id)
if not campaign:
return None
# Use current session if not specified
if session_number is None:
session_number = campaign.current_session
# Create event
event_id = f"{campaign_id}-event-{datetime.now().timestamp()}"
event = CampaignEvent(
id=event_id,
campaign_id=campaign_id,
session_number=session_number,
event_type=EventType(event_type),
title=title,
description=description,
characters_involved=characters_involved or [],
npcs_involved=npcs_involved or [],
locations=locations or [],
importance=importance
)
# Save to database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO campaign_events (
id, campaign_id, session_number, event_type, title, description,
characters_involved, npcs_involved, locations, consequences,
items_gained, items_lost, experience_awarded, timestamp,
importance, tags, gm_notes, player_visible
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
event.id,
event.campaign_id,
event.session_number,
event.event_type.value,
event.title,
event.description,
json.dumps(event.characters_involved),
json.dumps(event.npcs_involved),
json.dumps(event.locations),
json.dumps(event.consequences),
json.dumps(event.items_gained),
json.dumps(event.items_lost),
event.experience_awarded,
event.timestamp.isoformat(),
event.importance,
json.dumps(event.tags),
event.gm_notes,
event.player_visible
))
conn.commit()
conn.close()
# Update campaign memory
campaign.add_event(event)
self.save_campaign(campaign)
return event
def get_campaign_events(self, campaign_id: str, session_number: Optional[int] = None) -> List[CampaignEvent]:
"""Get events for a campaign"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if session_number:
cursor.execute(
"SELECT * FROM campaign_events WHERE campaign_id = ? AND session_number = ? ORDER BY timestamp",
(campaign_id, session_number)
)
else:
cursor.execute(
"SELECT * FROM campaign_events WHERE campaign_id = ? ORDER BY timestamp",
(campaign_id,)
)
rows = cursor.fetchall()
conn.close()
events = []
for row in rows:
event_data = {
'id': row[0],
'campaign_id': row[1],
'session_number': row[2],
'event_type': row[3],
'title': row[4],
'description': row[5],
'characters_involved': json.loads(row[6]) if row[6] else [],
'npcs_involved': json.loads(row[7]) if row[7] else [],
'locations': json.loads(row[8]) if row[8] else [],
'consequences': json.loads(row[9]) if row[9] else [],
'items_gained': json.loads(row[10]) if row[10] else [],
'items_lost': json.loads(row[11]) if row[11] else [],
'experience_awarded': row[12],
'timestamp': row[13],
'importance': row[14],
'tags': json.loads(row[15]) if row[15] else [],
'gm_notes': row[16],
'player_visible': row[17]
}
events.append(CampaignEvent(**event_data))
return events
def export_campaign_summary(self, campaign_id: str) -> str:
"""Export campaign summary as markdown"""
campaign = self.load_campaign(campaign_id)
if not campaign:
return "Campaign not found"
return campaign.to_markdown()
def synthesize_campaign_from_characters(
self,
characters: List[Character],
game_master: str = "",
additional_notes: str = ""
) -> Campaign:
"""
Synthesize a campaign tailored to the provided characters using AI.
Analyzes the party composition, backstories, alignments, and creates
a custom campaign that fits the characters.
"""
# Build character analysis
party_analysis = []
for char in characters:
party_analysis.append(f"""
- **{char.name}** (Level {char.level} {char.race.value} {char.character_class.value})
- Alignment: {char.alignment.value}
- Background: {char.background.background_type}
- Backstory: {char.background.backstory[:200]}...
- Personality: {char.background.personality_traits[:150]}...""")
party_summary = "\n".join(party_analysis)
# Calculate party level range
levels = [char.level for char in characters]
min_level = min(levels)
max_level = max(levels)
avg_level = sum(levels) // len(levels)
level_range = f"{min_level}-{max_level}" if min_level != max_level else f"{min_level}"
# Determine appropriate challenge level
if avg_level <= 3:
tier = "Tier 1 (Local Heroes)"
scope = "local region or small kingdom"
elif avg_level <= 10:
tier = "Tier 2 (Heroes of the Realm)"
scope = "kingdom or large region"
elif avg_level <= 16:
tier = "Tier 3 (Masters of the Realm)"
scope = "continent or multiple kingdoms"
else:
tier = "Tier 4 (Masters of the World)"
scope = "entire world or planar"
# Create AI prompt for campaign synthesis
prompt = f"""You are an expert Dungeon Master creating a COMPLETE campaign guide for D&D 5e.
**Party Composition ({len(characters)} characters, Level {level_range}):**
{party_summary}
**Campaign Tier:** {tier}
**Appropriate Scope:** {scope}
{f"**Additional DM Notes:** {additional_notes}" if additional_notes else ""}
Create a DETAILED campaign that weaves all these characters together. Include:
1. **Character Connections**: How do these characters know each other or become connected?
2. **Personal Stakes**: What does each character have to lose/gain?
3. **Villain Details**: Full profiles with motivations and how they threaten each PC
4. **World Details**: Specific locations, politics, and cultural elements
5. **Adventure Hooks**: 3-4 specific scenarios that pull characters in
6. **First Session Outline**: Concrete opening scenario
7. **Story Progression**: Where the campaign leads over 5-10 sessions
Generate a comprehensive campaign document with these sections:
**CAMPAIGN_NAME:** (Epic, memorable name - 2-5 words)
**THEME:** (Choose ONE: High Fantasy, Dark Fantasy, Urban Fantasy, Political Intrigue, Horror, Exploration, Dungeon Crawl, or Custom)
**WORLD_NAME:** (Name of the world/realm)
**STARTING_LOCATION:** (Town, city, or region where adventure begins)
**SETTING:** (3-4 sentences about the world, its current political state, and atmosphere)
**SUMMARY:** (4-5 sentences explaining the campaign hook and what drives the story)
**CONFLICT:** (2-3 sentences describing the central threat and why it matters)
**FACTIONS:** (List 3-4 factions with brief descriptions, format: "Faction Name - description")
**VILLAINS:** (2-3 detailed villain profiles with:
- Name and role
- Motivation (what they want and why)
- Connection to party (which PCs they threaten and how)
- Methods (how they operate)
Format: "Villain Name - Role | Motivation | Connections | Methods")
**CHARACTER_CONNECTIONS:** REQUIRED - For EACH party member listed above, explain how they fit (Format: "Name: connection details | Name2: connection details | ...")
**ADVENTURE_HOOKS:** REQUIRED - List 3-4 specific hooks separated by pipes (Format: "Hook 1 details | Hook 2 details | Hook 3 details")
**FIRST_SESSION:** REQUIRED - Detailed opening scene (Format: "Scene description")
**SESSION_OUTLINES:** REQUIRED - Sessions 2-5 separated by pipes (Format: "Session 2 details | Session 3 details | Session 4 details | Session 5 details")
**MYSTERIES:** REQUIRED - 2-3 mysteries separated by pipes (Format: "Mystery 1 | Mystery 2 | Mystery 3")
**KEY_NPCS:** REQUIRED - 3-4 NPCs separated by pipes (Format: "NPC Name - role and connection | NPC2 Name - role and connection | ...")
**LOCATIONS:** REQUIRED - 3-4 locations separated by pipes (Format: "Location Name - description | Location2 Name - description | ...")
**STORY_ARC:** (First major story arc with clear beginning, middle, and climax)
Format your response EXACTLY as follows (DO NOT SKIP ANY FIELDS):
---
CAMPAIGN_NAME: [name]
THEME: [theme]
WORLD_NAME: [world]
STARTING_LOCATION: [location]
SETTING: [setting details]
SUMMARY: [campaign summary]
CONFLICT: [main conflict]
FACTIONS: [faction1 - desc | faction2 - desc | faction3 - desc]
VILLAINS: [villain1 details | villain2 details]
CHARACTER_CONNECTIONS: [char1: how they fit | char2: how they fit | char3: how they fit | char4: how they fit]
ADVENTURE_HOOKS: [hook1 description in 2-3 sentences | hook2 description | hook3 description | hook4 description]
FIRST_SESSION: [opening scene details]
SESSION_OUTLINES: [session 2 outline | session 3 outline | session 4 outline | session 5 outline]
MYSTERIES: [mystery1 details | mystery2 details | mystery3 details]
KEY_NPCS: [NPC Name - their role and connection to party | NPC2 Name - role and connection | NPC3 Name - role | NPC4 Name - role]
LOCATIONS: [Location Name - description of place | Location2 Name - description | Location3 Name - description | Location4 Name - description]
STORY_ARC: [arc details]
---
IMPORTANT: Every field listed above MUST have content. Use pipe separators (|) between items in list fields."""
# Get AI response
response = self.ai_client.generate_creative(prompt)
# Parse the response
parsed = self._parse_campaign_synthesis(response)
# If critical fields are missing, make a second focused call
missing_fields = []
if not parsed.get('character_connections') or len(parsed.get('character_connections', [])) == 0:
missing_fields.append('CHARACTER_CONNECTIONS')
if not parsed.get('adventure_hooks') or len(parsed.get('adventure_hooks', [])) == 0:
missing_fields.append('ADVENTURE_HOOKS')
if not parsed.get('key_npcs') or len(parsed.get('key_npcs', [])) == 0:
missing_fields.append('KEY_NPCS')
if not parsed.get('locations') or len(parsed.get('locations', [])) == 0:
missing_fields.append('LOCATIONS')
if not parsed.get('session_outlines') or len(parsed.get('session_outlines', [])) == 0:
missing_fields.append('SESSION_OUTLINES')
if missing_fields:
# Make focused call for missing fields
fields_list = ' '.join(['**' + field + ':**' for field in missing_fields])
format_template = '\n'.join([field + ': [details | more details | etc]' for field in missing_fields])
followup_prompt = f"""Campaign: {parsed.get('name', 'Campaign')}
Setting: {parsed.get('setting', '')}
Party:
{party_summary}
Generate ONLY these missing campaign elements. Use pipe (|) separators between items:
{fields_list}
Format:
---
{format_template}
---"""
followup_response = self.ai_client.generate_creative(followup_prompt)
followup_parsed = self._parse_campaign_synthesis(followup_response)
# Merge the results
for key in ['character_connections', 'adventure_hooks', 'key_npcs', 'locations', 'session_outlines']:
if key in followup_parsed and followup_parsed[key]:
parsed[key] = followup_parsed[key]
# Create campaign
campaign = self.create_campaign(
name=parsed.get('name', 'Untitled Campaign'),
theme=parsed.get('theme', 'High Fantasy'),
setting=parsed.get('setting', 'A fantasy realm awaits...'),
summary=parsed.get('summary', 'An epic adventure begins...'),
main_conflict=parsed.get('conflict', 'Evil threatens the land...'),
game_master=game_master,
world_name=parsed.get('world_name', 'The Realm'),
starting_location=parsed.get('starting_location', 'The Crossroads'),
level_range=level_range,
party_size=len(characters)
)
# Load the created campaign and add additional details
campaign = self.load_campaign(campaign.id)
# Add factions, villains, and mysteries
if 'factions' in parsed:
campaign.key_factions = parsed['factions']
if 'villains' in parsed:
campaign.major_villains = parsed['villains']
if 'mysteries' in parsed:
campaign.central_mysteries = parsed['mysteries']
if 'story_arc' in parsed:
campaign.current_arc = parsed['story_arc']
# Add detailed campaign notes with all the extra information
campaign_notes = self._build_campaign_notes(parsed)
campaign.notes = campaign_notes
# Add all characters to the campaign
for char in characters:
campaign.add_character(char.id)
# Save updated campaign
self.save_campaign(campaign)
return campaign
def _build_campaign_notes(self, parsed: dict) -> str:
"""Build detailed campaign notes from parsed data"""
notes = []
if 'character_connections' in parsed:
notes.append("## Character Connections\n")
for conn in parsed['character_connections']:
notes.append(f"- {conn}\n")
notes.append("\n")
if 'adventure_hooks' in parsed:
notes.append("## Adventure Hooks\n")
for i, hook in enumerate(parsed['adventure_hooks'], 1):
notes.append(f"{i}. {hook}\n")
notes.append("\n")
if 'first_session' in parsed:
notes.append("## First Session Opening\n")
notes.append(f"{parsed['first_session']}\n\n")
if 'session_outlines' in parsed:
notes.append("## Session Progression\n")
for i, outline in enumerate(parsed['session_outlines'], 2):
notes.append(f"**Session {i}:** {outline}\n")
notes.append("\n")
if 'key_npcs' in parsed:
notes.append("## Key NPCs\n")
for npc in parsed['key_npcs']:
notes.append(f"- {npc}\n")
notes.append("\n")
if 'locations' in parsed:
notes.append("## Key Locations\n")
for loc in parsed['locations']:
notes.append(f"- {loc}\n")
notes.append("\n")
return "".join(notes)
def _parse_campaign_synthesis(self, ai_response: str) -> dict:
"""Parse AI response for campaign synthesis"""
parsed = {}
# Extract content between --- markers
if '---' in ai_response:
parts = ai_response.split('---')
if len(parts) >= 2:
content = parts[1]
else:
content = ai_response
else:
content = ai_response
# Parse each field
lines = content.strip().split('\n')
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
key = key.strip().lower().replace('campaign_', '').replace('_', '_')
value = value.strip()
# Handle pipe-separated fields (detailed lists)
if key in ['factions', 'villains', 'character_connections', 'adventure_hooks',
'session_outlines', 'mysteries', 'key_npcs', 'locations']:
# Split by pipe and clean up each item
parsed[key] = [item.strip() for item in value.split('|') if item.strip()]
else:
parsed[key] = value
return parsed
def auto_generate_next_session(self, campaign_id: str) -> dict:
"""
Autonomously generate the next session based on campaign progress.
This is a LOW-RISK autonomous feature that:
- Analyzes campaign state and previous sessions
- Generates structured session content
- Does NOT modify the campaign directly
- Returns session data for user review/approval
Args:
campaign_id: Campaign identifier
Returns:
dict with session details (title, opening, encounters, npcs, etc.)
"""
campaign = self.load_campaign(campaign_id)
if not campaign:
return {"error": "Campaign not found"}
# Get session context
events = self.get_campaign_events(campaign_id)
session_events = [e for e in events if e.type == EventType.SESSION]
current_session_num = len(session_events) + 1
last_session = session_events[-1] if session_events else None
# Get session notes from previous sessions (last 5 sessions for better context)
all_session_notes = self.get_session_notes(campaign_id)
recent_notes = all_session_notes[:5] if all_session_notes else []
# Build session notes context
notes_context = ""
continuity_requirements = []
if recent_notes:
notes_context = "\n**PREVIOUS SESSION NOTES:**\n\n"
for note in reversed(recent_notes): # Chronological order
notes_context += f"**Session {note.session_number} Notes:**\n"
# Limit each session's notes to 2000 characters
truncated_notes = note.notes[:2000]
if len(note.notes) > 2000:
truncated_notes += "\n... (notes truncated)"
notes_context += f"{truncated_notes}\n\n"
# Extract continuity requirements from most recent session
if recent_notes:
last_note = recent_notes[0] # Most recent
continuity_requirements = self._extract_continuity_points(last_note.notes)
# Build last session context
if not notes_context:
if last_session:
last_session_info = f"**Last Session Summary:**\n{last_session.description}"
else:
last_session_info = "**Last Session Summary:**\nThis is the first session - use the campaign first session opening from the notes."
else:
last_session_info = notes_context
# Build continuity section
continuity_section = ""
if continuity_requirements:
continuity_section = "\n**🎯 CONTINUITY REQUIREMENTS (You MUST address these):**\n"
for i, req in enumerate(continuity_requirements, 1):
continuity_section += f"{i}. {req}\n"
continuity_section += "\n**MANDATORY:** Reference AT LEAST 2-3 of the above elements in Session {current_session_num}.\n"
# Build context for AI
prompt = f"""You are an expert Dungeon Master planning Session {current_session_num} of "{campaign.name}".
**CAMPAIGN OVERVIEW:**
• **Theme:** {campaign.theme}
• **Setting:** {campaign.setting}
• **Current Story Arc:** {campaign.current_arc}
• **Party Size:** {campaign.party_size} adventurers
• **Main Conflict:** {campaign.main_conflict}
• **Key Factions:** {', '.join(campaign.key_factions) if campaign.key_factions else 'None established yet'}
• **Major Villains:** {', '.join(campaign.major_villains) if campaign.major_villains else 'None revealed yet'}
{last_session_info}
**Campaign DM Notes:**
{campaign.notes[:1000] if campaign.notes else 'No additional notes'}
{continuity_section}
---
**YOUR MISSION:** Create Session {current_session_num} that feels like a natural, organic continuation of the story.
**CORE PRINCIPLES:**
1. **Cause and Effect:** Player choices from previous sessions MUST have visible consequences
2. **NPC Memory:** NPCs remember what players did - allies help, enemies retaliate, neutrals react
3. **Narrative Threads:** Pick up unresolved hooks from previous sessions - don't let them disappear
4. **Callbacks:** Reference earlier events to show the campaign has continuity and memory
5. **Player Agency:** Respond to HOW players solved problems, not just THAT they solved them
6. **Tone Consistency:** Match the established campaign tone - don't suddenly shift genre
7. **Escalation:** Stakes should feel appropriate to campaign progression
**SESSION DESIGN CHECKLIST:**
✓ Does this session respond to player choices from last time?
✓ Have I included at least one recurring NPC?
✓ Do consequences feel earned and logical?
✓ Will players recognize this flows from their actions?
✓ Have I advanced an active plot thread?
Include:
1. **SESSION_TITLE:** A compelling title for this session (3-6 words)
2. **OPENING_SCENE:** The opening narration/scene (2-3 paragraphs)
3. **KEY_ENCOUNTERS:** 2-4 encounters (combat, social, exploration) separated by pipes (|)
4. **NPCS_FEATURED:** NPCs that appear in this session, separated by pipes (|)
5. **LOCATIONS:** Locations visited in this session, separated by pipes (|)
6. **PLOT_DEVELOPMENTS:** Major plot points that advance this session, separated by pipes (|)
7. **POTENTIAL_OUTCOMES:** 2-3 possible ways the session could end, separated by pipes (|)
8. **REWARDS:** Treasure, XP, or story rewards, separated by pipes (|)
9. **CLIFFHANGER:** Optional cliffhanger for next session (1-2 sentences)
Format your response EXACTLY as:
---
SESSION_TITLE: [title]
OPENING_SCENE: [opening narration in 2-3 paragraphs]
KEY_ENCOUNTERS: [encounter 1 description | encounter 2 description | encounter 3 description]
NPCS_FEATURED: [NPC name and role | NPC2 name and role | NPC3 name and role]
LOCATIONS: [location 1 | location 2 | location 3]
PLOT_DEVELOPMENTS: [development 1 | development 2 | development 3]
POTENTIAL_OUTCOMES: [outcome 1 | outcome 2 | outcome 3]
REWARDS: [reward 1 | reward 2 | reward 3]
CLIFFHANGER: [cliffhanger sentence]
---
"""
# Generate session content
ai_response = self.ai_client.generate_creative(prompt)
# Parse the response
session_data = self._parse_session_generation(ai_response)
# Add metadata
session_data['session_number'] = current_session_num
session_data['campaign_id'] = campaign_id
session_data['generated_at'] = datetime.now().isoformat()
session_data['auto_generated'] = True
session_data['used_session_notes'] = len(recent_notes) > 0
session_data['notes_count'] = len(recent_notes)
return session_data
def _extract_continuity_points(self, session_notes: str) -> list[str]:
"""
Extract key continuity points from session notes that should be
referenced in the next session.
Returns:
List of continuity requirements
"""
continuity = []
# Extract NPCs mentioned (simple pattern matching)
import re
# Look for proper nouns (capitalized words that appear multiple times)
words = session_notes.split()
capitalized = [w.strip('.,!?:;') for w in words if w and w[0].isupper() and len(w) > 2]
npc_candidates = [w for w in capitalized if capitalized.count(w) >= 2]
if npc_candidates:
unique_npcs = list(set(npc_candidates))[:5] # Top 5
continuity.append(f"NPCs to remember: {', '.join(unique_npcs)}")
# Look for unresolved elements (keywords indicating future hooks)
unresolved_keywords = ['next time', 'later', 'tomorrow', 'unfinished', 'interrupted',
'escaped', 'fled', 'promised', 'agreed to', 'will return',
'warned', 'threatened', 'mysterious']
unresolved_sentences = []
for sentence in session_notes.split('.'):
if any(keyword in sentence.lower() for keyword in unresolved_keywords):
unresolved_sentences.append(sentence.strip())
if unresolved_sentences:
continuity.append(f"Unresolved hooks: {' | '.join(unresolved_sentences[:3])}")
# Look for locations mentioned
location_keywords = ['traveled to', 'arrived at', 'went to', 'heading to', 'in the', 'at the']
locations = []
for sentence in session_notes.split('.'):
for keyword in location_keywords:
if keyword in sentence.lower():
locations.append(sentence.strip())
break
if locations:
continuity.append(f"Recent locations: {' | '.join(locations[:3])}")
# Look for player decisions or consequences
decision_keywords = ['decided to', 'chose to', 'agreed to', 'refused to', 'killed', 'saved', 'helped']
decisions = []
for sentence in session_notes.split('.'):
if any(keyword in sentence.lower() for keyword in decision_keywords):
decisions.append(sentence.strip())
if decisions:
continuity.append(f"Player decisions needing consequences: {' | '.join(decisions[:3])}")
return continuity
def _parse_session_generation(self, ai_response: str) -> dict:
"""Parse AI response for session generation"""
parsed = {}
# Extract content between --- markers
if '---' in ai_response:
parts = ai_response.split('---')
content = parts[1] if len(parts) >= 2 else ai_response
else:
content = ai_response
# Parse each field
lines = content.strip().split('\n')
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
key = key.strip().lower().replace(' ', '_')
value = value.strip()
# Handle pipe-separated fields
if key in ['key_encounters', 'npcs_featured', 'locations',
'plot_developments', 'potential_outcomes', 'rewards']:
parsed[key] = [item.strip() for item in value.split('|') if item.strip()]
else:
parsed[key] = value
return parsed
def save_session_notes(
self,
campaign_id: str,
session_number: int,
notes: str,
file_name: Optional[str] = None,
file_type: Optional[str] = None
) -> SessionNotes:
"""
Save session notes for a campaign.
Args:
campaign_id: Campaign identifier
session_number: Session number
notes: Freeform session notes content
file_name: Optional original filename if uploaded
file_type: Optional file extension (.txt, .md, .docx, .pdf)
Returns:
SessionNotes object
"""
# Create session notes object
session_notes = SessionNotes(
id=f"{campaign_id}-session-{session_number}",
campaign_id=campaign_id,
session_number=session_number,
notes=notes,
uploaded_at=datetime.now(),
file_name=file_name,
file_type=file_type
)
# Save to database (upsert - replace if exists)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO session_notes
(id, campaign_id, session_number, notes, uploaded_at, file_name, file_type)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
session_notes.id,
session_notes.campaign_id,
session_notes.session_number,
session_notes.notes,
session_notes.uploaded_at.isoformat(),
session_notes.file_name,
session_notes.file_type
))
conn.commit()
conn.close()
return session_notes
def get_session_notes(
self,
campaign_id: str,
session_number: Optional[int] = None
) -> List[SessionNotes]:
"""
Get session notes for a campaign.
Args:
campaign_id: Campaign identifier
session_number: Optional specific session number (if None, returns all)
Returns:
List of SessionNotes objects (ordered by session number DESC)
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if session_number is not None:
cursor.execute("""
SELECT id, campaign_id, session_number, notes, uploaded_at, file_name, file_type
FROM session_notes
WHERE campaign_id = ? AND session_number = ?
""", (campaign_id, session_number))
else:
cursor.execute("""
SELECT id, campaign_id, session_number, notes, uploaded_at, file_name, file_type
FROM session_notes
WHERE campaign_id = ?
ORDER BY session_number DESC
""", (campaign_id,))
rows = cursor.fetchall()
conn.close()
# Convert to SessionNotes objects
notes_list = []
for row in rows:
notes_list.append(SessionNotes(
id=row[0],
campaign_id=row[1],
session_number=row[2],
notes=row[3],
uploaded_at=datetime.fromisoformat(row[4]),
file_name=row[5],
file_type=row[6]
))
return notes_list
def delete_session_notes(self, campaign_id: str, session_number: int) -> bool:
"""
Delete session notes for a specific session.
Args:
campaign_id: Campaign identifier
session_number: Session number
Returns:
True if deleted, False if not found
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
DELETE FROM session_notes
WHERE campaign_id = ? AND session_number = ?
""", (campaign_id, session_number))
deleted = cursor.rowcount > 0
conn.commit()
conn.close()
return deleted