Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import os | |
| try: | |
| from . import chat, voice, database | |
| except ImportError: | |
| import chat, voice, database | |
| import traceback | |
# Application object that the middleware and static mount in this module attach to.
app = FastAPI()

# CORS policy: any origin, any method and any header is accepted, and
# credentialed requests are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside of local development.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# Database initialization hook, intended to run on startup.
# NOTE(review): no startup registration (decorator or event handler) is visible
# in this view — confirm how this coroutine gets invoked.
async def startup_event():
    """Initialize the database by calling ``database.init_db()``."""
    database.init_db()
class ChatRequest(BaseModel):
    """Request body shared by the chat handlers."""

    # Text of the message the user just sent.
    message: str

    # Earlier conversation turns supplied by the caller. Optional and
    # deprecated: the session-based handler reads history from the database
    # via session_id instead. Kept for backward compatibility.
    history: List[dict] = []
class SessionCreateRequest(BaseModel):
    """Request body used when creating a chat session."""

    # Display name of the session; falls back to "New Chat" when omitted.
    name: str = "New Chat"

    # Identifier of the user the session is created for (required).
    user_id: str
class VoiceRequest(BaseModel):
    """Request body for speech generation."""

    # Text to be turned into audio.
    text: str

    # Voice selector handed to ``voice.generate_audio``.
    voice: str
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def root():
    """Return a small static payload identifying the API."""
    banner = {"message": "AI Girlfriend API"}
    return banner
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def get_sessions(user_id: str):
    """Return whatever ``database.get_sessions`` yields for ``user_id``."""
    sessions = database.get_sessions(user_id)
    return sessions
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def create_session(request: SessionCreateRequest):
    """Create a session for ``request.user_id`` named ``request.name``.

    Returns the value produced by ``database.create_session`` unchanged.
    """
    created = database.create_session(request.user_id, request.name)
    return created
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def delete_session(session_id: str):
    """Delete the session via ``database.delete_session`` and acknowledge it.

    The acknowledgement is returned whenever the database call does not raise.
    """
    database.delete_session(session_id)
    acknowledgement = {"message": "Session deleted"}
    return acknowledgement
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def get_session_messages(session_id: str):
    """Return the messages ``database.get_messages`` holds for ``session_id``."""
    messages = database.get_messages(session_id)
    return messages
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def chat_session_endpoint(session_id: str, request: ChatRequest):
    """Run one chat turn inside a stored session.

    Loads the session's stored messages, asks ``chat.get_chat_response`` for a
    reply to ``request.message``, then stores both the user message and the
    reply. When the session had no stored messages before this turn, a summary
    is generated and used as the session name; a failure there is only printed
    and does not fail the request.

    Returns:
        ``{"response": <reply text>}``.

    Raises:
        HTTPException: status 500 with the error text as detail when any other
            step raises.
    """
    try:
        # Messages stored before this turn; the client-sent history is ignored.
        stored_messages = database.get_messages(session_id)

        # Reduce each stored entry to the role/content pair passed to chat.py.
        model_history = []
        for entry in stored_messages:
            model_history.append({"role": entry["role"], "content": entry["content"]})

        user_text = request.message
        reply = chat.get_chat_response(user_text, model_history)

        # Persist the turn only after the model has produced a reply.
        database.add_message(session_id, "user", user_text)
        database.add_message(session_id, "assistant", reply)

        # An empty pre-turn history means this was the session's first message.
        if not stored_messages:
            try:
                session_title = chat.generate_summary(user_text, reply)
                database.update_session_name(session_id, session_title)
            except Exception as summary_error:
                # Best effort: naming the session must not break the chat turn.
                print(f"Failed to generate summary: {summary_error}")

        return {"response": reply}
    except Exception as error:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(error))
# Legacy handler: kept for now; can be removed once the frontend is fully
# updated to the session-based handler.
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def chat_endpoint(request: ChatRequest):
    """Answer a single message using the history supplied in the request.

    Returns:
        ``{"response": <reply>}``.

    Raises:
        HTTPException: status 500 with the error text as detail when
            ``chat.get_chat_response`` raises.
    """
    try:
        reply = chat.get_chat_response(request.message, request.history)
    except Exception as error:
        # Print the traceback to the console before reporting the failure.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(error))
    return {"response": reply}
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def voices_endpoint():
    """Return the result of ``voice.get_voices()`` unchanged."""
    available_voices = voice.get_voices()
    return available_voices
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def speak_endpoint(request: VoiceRequest):
    """Generate speech for ``request.text`` and return it as an MP3 download.

    Awaits ``voice.generate_audio(text, voice)`` for the path of the audio
    file, then serves that file as ``audio/mpeg`` under the name
    ``response.mp3``.

    Raises:
        HTTPException: status 500 with the error text as detail when audio
            generation or response construction raises.
    """
    try:
        audio_path = await voice.generate_audio(request.text, request.voice)
        return FileResponse(audio_path, media_type="audio/mpeg", filename="response.mp3")
    except Exception as e:
        # Print the traceback to the console, as the chat handlers do, so the
        # failure is diagnosable server-side and not only visible to the client.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
# Static frontend (production / served build).
# The build directory is addressed relative to this module's own directory.
_backend_dir = os.path.dirname(__file__)
frontend_dist = os.path.join(_backend_dir, "../frontend/dist")
assets_path = os.path.join(frontend_dist, "assets")

# Mount the assets folder only when it exists, so the backend still starts
# when the frontend has not been built.
if os.path.exists(assets_path):
    _assets_app = StaticFiles(directory=assets_path)
    app.mount("/assets", _assets_app, name="assets")
# NOTE(review): no route decorator is visible in this view — confirm how this
# handler is registered.
async def serve_frontend(full_path: str):
    """Serve the built frontend, falling back to ``index.html`` for SPA routes.

    Args:
        full_path: Request path, relative to the site root. It is
            client-controlled and therefore treated as untrusted.

    Returns:
        A ``FileResponse`` for the requested file when it is a regular file
        inside ``frontend_dist``; otherwise a ``FileResponse`` for
        ``index.html``. When the build directory is missing and ``full_path``
        is empty, a JSON message saying the backend is running.

    Raises:
        HTTPException: status 404 when the build directory is missing and a
            non-empty path was requested.
    """
    # Without a build, the backend still runs: answer the root path with a
    # hint and reject every other frontend route.
    if not os.path.exists(frontend_dist):
        if full_path == "":
            return {"message": "Backend is running. Frontend build not found. Use 'npm run dev' for frontend development."}
        raise HTTPException(status_code=404, detail="Frontend build not found")

    if full_path:
        try:
            # Resolve ".." segments, absolute paths and symlinks before
            # checking containment, so a crafted path cannot escape the build
            # directory and expose arbitrary files.
            dist_root = os.path.realpath(frontend_dist)
            target_file = os.path.realpath(os.path.join(dist_root, full_path))
            inside_dist = os.path.commonpath([dist_root, target_file]) == dist_root
        except ValueError:
            # Unresolvable or malformed paths (e.g. embedded NUL bytes) are
            # treated as "not a file in the build".
            inside_dist = False

        # Only regular files are served; directories cannot be sent as a file.
        if inside_dist and os.path.isfile(target_file):
            return FileResponse(target_file)

    # Anything else is handed to the SPA entry point for client-side routing.
    return FileResponse(os.path.join(frontend_dist, "index.html"))