Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import httpx | |
| from fastapi import FastAPI, Request, Form, HTTPException, APIRouter | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import Optional | |
| from fastapi.responses import JSONResponse | |
| from openai import OpenAI | |
# Application object that the rest of this module configures.
app = FastAPI()

# CORS policy: every origin, method and header is accepted, with credentials.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive -- confirm this is intended for a public deployment.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)

# URL prefix: "/api" when the HF_SPACE environment variable is set to a
# non-empty value, otherwise no prefix.
BASE_PATH = "/api" if os.environ.get("HF_SPACE") else ""
# Health-check handler.
# NOTE(review): no route decorator is visible on this function in this view of
# the file -- confirm it is actually registered on `app`.
async def root():
    """Return a static payload indicating that the server is up."""
    health = {"status": "ok"}
    health["message"] = "Server is running"
    return health
# Slack settings, read once from the environment at import time.
# Both values are None when the corresponding variable is unset.
SLACK_VERIFICATION_TOKEN = os.environ.get("SLACK_VERIFICATION_TOKEN")  # compared against the slash-command token
ALLOWED_SLACK_USER_ID = os.environ.get("YOUR_SLACK_USER_ID")  # the only Slack user allowed to run the command
# Load the knowledge base: a JSON object mapping each question to its answer.
# The encoding is given explicitly so decoding does not depend on the
# platform's default locale.
with open("kb.json", encoding="utf-8") as f:
    kb = json.load(f)

# Build the system prompt in one pass: instructions, then every Q/A pair from
# the knowledge base, then the exact fallback sentence the model must use for
# questions the knowledge base does not cover.
system_prompt = "".join(
    [
        "You are a helpful assistant. Only answer questions based on the following knowledge base:\n\n",
        *(f"Q: {q}\nA: {a}\n\n" for q, a in kb.items()),
        "If the question is not in the knowledge base, respond with: 'I'm not sure about that. Let me connect you with a human agent.'",
    ]
)
# OpenAI-compatible client.
# The endpoint comes from OPENAI_API_BASE and falls back to the public OpenAI
# URL; the API key comes from OPENAI_API_KEY.
_openai_base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1")
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
client.base_url = _openai_base_url
# Chatwoot settings, read once from the environment at import time.
CHATWOOT_BASE_URL = os.getenv("CHATWOOT_BASE_URL")
CHATWOOT_API_KEY = os.getenv("CHATWOOT_API_KEY")

# The account id is interpolated into every Chatwoot URL, so startup fails with
# an explicit message when it is missing (int(None) would otherwise raise an
# opaque TypeError). A non-numeric value still raises ValueError from int().
_chatwoot_account_id_raw = os.getenv("CHATWOOT_ACCOUNT_ID")
if not _chatwoot_account_id_raw:
    raise RuntimeError(
        "CHATWOOT_ACCOUNT_ID environment variable is not set; "
        "expected the numeric Chatwoot account id (e.g. 123911)"
    )
CHATWOOT_ACCOUNT_ID = int(_chatwoot_account_id_raw)  # e.g., 123911

# Conversation ids (as strings) for which the AI must stay silent.
# In-memory only: the set is emptied whenever the process restarts.
stop_reply_conversations = set()
def _ask_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict (guards JSON nulls)."""
    return value if isinstance(value, dict) else {}


def _ask_usage_report(payload, conversation_id, message_content, answer, response):
    """Build the Slack text with recent conversation history and token usage.

    Reads up to the last six entries of ``payload["conversation"]["messages"]``,
    appends the current question/answer pair, and adds the token counts and
    model name reported on *response*.
    """
    usage = response.usage
    recent = _ask_dict(payload.get("conversation")).get("messages") or []

    history = []
    for msg in recent[-6:]:  # last 3 exchanges (6 messages)
        if not isinstance(msg, dict):
            continue
        # Accept the string form and numeric 0: other Chatwoot handling in this
        # file treats 0 as "incoming".
        # NOTE(review): confirm which form the webhook payload actually uses.
        role = "π€ User" if msg.get("message_type") in ("incoming", 0) else "π€ AI"
        content = msg.get("content")
        if isinstance(content, str) and content.strip():
            history.append(f"{role}: {content.strip()}")

    # Add current exchange
    history.append(f"π€ User: {message_content}")
    history.append(f"π€ AI: {answer}")
    formatted_history = "\n".join(history)

    return (
        f"π¬ *Conversation Update* - `{conversation_id}`\n\n"
        f"*π Conversation History:*\n```\n{formatted_history}\n```\n\n"
        f"*β‘ Token Usage:*\n```\n"
        f"Prompt: {usage.prompt_tokens} tokens\n"
        f"Completion: {usage.completion_tokens} tokens\n"
        f"Total: {usage.total_tokens} tokens\n"
        f"Model: {response.model or 'N/A'}\n"
        f"```"
    )


# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm it is registered on `app`.
async def ask(request: Request):
    """Handle a Chatwoot webhook event and reply with a knowledge-base answer.

    Flow:
      1. Parse the JSON body; malformed or non-object bodies are rejected.
      2. Non-incoming messages are ignored; if the first conversation message
         carries a Slack external source id, the AI is disabled for that
         conversation.
      3. Messages whose sender id equals the account id are ignored.
      4. An agent sending ``#botresume`` re-enables the AI.
      5. Otherwise the message is answered by the chat model, token usage is
         reported to Slack (best effort), and the answer is posted to Chatwoot.
         The fallback sentence disables the AI for the conversation.

    Args:
        request: incoming request whose body is the webhook JSON.

    Returns:
        A small status dict describing what was done. Malformed payloads
        return an error/invalid status instead of raising.
    """
    import asyncio  # function scope: the file's import header does not include asyncio

    print("π /ask endpoint was HIT")  # <-- Log immediately on hit
    try:
        payload = await request.json()
        print("π₯ Incoming payload:", json.dumps(payload, indent=2))
    except Exception as e:
        print("β Failed to parse JSON payload:", e)
        return {"status": "error", "detail": "Invalid JSON"}
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, ...) cannot be handled.
        print("β Failed to parse JSON payload:", "expected a JSON object")
        return {"status": "error", "detail": "Invalid JSON"}

    # Every field may be absent or null, so read them defensively.
    conversation = _ask_dict(payload.get("conversation"))
    sender = _ask_dict(payload.get("sender"))
    account_id = _ask_dict(payload.get("account")).get("id")
    raw_conversation_id = conversation.get("id")
    # str(None) would be the truthy string "None" and defeat the
    # missing-id check further down.
    conversation_id = "" if raw_conversation_id is None else str(raw_conversation_id)
    sender_id = sender.get("id")
    sender_role = str(sender.get("role") or "").lower()
    raw_message_type = payload.get("message_type")
    message_type = raw_message_type.lower() if isinstance(raw_message_type, str) else ""
    raw_content = payload.get("content")
    # Content can be null (not just missing); treat that as empty.
    message_content = raw_content.strip() if isinstance(raw_content, str) else ""
    print(f"π§Ύ sender_id: {sender_id}, sender_role: {sender_role}, account_id: {account_id}")

    # Step 1: Detect agent message via Slack and disable AI for that conversation
    if message_type != "incoming":
        convo_messages = conversation.get("messages") or []
        first = convo_messages[0] if isinstance(convo_messages, list) and convo_messages else None
        if isinstance(first, dict) and "slack" in _ask_dict(first.get("external_source_ids")):
            stop_reply_conversations.add(conversation_id)
            print(f"π Human intervened via Slack in conversation {conversation_id}. Disabling AI.")
            return {"status": "AI disabled due to Slack intervention"}
        print("β οΈ Ignoring non-incoming message")
        return {"status": "ignored"}

    # Bot must not reply to itself
    if sender_id == account_id:
        print("β οΈ Ignoring bot's own message")
        return {"status": "ignored"}

    # Handle special bot resume command
    # NOTE(review): non-incoming messages have already returned above, so this
    # only fires for an *incoming* message whose sender role is "agent" --
    # confirm agent commands really arrive that way.
    if sender_role == "agent" and message_content.lower() == "#botresume":
        stop_reply_conversations.discard(conversation_id)
        print(f"βΉοΈ Bot resumed for conversation {conversation_id}")
        await send_chatwoot_message(conversation_id, "Bot resumed and will reply to users now.")
        return {"status": "bot resumed"}

    # Check if AI is blacklisted for this conversation
    if conversation_id in stop_reply_conversations:
        print(f"π« AI is disabled for conversation {conversation_id}")
        return {"status": "ignored: human takeover"}

    # Ensure all data is present
    if not message_content or not conversation_id:
        print("β Missing content or conversation ID")
        return {"status": "invalid payload"}

    # Build messages for GPT
    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message_content},
    ]

    response = None
    try:
        # The client call is synchronous; run it in the default executor so the
        # event loop keeps serving other requests while the model answers.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model="deepseek-ai/DeepSeek-V3",
                messages=chat_messages,
                temperature=0,
                max_tokens=200,
            ),
        )
        answer = response.choices[0].message.content.strip()
        print("β GPT Answer:", answer)
    except Exception as e:
        print("β OpenAI Error:", e)
        answer = "Sorry, I'm having trouble answering right now."
        response = None

    # Usage reporting is best effort and deliberately separate from the model
    # call: a failure here must not replace a valid answer with the apology.
    if response is not None:
        try:
            if response.usage:
                await send_to_slack(
                    _ask_usage_report(payload, conversation_id, message_content, answer, response)
                )
            else:
                print("β οΈ No token usage info returned from API")
        except Exception as e:
            print("Usage report to Slack failed:", e)

    # The fallback sentence hands the conversation to a human: stop replying.
    if answer == "I'm not sure about that. Let me connect you with a human agent.":
        stop_reply_conversations.add(conversation_id)
        print(f"π« Fallback answer, disabling AI for conversation {conversation_id}")

    await send_chatwoot_message(conversation_id, answer)
    return {"status": "ok"}
async def send_to_slack(message: str):
    """Post *message* as plain text to the Slack incoming webhook.

    The webhook URL is read from SLACK_WEBHOOK_URL on every call. Nothing is
    sent when it is unset. Any error while sending is logged and swallowed, so
    this function always returns None.
    """
    target = os.getenv("SLACK_WEBHOOK_URL")
    if target:
        try:
            async with httpx.AsyncClient() as session:
                reply = await session.post(target, json={"text": message})
                print("π¨ Slack response:", reply.status_code, reply.text)
        except Exception as err:
            print("β Slack Send Error:", err)
    else:
        print("β SLACK_WEBHOOK_URL not set")
def _filter_chatwoot_messages(messages):
    """Reduce raw Chatwoot message dicts to the fields this module uses.

    Keeps only messages with non-empty string content whose sender type is
    'contact', 'user' or 'agent', and returns them sorted by ``created_at``.
    """
    kept = []
    for msg in messages:
        if not isinstance(msg, dict):
            # Non-dict entries have no fields to read; skip them outright.
            continue
        try:
            # Skip if no content or sender info
            content = msg.get('content')
            if not content or not isinstance(content, str):
                continue

            # Get sender info, handle different formats
            sender = msg.get('sender', {})
            sender_type = None
            sender_name = 'Unknown'
            if isinstance(sender, dict):
                sender_type = sender.get('type')
                sender_name = sender.get('name') or sender.get('available_name', 'Unknown')

            # Only include relevant message types
            if sender_type not in ('contact', 'user', 'agent'):
                continue

            kept.append({
                'id': msg.get('id'),
                # Carried through so the Slack formatter can show the
                # conversation number in its header.
                'conversation_id': msg.get('conversation_id'),
                'content': content,
                'created_at': msg.get('created_at'),
                'sender': {
                    'type': sender_type,
                    'name': sender_name,
                },
                'message_type': 'incoming' if msg.get('message_type') == 0 else 'outgoing',
                'timestamp': msg.get('created_at'),
            })
        except Exception as e:
            print(f"β οΈ Error processing message {msg.get('id')}: {str(e)}")
            continue

    # 'timestamp' is always present but may be None; treat None as 0 so the
    # sort does not fail when comparing None with a number.
    kept.sort(key=lambda m: m.get('timestamp') or 0)
    return kept


async def get_chatwoot_conversation(conversation_id: int) -> Optional[dict]:
    """Fetch a conversation's messages from Chatwoot and filter them.

    Args:
        conversation_id: id interpolated into the Chatwoot messages URL.

    Returns:
        ``{'meta': ..., 'payload': [filtered messages]}`` on success, or None
        on an HTTP error status, an unexpected response shape, or any other
        failure (all of which are logged).
    """
    import traceback

    print(f"\nπ Starting to fetch conversation {conversation_id}")
    print(f"Using base URL: {CHATWOOT_BASE_URL}")
    print(f"Account ID: {CHATWOOT_ACCOUNT_ID}")
    try:
        # Headers actually sent: these must carry the full API key.
        headers = {
            "api_access_token": CHATWOOT_API_KEY or "",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        # Masked copy used only for logging (first 5 chars of the token).
        log_headers = dict(
            headers,
            api_access_token=(CHATWOOT_API_KEY[:5] + "...") if CHATWOOT_API_KEY else "None",
        )
        print(f"Request headers: {log_headers}")

        async with httpx.AsyncClient(timeout=30.0) as http:
            # Get conversation messages directly since that's what we need
            msgs_url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages"
            print(f"\nπ Making request to: {msgs_url}")
            print(f"Method: GET")
            print(f"Full URL: {msgs_url}")
            print(f"Headers: {log_headers}")
            try:
                msgs_resp = await http.get(msgs_url, headers=headers)
                print(f"\nβ Response received")
                print(f"Status code: {msgs_resp.status_code}")
                print(f"Response headers: {dict(msgs_resp.headers)}")

                # Log response body (truncated if too long)
                response_text = msgs_resp.text
                print(f"Response length: {len(response_text)} characters")
                print(f"First 500 chars of response: {response_text[:500]}...")
                msgs_resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                print(f"\nβ HTTP Error: {e}")
                print(f"Response: {e.response.text}")
                return None

            # Parse the JSON response
            response_data = msgs_resp.json()
            print(f"π Response data type: {type(response_data).__name__}")

            # Handle different response formats
            if isinstance(response_data, dict):
                # If the response has a 'payload' key, use that
                messages = response_data.get('payload', [])
                meta = response_data.get('meta', {})
            elif isinstance(response_data, list):
                # If the response is directly a list of messages
                messages = response_data
                meta = {}
            else:
                print(f"β οΈ Unexpected response format: {type(response_data)}")
                return None

            if not isinstance(messages, list):
                print(f"β οΈ Messages is not a list: {type(messages)}")
                return None

            print(f"\nπ© Successfully parsed {len(messages)} messages")
            if messages:
                first_msg = messages[0]
                print(f"First message type: {type(first_msg).__name__}")
                print(f"First message keys: {list(first_msg.keys()) if hasattr(first_msg, 'keys') else 'N/A'}")

            # Process and filter messages (sorted by timestamp)
            filtered_messages = _filter_chatwoot_messages(messages)
            print(f"β Filtered to {len(filtered_messages)} relevant messages")

            return {
                'meta': meta,
                'payload': filtered_messages
            }
    except Exception as e:
        print(f"\nβ Unexpected error: {str(e)}")
        print("Stack trace:")
        print(traceback.format_exc())
        return None
def format_slack_message(conversation_data):
    """Format conversation data into a Slack Block Kit response.

    Args:
        conversation_data: dict with a ``"payload"`` list of message dicts.
            Each message may carry ``content``, ``created_at`` (unix
            timestamp), ``message_type`` (0 or ``"incoming"`` for incoming;
            anything else is rendered as outgoing), ``sender`` and
            ``conversation_id``.

    Returns:
        An ``in_channel`` response with a header, a divider and one
        section + divider per message (oldest first), or an ``ephemeral``
        response carrying an explanatory text when there is nothing to show
        or formatting fails.
    """
    import traceback
    from datetime import datetime

    if not conversation_data:
        return {"response_type": "ephemeral", "text": "β Error: No conversation data received"}

    try:
        # Get messages from the payload
        messages = conversation_data.get("payload", [])
        if not messages:
            return {"response_type": "ephemeral", "text": "βΉοΈ No messages found in this conversation"}

        entries = [m for m in messages if isinstance(m, dict)]

        # Use the first message that actually carries a conversation id
        # instead of relying on messages[0] alone.
        conversation_ref = next(
            (m.get("conversation_id") for m in entries if m.get("conversation_id") is not None),
            "N/A",
        )

        # NOTE(review): Slack limits the number of blocks per message -- very
        # long conversations may be rejected; confirm the limit and truncate
        # if needed.
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"π¬ Conversation #{conversation_ref}",
                    "emoji": True
                }
            },
            {
                "type": "divider"
            }
        ]

        # Add messages, oldest first; a missing or None timestamp sorts as 0.
        for msg in sorted(entries, key=lambda m: m.get("created_at") or 0):
            content = msg.get("content", "")
            if not content or not isinstance(content, str):
                continue

            # Determine sender info (the sender may be missing or null)
            sender_data = msg.get("sender")
            if not isinstance(sender_data, dict):
                sender_data = {}

            # Accept both the numeric Chatwoot value (0) and the string form
            # that get_chatwoot_conversation stores in its filtered messages.
            if msg.get("message_type") in (0, "incoming"):  # incoming message
                sender_name = sender_data.get("name", "User")
                sender_emoji = "π€"
            else:  # outgoing message
                sender_name = sender_data.get("available_name", sender_data.get("name", "Bot"))
                sender_emoji = "π€"

            # Format timestamp if available
            time_display = ""
            timestamp = msg.get("created_at")
            if timestamp and isinstance(timestamp, (int, float)):
                try:
                    time_str = datetime.fromtimestamp(timestamp).strftime("%b %d, %H:%M")
                    time_display = f"`{time_str}`"
                except (OverflowError, OSError, ValueError):
                    # Out-of-range timestamps are simply shown without a time.
                    time_display = ""

            # Add message block
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"{sender_emoji} *{sender_name}* {time_display}\n{content}"
                }
            })
            blocks.append({"type": "divider"})

        return {
            "response_type": "in_channel",
            "blocks": blocks
        }
    except Exception as e:
        print(f"Error formatting Slack message: {e}")
        print(traceback.format_exc())
        return {
            "response_type": "ephemeral",
            "text": f"β Error formatting conversation: {str(e)}"
        }
# Strong references to in-flight background tasks. asyncio keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected before
# it finishes; each task removes itself from this set when done.
_slack_background_tasks = set()


# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm it is registered on `app`.
async def slack_command(
    token: str = Form(...),
    command: str = Form(...),
    text: str = Form(""),
    response_url: str = Form(...),
    user_id: str = Form(...)
):
    """Handle Slack slash command to fetch Chatwoot conversation.

    Args:
        token: verification token sent by Slack with the command.
        command: the slash command name (not used by this handler).
        text: text typed after the command; expected to be a conversation id.
            Defaults to an empty string so that a bare command reaches the
            usage hint below instead of failing form validation.
        response_url: Slack URL the delayed result is posted to.
        user_id: Slack id of the invoking user.

    Returns:
        An ephemeral acknowledgement or error message for Slack.

    Raises:
        HTTPException: 401 when the token does not match the configured one
            (or no token is configured).
    """
    import asyncio
    import hmac

    # Verify the Slack token. Compared in constant time on bytes, because
    # compare_digest rejects non-ASCII str. An unset or empty configured token
    # rejects every request.
    expected_token = SLACK_VERIFICATION_TOKEN or ""
    token_ok = bool(expected_token) and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    )
    if not token_ok:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Check if the user is authorized
    if user_id != ALLOWED_SLACK_USER_ID:
        return {
            "response_type": "ephemeral",
            "text": "β This command is restricted to authorized users only."
        }

    # Extract conversation ID from command text
    conversation_id = text.strip()
    if not conversation_id.isdigit():
        return {
            "response_type": "ephemeral",
            "text": "β Please provide a valid conversation ID. Usage: `/conversation <conversation_id>`"
        }

    # Fetch conversation data in the background; the result is delivered to
    # response_url by _process_slack_command.
    task = asyncio.create_task(_process_slack_command(conversation_id, response_url))
    _slack_background_tasks.add(task)
    task.add_done_callback(_slack_background_tasks.discard)

    # Send immediate response (Slack requires response within 3 seconds)
    return {
        "response_type": "ephemeral",
        "text": f"Fetching conversation {conversation_id}..."
    }
| async def _process_slack_command(conversation_id: str, response_url: str): | |
| """Process the Slack command asynchronously""" | |
| try: | |
| # Fetch conversation data | |
| conversation_data = await get_chatwoot_conversation(conversation_id) | |
| if not conversation_data: | |
| raise Exception("Failed to fetch conversation data") | |
| # Format the response | |
| formatted_response = format_slack_message(conversation_data) | |
| # Send the formatted response to Slack | |
| async with httpx.AsyncClient() as http: | |
| await http.post( | |
| response_url, | |
| json=formatted_response, | |
| headers={"Content-Type": "application/json"} | |
| ) | |
| except Exception as e: | |
| error_msg = { | |
| "response_type": "ephemeral", | |
| "text": f"β Error fetching conversation: {str(e)}" | |
| } | |
| async with httpx.AsyncClient() as http: | |
| await http.post( | |
| response_url, | |
| json=error_msg, | |
| headers={"Content-Type": "application/json"} | |
| ) | |
async def send_chatwoot_message(conversation_id: str, content: str):
    """Send a message to a Chatwoot conversation.

    First issues a GET on the conversation to verify access, then POSTs the
    message as a public outgoing text message.

    Args:
        conversation_id: id interpolated into the Chatwoot URLs.
        content: message text to send.

    Returns:
        ``{"status": "success", "data": ...}`` when the POST returns 200
        (``data`` is the parsed JSON body, or None if the body is not JSON),
        otherwise ``{"status": "error", "message": ...}``. This function does
        not raise: every failure, including unexpected exceptions, is logged
        and reported through the returned dict.
    """
    message_payload = {
        "content": content,
        "message_type": "outgoing",
        "private": False,
        "content_type": "text",
        "content_attributes": {}
    }
    try:
        url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages"
        headers = {
            "Content-Type": "application/json",
            "api_access_token": CHATWOOT_API_KEY,
        }
        print(f"π€ Sending to Chatwoot: {url}")
        print(f"π Using API key: {CHATWOOT_API_KEY[:5]}..." if CHATWOOT_API_KEY else "β No API key provided!")
        print("π¦ Payload:", json.dumps(message_payload, indent=2))
        # Header values whose name mentions "token" or "key" are masked in the log.
        print("π Headers:", json.dumps({k: '***' if 'token' in k.lower() or 'key' in k.lower() else v for k, v in headers.items()}, indent=2))

        async with httpx.AsyncClient() as http:
            # First, try to get the conversation to verify access
            conv_url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}"
            print(f"π Checking conversation access: {conv_url}")

            # Test GET request to verify conversation access
            test_resp = await http.get(conv_url, headers=headers)
            print(f"π Conversation check status: {test_resp.status_code}")
            print(f"π Response: {test_resp.text[:500]}..." if test_resp.text else "No response body")
            if test_resp.status_code != 200:
                error_msg = f"Failed to access conversation: {test_resp.status_code} - {test_resp.text}"
                print(f"β {error_msg}")
                return {"status": "error", "message": error_msg}

            # If we can access the conversation, try to send the message
            print("β Conversation access verified. Sending message...")
            resp = await http.post(
                url,
                headers=headers,
                json=message_payload
            )
            print(f"π¬ Chatwoot Response Status: {resp.status_code}")
            print(f"π¬ Response Body: {resp.text}")
            if resp.status_code != 200:
                error_msg = f"Failed to send message: {resp.status_code} - {resp.text}"
                print(f"β {error_msg}")
                return {"status": "error", "message": error_msg}

            # The message was accepted; a non-JSON body must not turn that
            # into a reported failure.
            try:
                data = resp.json()
            except ValueError:
                data = None
            return {"status": "success", "data": data}
    except Exception as e:
        # Single catch-all: status codes are checked explicitly above and
        # raise_for_status() is never called, so no separate HTTP-status
        # handler is needed. Return the same dict shape as every other path
        # rather than falling through to None.
        print("β Chatwoot Send Error:", e)
        return {"status": "error", "message": f"Chatwoot send error: {e}"}