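"""Web RAG System API.

FastAPI service that extracts links and text from web pages, stores the
extracted text in Supabase storage, and answers questions over it with a
simple RAG pipeline (BAAI/bge-large-en-v1.5 embeddings via the HuggingFace
Inference API, generation via Groq).
"""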
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import requests
from bs4 import BeautifulSoup
import time
import os
import json
import random
import logging
import groq
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import uvicorn
from supabase import create_client, Client
from urllib.parse import urljoin, urlparse

# Initialize FastAPI app
app = FastAPI(
    title="Web RAG System API",
    description="Extract content from web pages and perform RAG operations",
    version="1.0.0"
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Supabase client with environment variables
try:
    url = os.environ.get('SUPABASE_URL')
    key = os.environ.get('SUPABASE_SERVICE_ROLE_KEY')
    if not url or not key:
        logger.warning("Supabase credentials not found in environment variables")
        supabase = None
    else:
        supabase: Client = create_client(url, key)
        logger.info("Supabase client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Supabase client: {e}")
    supabase = None

# User agents for web scraping
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/102.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/103.0.1264.49",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 12; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; SM-A217F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 10; SM-G975F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36"
]


# Pydantic models
class RAGRequest(BaseModel):
    file_path: str
    prompt: str


class URL(BaseModel):
    url: str


# NOTE: the route paths used below ("/", "/health", "/rag", "/extract-links",
# "/extract-text") are assumed; adjust them if your deployment uses others.
@app.get("/")
async def root():
    """Health check endpoint"""
    return {"message": "Web RAG System API is running", "status": "healthy"}


@app.get("/health")  # assumed route path
async def health_check():
    """Detailed health check"""
    health_status = {
        "api": "healthy",
        "supabase": "connected" if supabase else "not configured",
        "hf_token": "configured" if os.environ.get('hf_token') else "not configured",
        "groq_token": "configured" if os.environ.get('groq_token') else "not configured"
    }
    return health_status


@app.post("/rag")  # assumed route path
async def rag(request: RAGRequest):
    """Perform RAG operations on extracted text"""
    try:
        # Check required environment variables
        hf_token = os.environ.get('hf_token')
        groq_token = os.environ.get('groq_token')

        if not hf_token:
            raise HTTPException(status_code=500, detail="HuggingFace token not configured")
        if not groq_token:
            raise HTTPException(status_code=500, detail="Groq token not configured")
        if not supabase:
            raise HTTPException(status_code=500, detail="Supabase not configured")

        logger.info(f"Processing RAG request for file: {request.file_path}")

        # HuggingFace Inference API for embeddings
        API_URL = "https://router.huggingface.co/hf-inference/models/BAAI/bge-large-en-v1.5/pipeline/feature-extraction"
        headers = {
            "Authorization": f"Bearer {hf_token}",
        }

        def query(payload):
            response = requests.post(API_URL, headers=headers, json=payload)
            if response.status_code != 200:
                logger.error(f"HuggingFace API error: {response.status_code} - {response.text}")
                raise HTTPException(status_code=500, detail="Failed to get embeddings from HuggingFace")
            return response.json()

        # Create a Groq client
        groq_client = groq.Client(api_key=groq_token)

        def process_with_groq(query_text, context):
            prompt = f"""
Context information:
{context}
Based on the context information above, please answer the following question:
{query_text}
Answer:
"""
            try:
                response = groq_client.chat.completions.create(
                    messages=[{"role": "user", "content": prompt}],
                    model="llama-3.3-70b-versatile",
                    temperature=0.4,
                    max_tokens=512
                )
                return response.choices[0].message.content
            except Exception as e:
                logger.error(f"Groq API error: {e}")
                raise HTTPException(status_code=500, detail="Failed to process with Groq")

        def get_file_from_supabase(bucket_name, file_path):
            try:
                response = supabase.storage.from_(bucket_name).download(file_path)
                content = response.decode('utf-8')
                return content
            except Exception as e:
                logger.error(f"Error downloading file from Supabase: {e}")
                raise HTTPException(
                    status_code=404,
                    detail=f"File not found in Supabase bucket: {file_path}"
                )

        # Get file content from Supabase
        bucket_name = "url-2-ans-bucket"
        file_path = request.file_path
        content = get_file_from_supabase(bucket_name, file_path)
        logger.info(f"Successfully downloaded file from Supabase: {file_path}")

        # Simple text chunking
        chunk_size = 1000
        overlap = 200
        chunks = []
        for i in range(0, len(content), chunk_size - overlap):
            chunk = content[i:i + chunk_size]
            if len(chunk) > 100:
                chunks.append({"text": chunk, "position": i})
        logger.info(f"Created {len(chunks)} chunks from document")

        # Get embeddings for all chunks
        chunk_embeddings = []
        for chunk in chunks:
            embedding = query({"inputs": chunk["text"]})
            chunk_embeddings.append(embedding)

        # Get embedding for the query
        query_embedding = query({"inputs": request.prompt})

        # Calculate similarity between query and all chunks
        similarities = []
        for chunk_embedding in chunk_embeddings:
            query_np = np.array(query_embedding)
            chunk_np = np.array(chunk_embedding)
            if len(query_np.shape) == 1:
                query_np = query_np.reshape(1, -1)
            if len(chunk_np.shape) == 1:
                chunk_np = chunk_np.reshape(1, -1)
            similarity = cosine_similarity(query_np, chunk_np)[0][0]
            similarities.append(similarity)

        # Get top 3 most similar chunks
        top_k = 3
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        relevant_chunks = [chunks[i]["text"] for i in top_indices]
        context_text = "\n\n".join(relevant_chunks)

        # Process with Groq
        answer = process_with_groq(request.prompt, context_text)

        # Prepare sources
        sources = [{"text": chunks[i]["text"][:200] + "...", "position": chunks[i]["position"]}
                   for i in top_indices]

        return {
            "sources": sources,
            "user_query": request.prompt,
            "assistant_response": answer,
            "file_source": f"supabase://{bucket_name}/{file_path}"
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error occurred in RAG process")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")


@app.post("/extract-links")  # assumed route path
async def extract_links(url: URL):
    """Extract unique links from a given URL"""
    def extract_unique_links(url_string, max_retries=3, timeout=30):
        for attempt in range(max_retries):
            try:
                headers = {'User-Agent': random.choice(user_agents)}
                response = requests.get(url_string, headers=headers, timeout=timeout)
                response.raise_for_status()

                soup = BeautifulSoup(response.text, 'html.parser')
                base_url = urlparse(url_string)
                base_url = f"{base_url.scheme}://{base_url.netloc}"

                a_tags = soup.find_all('a', href=True)
                links = []
                for a in a_tags:
                    href = a.get('href')
                    full_url = urljoin(base_url, href)
                    links.append(full_url)

                unique_links = list(dict.fromkeys(links))
                unique_links.insert(0, url_string)
                return unique_links
            except requests.RequestException as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 5 * (attempt + 1)
                    time.sleep(wait_time)
                else:
                    logger.error(f"Failed to retrieve {url_string} after {max_retries} attempts.")
                    raise HTTPException(status_code=500, detail=f"Failed to retrieve {url_string} after {max_retries} attempts.")
        return []

    try:
        unique_links = extract_unique_links(url.url)
        return {"unique_links": unique_links}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error in extract_links")
        raise HTTPException(status_code=500, detail=f"Failed to extract links: {str(e)}")


@app.post("/extract-text")  # assumed route path
async def extract_text(urls: List[str]):
    """Extract text content from multiple URLs"""
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not configured")

    output_file = "extracted_text.txt"

    def upload_text_content(filename, content, bucket_name):
        try:
            file_content = content.encode('utf-8')
            # Try to upload first
            try:
                response = supabase.storage.from_(bucket_name).upload(
                    path=filename,
                    file=file_content,
                    file_options={"content-type": "text/plain"}
                )
                logger.info(f"Text file uploaded successfully: {filename}")
                return response
            except Exception as upload_error:
                # If the upload fails (e.g. the file already exists), try to update it instead
                logger.warning(f"Upload failed ({upload_error}); attempting update for {filename}")
                try:
                    response = supabase.storage.from_(bucket_name).update(
                        path=filename,
                        file=file_content,
                        file_options={"content-type": "text/plain"}
                    )
                    logger.info(f"Text file updated successfully: {filename}")
                    return response
                except Exception as update_error:
                    logger.error(f"Error updating text content: {update_error}")
                    raise HTTPException(status_code=500, detail="Failed to save file to storage")
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error with file operations: {e}")
            raise HTTPException(status_code=500, detail="Failed to save file to storage")

    def text_data_extractor(links):
        extracted_texts = []
        for link in links:
            parsed_url = urlparse(link)
            if not parsed_url.scheme:
                logger.warning(f"Invalid URL: {link}")
                continue

            retries = 3
            while retries > 0:
                try:
                    headers = {'User-Agent': random.choice(user_agents)}
                    response = requests.get(link, headers=headers, timeout=30)
                    response.raise_for_status()

                    soup = BeautifulSoup(response.text, 'html.parser')
                    text = soup.get_text()
                    clean_text = ' '.join(text.split())
                    extracted_texts.append({"url": link, "text": clean_text})
                    break
                except requests.RequestException as e:
                    retries -= 1
                    logger.warning(f"Retry {3 - retries} for {link} failed: {e}")
                    if retries > 0:
                        wait_time = 5 * (3 - retries)
                        time.sleep(wait_time)

            if retries == 0:
                extracted_texts.append({
                    "url": link,
                    "text": "Failed to retrieve text after multiple attempts."
                })
        return extracted_texts

    try:
        extracted_data = text_data_extractor(urls)
        string_output = json.dumps(extracted_data, ensure_ascii=False, indent=2)

        # Upload to Supabase
        upload_text_content(output_file, string_output, "url-2-ans-bucket")

        return {"extracted_data": extracted_data, "file_saved": output_file}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error in extract_text")
        raise HTTPException(status_code=500, detail=f"Failed to extract text: {str(e)}")


# Main execution
if __name__ == "__main__":
    # Run the FastAPI app
    uvicorn.run(
        "main_api:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable reload for production
        access_log=True
    )
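

# ---------------------------------------------------------------------------
# Example usage sketch (not part of the service). It assumes the server runs
# locally on port 8000 and that the route paths registered above
# ("/extract-links", "/extract-text", "/rag") are the ones deployed; adjust
# the base URL and paths if your setup differs.
#
#   import requests
#
#   base = "http://localhost:8000"
#
#   # 1. Collect links from a starting page
#   links = requests.post(f"{base}/extract-links",
#                         json={"url": "https://example.com"}).json()["unique_links"]
#
#   # 2. Scrape the first few links; the text is saved to Supabase storage
#   #    as "extracted_text.txt"
#   requests.post(f"{base}/extract-text", json=links[:5])
#
#   # 3. Ask a question over the stored text
#   result = requests.post(f"{base}/rag",
#                          json={"file_path": "extracted_text.txt",
#                                "prompt": "What is this site about?"}).json()
#   print(result["assistant_response"])
# ---------------------------------------------------------------------------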