| """Supabase PGVector connection for Johnny Harris transcript embeddings""" | |
| import os | |
| import random | |
| from typing import List, Dict, Any, Optional | |
| from supabase import create_client, Client | |
| import requests | |


class TranscriptChunk:
    """Represents a transcript chunk from the database."""

    def __init__(self, chunk_text: str, metadata: dict):
        self.chunk_text = chunk_text
        self.metadata = metadata

    # Metadata fields are exposed as read-only properties so callers can use
    # attribute access (chunk.video_id), as the rest of this module does.
    @property
    def video_id(self) -> str:
        return self.metadata.get('video_id', '')

    @property
    def video_url(self) -> str:
        return self.metadata.get('video_url', '')

    @property
    def title(self) -> str:
        return self.metadata.get('title', '')

    @property
    def chunk_index(self) -> int:
        return self.metadata.get('chunk_index', 0)

    @property
    def total_chunks(self) -> int:
        return self.metadata.get('total_chunks', 0)

    @property
    def similarity(self) -> float:
        return self.metadata.get('similarity', 0.0)
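

# Illustrative usage of TranscriptChunk (values are made up for the example):
#
#     chunk = TranscriptChunk("We start in 1947...",
#                             {"video_id": "abc123", "similarity": 0.82})
#     chunk.video_id    # -> "abc123"
#     chunk.similarity  # -> 0.82
#     chunk.title       # -> "" (missing keys fall back to defaults)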


class TranscriptVectorStore:
    """Manages connection to a Supabase PGVector database with Johnny Harris transcript embeddings."""

    def __init__(
        self,
        supabase_url: Optional[str] = None,
        supabase_key: Optional[str] = None,
        jina_api_key: Optional[str] = None,
        embedding_model: str = "jina-embeddings-v3"
    ):
        """
        Initialize the vector store connection.

        Args:
            supabase_url: Supabase project URL (defaults to SUPABASE_URL env var)
            supabase_key: Supabase anon key (defaults to SUPABASE_KEY env var)
            jina_api_key: Jina AI API key (defaults to JINA_API_KEY env var)
            embedding_model: Embedding model to use (default: jina-embeddings-v3)
        """
        self.supabase_url = supabase_url or os.getenv("SUPABASE_URL")
        self.supabase_key = supabase_key or os.getenv("SUPABASE_KEY")
        self.jina_api_key = jina_api_key or os.getenv("JINA_API_KEY")
        self.embedding_model = embedding_model

        if not self.supabase_url or not self.supabase_key:
            raise ValueError("SUPABASE_URL and SUPABASE_KEY environment variables must be set")
        if not self.jina_api_key:
            raise ValueError("JINA_API_KEY environment variable must be set")

        # Initialize the Supabase client
        self.supabase: Client = create_client(self.supabase_url, self.supabase_key)

    def _generate_embedding(self, text: str, task: str = "retrieval.query") -> List[float]:
        """
        Generate an embedding for text using the Jina AI API.

        Args:
            text: Text to embed
            task: Task type - 'retrieval.query' for queries, 'retrieval.passage' for documents

        Returns:
            List of floats representing the embedding vector (1024 dimensions)
        """
        try:
            api_url = "https://api.jina.ai/v1/embeddings"
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.jina_api_key}"
            }
            payload = {
                "model": self.embedding_model,
                "task": task,
                "input": [text]
            }
            response = requests.post(api_url, headers=headers, json=payload, timeout=30)
            if response.status_code != 200:
                raise Exception(f"Jina API returned status {response.status_code}: {response.text}")
            result = response.json()
            if isinstance(result, dict) and 'data' in result:
                return result['data'][0]['embedding']
            raise Exception("Unexpected response format from Jina API")
        except Exception as e:
            raise Exception(f"Error generating embedding: {str(e)}") from e

    def similarity_search(
        self,
        query: str,
        k: int = 10,
        match_threshold: float = 0.7
    ) -> List[TranscriptChunk]:
        """
        Perform similarity search on the transcript database (Tab 1: Topic Search).

        Args:
            query: Search query
            k: Number of results to return
            match_threshold: Minimum similarity threshold (0.0 to 1.0)

        Returns:
            List of TranscriptChunk objects with relevant transcript chunks
        """
        query_embedding = self._generate_embedding(query, task="retrieval.query")
        try:
            response = self.supabase.rpc(
                'match_transcripts',
                {
                    'query_embedding': query_embedding,
                    'match_threshold': match_threshold,
                    'match_count': k
                }
            ).execute()

            chunks = []
            for item in response.data:
                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': item.get('video_id'),
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': item.get('similarity', 0.0)
                    }
                )
                chunks.append(chunk)
            return chunks
        except Exception as e:
            raise Exception(f"Error performing similarity search: {str(e)}") from e

    def tiered_similarity_search(
        self,
        query: str,
        direct_threshold: float = 0.6,
        related_threshold: float = 0.3,
        max_per_tier: int = 10
    ) -> Tuple[List[TranscriptChunk], List[TranscriptChunk]]:
        """
        Search with tiered results: direct matches and related content.

        Args:
            query: Search query
            direct_threshold: Minimum similarity for direct matches (default 0.6)
            related_threshold: Minimum similarity for related content (default 0.3)
            max_per_tier: Maximum results per tier

        Returns:
            Tuple of (direct_matches, related_content) - two separate lists
        """
        query_embedding = self._generate_embedding(query, task="retrieval.query")
        try:
            # Get all results above the related threshold
            response = self.supabase.rpc(
                'match_transcripts',
                {
                    'query_embedding': query_embedding,
                    'match_threshold': related_threshold,
                    'match_count': max_per_tier * 3  # Fetch extra rows to filter
                }
            ).execute()

            direct_matches = []
            related_content = []
            seen_videos = set()

            for item in response.data:
                similarity = item.get('similarity', 0.0)
                video_id = item.get('video_id')

                # Deduplicate by video (keep highest similarity per video)
                if video_id in seen_videos:
                    continue
                seen_videos.add(video_id)

                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': video_id,
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': similarity
                    }
                )

                if similarity >= direct_threshold:
                    if len(direct_matches) < max_per_tier:
                        direct_matches.append(chunk)
                elif similarity >= related_threshold:
                    if len(related_content) < max_per_tier:
                        related_content.append(chunk)

            return (direct_matches, related_content)
        except Exception as e:
            raise Exception(f"Error performing tiered search: {str(e)}") from e

    def get_video_chunks(self, video_id: str) -> List[TranscriptChunk]:
        """
        Fetch all chunks for a specific video.

        Args:
            video_id: YouTube video ID

        Returns:
            List of TranscriptChunk objects ordered by chunk_index
        """
        try:
            response = self.supabase.from_('johnny_transcripts') \
                .select('video_id, video_url, title, chunk_text, chunk_index, total_chunks') \
                .eq('video_id', video_id) \
                .order('chunk_index') \
                .execute()

            chunks = []
            for item in response.data:
                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': item.get('video_id'),
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': 1.0
                    }
                )
                chunks.append(chunk)
            return chunks
        except Exception as e:
            raise Exception(f"Error fetching video chunks: {str(e)}") from e

    def get_random_diverse_chunks(self, n: int = 50) -> List[TranscriptChunk]:
        """
        Fetch random chunks from different videos for style variety.

        Args:
            n: Number of random chunks to fetch

        Returns:
            List of TranscriptChunk objects from diverse videos
        """
        try:
            # Get all unique video IDs first
            response = self.supabase.from_('johnny_transcripts') \
                .select('video_id') \
                .execute()
            video_ids = list(set(item['video_id'] for item in response.data if item.get('video_id')))
            if not video_ids:
                return []

            # Sample from different videos to ensure diversity
            chunks = []
            chunks_per_video = max(1, n // len(video_ids))

            # Shuffle video IDs so each call draws from a different mix of videos
            random.shuffle(video_ids)

            for video_id in video_ids[:n]:
                try:
                    # Take the first few chunks from this video; the randomness
                    # comes from the shuffled video order, not the row selection
                    video_response = self.supabase.from_('johnny_transcripts') \
                        .select('video_id, video_url, title, chunk_text, chunk_index, total_chunks') \
                        .eq('video_id', video_id) \
                        .limit(chunks_per_video) \
                        .execute()

                    for item in video_response.data:
                        chunk = TranscriptChunk(
                            chunk_text=item.get('chunk_text') or '',
                            metadata={
                                'video_id': item.get('video_id'),
                                'video_url': item.get('video_url'),
                                'title': item.get('title', ''),
                                'chunk_index': item.get('chunk_index'),
                                'total_chunks': item.get('total_chunks'),
                                'similarity': 0.0  # Random selection, no similarity score
                            }
                        )
                        chunks.append(chunk)

                    if len(chunks) >= n:
                        break
                except Exception:
                    # Skip videos whose rows fail to load and keep sampling
                    continue

            return chunks[:n]
        except Exception as e:
            raise Exception(f"Error fetching random chunks: {str(e)}") from e

    def get_bulk_style_context(
        self,
        topic_query: str,
        max_chunks: int = 100,
        topic_relevant_ratio: float = 0.3
    ) -> List[TranscriptChunk]:
        """
        Retrieve maximum context from the knowledge base for script generation (Tab 2).

        This method combines:
        1. Topic-relevant chunks (found via similarity search)
        2. Diverse random samples from across the archive

        The entire knowledge base serves as the style reference.

        Args:
            topic_query: User's topic/bullet points to find relevant content
            max_chunks: Maximum number of chunks to retrieve
            topic_relevant_ratio: Ratio of chunks that should be topic-relevant (0.0 to 1.0)

        Returns:
            List of TranscriptChunk objects (topic-relevant + diverse samples)
        """
        topic_relevant_count = int(max_chunks * topic_relevant_ratio)
        diverse_count = max_chunks - topic_relevant_count

        # Get topic-relevant chunks
        topic_chunks = self.similarity_search(
            query=topic_query,
            k=topic_relevant_count,
            match_threshold=0.3  # Lower threshold to get more results
        )

        # Get diverse random chunks for style variety
        diverse_chunks = self.get_random_diverse_chunks(n=diverse_count)

        # Combine and deduplicate by (video_id, chunk_index)
        seen = set()
        combined = []
        for chunk in topic_chunks + diverse_chunks:
            key = (chunk.video_id, chunk.chunk_index)
            if key not in seen:
                seen.add(key)
                combined.append(chunk)
        return combined[:max_chunks]
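
    # Illustrative budget with the defaults above: max_chunks=100 and
    # topic_relevant_ratio=0.3 yield up to 30 topic-relevant chunks plus up to
    # 70 random diverse chunks, trimmed to max_chunks after deduplication.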

    def get_all_chunks(self, limit: int = 500) -> List[TranscriptChunk]:
        """
        Fetch all chunks from the database (up to limit).

        Args:
            limit: Maximum number of chunks to fetch

        Returns:
            List of TranscriptChunk objects
        """
        try:
            response = self.supabase.from_('johnny_transcripts') \
                .select('video_id, video_url, title, chunk_text, chunk_index, total_chunks') \
                .limit(limit) \
                .execute()

            chunks = []
            for item in response.data:
                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': item.get('video_id'),
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': 0.0
                    }
                )
                chunks.append(chunk)
            return chunks
        except Exception as e:
            raise Exception(f"Error fetching all chunks: {str(e)}") from e

    def format_results_for_display(self, chunks: List[TranscriptChunk]) -> str:
        """
        Format search results for Tab 1 display.

        Args:
            chunks: List of TranscriptChunk objects

        Returns:
            Formatted markdown string for display
        """
        if not chunks:
            return "No matching content found."

        # Group chunks by video
        videos = {}
        for chunk in chunks:
            video_id = chunk.video_id
            if video_id not in videos:
                videos[video_id] = {
                    'title': chunk.title,
                    'url': chunk.video_url,
                    'chunks': [],
                    'max_similarity': 0.0
                }
            videos[video_id]['chunks'].append(chunk)
            videos[video_id]['max_similarity'] = max(
                videos[video_id]['max_similarity'],
                chunk.similarity
            )

        # Sort videos by their best chunk's similarity
        sorted_videos = sorted(
            videos.items(),
            key=lambda x: x[1]['max_similarity'],
            reverse=True
        )

        # Format output
        output = []
        for video_id, data in sorted_videos:
            similarity_pct = int(data['max_similarity'] * 100)
            output.append(f"### [{data['title']}]({data['url']})")
            output.append(f"**Relevance:** {similarity_pct}%\n")

            # Show the top excerpt, truncated to 500 characters
            top_chunk = max(data['chunks'], key=lambda c: c.similarity)
            excerpt = (top_chunk.chunk_text[:500] + "...") if len(top_chunk.chunk_text) > 500 else top_chunk.chunk_text
            output.append(f"> {excerpt}\n")
        return "\n".join(output)

    def format_context_for_llm(self, chunks: List[TranscriptChunk]) -> str:
        """
        Format chunks as context for LLM script generation (Tab 2).

        Args:
            chunks: List of TranscriptChunk objects

        Returns:
            Formatted string with transcript excerpts for LLM context
        """
        if not chunks:
            return ""
        formatted = []
        for i, chunk in enumerate(chunks, 1):
            formatted.append(f"[Excerpt {i} - {chunk.title}]\n{chunk.chunk_text}")
        return "\n\n---\n\n".join(formatted)


def create_vectorstore() -> TranscriptVectorStore:
    """Factory function to create and return a configured vector store."""
    return TranscriptVectorStore()
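

# Minimal smoke test, assuming SUPABASE_URL, SUPABASE_KEY and JINA_API_KEY are
# set in the environment and the johnny_transcripts table plus the
# match_transcripts function exist in the project (the query string is just an
# example):
if __name__ == "__main__":
    store = create_vectorstore()
    results = store.similarity_search("why do borders exist", k=3)
    print(store.format_results_for_display(results))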