Spaces:
Paused
Paused
| from fastapi import FastAPI, UploadFile, File, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import StreamingResponse, JSONResponse, FileResponse | |
| import io | |
| import os | |
| import logging | |
| from pathlib import Path | |
| # Import our modules | |
| from app.agent import process_text, clear_memory | |
| from app.speech_to_text import transcribe_audio | |
| from app.text_to_speech import synthesize_speech | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Create the FastAPI app | |
| app = FastAPI() | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Create static directory if it doesn't exist | |
| static_dir = Path("static") | |
| static_dir.mkdir(exist_ok=True) | |
| # Mount static files if directory exists | |
| if static_dir.exists(): | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # Routes for the API | |
| async def root(): | |
| """Serve the main UI if available.""" | |
| index_path = static_dir / "index.html" | |
| if index_path.exists(): | |
| return FileResponse(index_path) | |
| return {"message": "AGI Telecom POC API - UI not available"} | |
| async def health_check(): | |
| """Health check endpoint.""" | |
| return {"status": "ok"} | |
| async def transcribe(file: UploadFile = File(...)): | |
| """ | |
| Transcribe audio to text. | |
| Args: | |
| file: Audio file upload | |
| Returns: | |
| JSON with transcription | |
| """ | |
| try: | |
| audio_bytes = await file.read() | |
| text = transcribe_audio(audio_bytes) | |
| logger.info(f"Transcribed: {text[:30]}...") | |
| return {"transcription": text} | |
| except Exception as e: | |
| logger.error(f"Transcription error: {str(e)}") | |
| return JSONResponse( | |
| status_code=500, | |
| content={"error": f"Failed to transcribe audio: {str(e)}"} | |
| ) | |
| async def query_agent(request: Request): | |
| """ | |
| Process a text query with the agent. | |
| Args: | |
| request: Request with input_text in JSON body | |
| Returns: | |
| JSON with agent response | |
| """ | |
| try: | |
| data = await request.json() | |
| input_text = data.get("input_text", "") | |
| if not input_text: | |
| return JSONResponse( | |
| status_code=400, | |
| content={"error": "No input_text provided"} | |
| ) | |
| response = process_text(input_text) | |
| logger.info(f"Query: {input_text[:30]}... Response: {response[:30]}...") | |
| return {"response": response} | |
| except Exception as e: | |
| logger.error(f"Query error: {str(e)}") | |
| return JSONResponse( | |
| status_code=500, | |
| content={"error": f"Failed to process query: {str(e)}"} | |
| ) | |
| async def speak(text: str): | |
| """ | |
| Convert text to speech. | |
| Args: | |
| text: Text to convert to speech | |
| Returns: | |
| Audio stream | |
| """ | |
| try: | |
| audio = synthesize_speech(text) | |
| return StreamingResponse(io.BytesIO(audio), media_type="audio/mpeg") | |
| except Exception as e: | |
| logger.error(f"Speech synthesis error: {str(e)}") | |
| return JSONResponse( | |
| status_code=500, | |
| content={"error": f"Failed to synthesize speech: {str(e)}"} | |
| ) | |
| async def reset_memory(): | |
| """Clear the conversation memory.""" | |
| success = clear_memory() | |
| return {"success": success} | |
| async def complete_flow(request: Request): | |
| """ | |
| Complete flow: audio to text to agent to speech. | |
| Args: | |
| request: Request with audio_base64 or text_input in JSON body | |
| Returns: | |
| JSON with results and audio URL | |
| """ | |
| try: | |
| data = await request.json() | |
| text_input = data.get("text_input") | |
| # Process with agent | |
| if not text_input: | |
| return JSONResponse( | |
| status_code=400, | |
| content={"error": "No input provided"} | |
| ) | |
| response = process_text(text_input) | |
| logger.info(f"Agent response: {response[:30]}...") | |
| # Return the response | |
| return { | |
| "input": text_input, | |
| "response": response | |
| } | |
| except Exception as e: | |
| logger.error(f"Complete flow error: {str(e)}") | |
| return JSONResponse( | |
| status_code=500, | |
| content={"error": f"Failed to process: {str(e)}"} | |
| ) | |
| # Run the app | |
| if __name__ == "__main__": | |
| import uvicorn | |
| # Check if running on HF Spaces | |
| if os.environ.get("SPACE_ID"): | |
| # Running on HF Spaces - use their port | |
| port = int(os.environ.get("PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |
| else: | |
| # Running locally | |
| uvicorn.run(app, host="0.0.0.0", port=8000) |