"""Supabase PGVector connection for Johnny Harris transcript embeddings""" import os import random from typing import List, Dict, Any, Optional from supabase import create_client, Client import requests class TranscriptChunk: """Represents a transcript chunk from the database""" def __init__(self, chunk_text: str, metadata: dict): self.chunk_text = chunk_text self.metadata = metadata @property def video_id(self) -> str: return self.metadata.get('video_id', '') @property def video_url(self) -> str: return self.metadata.get('video_url', '') @property def title(self) -> str: return self.metadata.get('title', '') @property def chunk_index(self) -> int: return self.metadata.get('chunk_index', 0) @property def total_chunks(self) -> int: return self.metadata.get('total_chunks', 0) @property def similarity(self) -> float: return self.metadata.get('similarity', 0.0) class TranscriptVectorStore: """Manages connection to Supabase PGVector database with Johnny Harris transcript embeddings""" def __init__( self, supabase_url: Optional[str] = None, supabase_key: Optional[str] = None, jina_api_key: Optional[str] = None, embedding_model: str = "jina-embeddings-v3" ): """ Initialize the vector store connection Args: supabase_url: Supabase project URL (defaults to SUPABASE_URL env var) supabase_key: Supabase anon key (defaults to SUPABASE_KEY env var) jina_api_key: Jina AI API key (defaults to JINA_API_KEY env var) embedding_model: Embedding model to use (default: jina-embeddings-v3) """ self.supabase_url = supabase_url or os.getenv("SUPABASE_URL") self.supabase_key = supabase_key or os.getenv("SUPABASE_KEY") self.jina_api_key = jina_api_key or os.getenv("JINA_API_KEY") self.embedding_model = embedding_model if not self.supabase_url or not self.supabase_key: raise ValueError("SUPABASE_URL and SUPABASE_KEY environment variables must be set") if not self.jina_api_key: raise ValueError("JINA_API_KEY environment variable must be set") # Initialize Supabase client self.supabase: Client = create_client(self.supabase_url, self.supabase_key) def _generate_embedding(self, text: str, task: str = "retrieval.query") -> List[float]: """ Generate embedding for text using Jina AI API Args: text: Text to embed task: Task type - 'retrieval.query' for queries, 'retrieval.passage' for documents Returns: List of floats representing the embedding vector (1024 dimensions) """ try: api_url = "https://api.jina.ai/v1/embeddings" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.jina_api_key}" } payload = { "model": self.embedding_model, "task": task, "input": [text] } response = requests.post(api_url, headers=headers, json=payload, timeout=30) if response.status_code != 200: raise Exception(f"Jina API returned status {response.status_code}: {response.text}") result = response.json() if isinstance(result, dict) and 'data' in result: return result['data'][0]['embedding'] raise Exception("Unexpected response format from Jina API") except Exception as e: raise Exception(f"Error generating embedding: {str(e)}") def similarity_search( self, query: str, k: int = 10, match_threshold: float = 0.7 ) -> List[TranscriptChunk]: """ Perform similarity search on the transcript database (Tab 1: Topic Search) Args: query: Search query k: Number of results to return match_threshold: Minimum similarity threshold (0.0 to 1.0) Returns: List of TranscriptChunk objects with relevant transcript chunks """ query_embedding = self._generate_embedding(query, task="retrieval.query") try: response = self.supabase.rpc( 
                'match_transcripts',
                {
                    'query_embedding': query_embedding,
                    'match_threshold': match_threshold,
                    'match_count': k
                }
            ).execute()

            chunks = []
            for item in response.data:
                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': item.get('video_id'),
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': item.get('similarity', 0.0)
                    }
                )
                chunks.append(chunk)

            return chunks

        except Exception as e:
            raise Exception(f"Error performing similarity search: {str(e)}")

    def tiered_similarity_search(
        self,
        query: str,
        direct_threshold: float = 0.6,
        related_threshold: float = 0.3,
        max_per_tier: int = 10
    ) -> tuple:
        """
        Search with tiered results: direct matches and related content.

        Args:
            query: Search query
            direct_threshold: Minimum similarity for direct matches (default 0.6)
            related_threshold: Minimum similarity for related content (default 0.3)
            max_per_tier: Maximum results per tier

        Returns:
            Tuple of (direct_matches, related_content) - two separate lists
        """
        query_embedding = self._generate_embedding(query, task="retrieval.query")

        try:
            # Get all results above the related threshold
            response = self.supabase.rpc(
                'match_transcripts',
                {
                    'query_embedding': query_embedding,
                    'match_threshold': related_threshold,
                    'match_count': max_per_tier * 3  # Get more to filter
                }
            ).execute()

            direct_matches = []
            related_content = []
            seen_videos = set()

            for item in response.data:
                similarity = item.get('similarity', 0.0)
                video_id = item.get('video_id')

                # Deduplicate by video (keep highest similarity per video)
                if video_id in seen_videos:
                    continue
                seen_videos.add(video_id)

                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': video_id,
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': similarity
                    }
                )

                if similarity >= direct_threshold:
                    if len(direct_matches) < max_per_tier:
                        direct_matches.append(chunk)
                elif similarity >= related_threshold:
                    if len(related_content) < max_per_tier:
                        related_content.append(chunk)

            return (direct_matches, related_content)

        except Exception as e:
            raise Exception(f"Error performing tiered search: {str(e)}")

    def get_video_chunks(self, video_id: str) -> List[TranscriptChunk]:
        """
        Fetch all chunks for a specific video

        Args:
            video_id: YouTube video ID

        Returns:
            List of TranscriptChunk objects ordered by chunk_index
        """
        try:
            response = self.supabase.from_('johnny_transcripts') \
                .select('video_id, video_url, title, chunk_text, chunk_index, total_chunks') \
                .eq('video_id', video_id) \
                .order('chunk_index') \
                .execute()

            chunks = []
            for item in response.data:
                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': item.get('video_id'),
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': 1.0
                    }
                )
                chunks.append(chunk)

            return chunks

        except Exception as e:
            raise Exception(f"Error fetching video chunks: {str(e)}")

    def get_random_diverse_chunks(self, n: int = 50) -> List[TranscriptChunk]:
        """
        Fetch random chunks from different videos for style variety

        Args:
            n: Number of random chunks to fetch

        Returns:
            List of TranscriptChunk objects from diverse videos
        """
        try:
            # Get all unique video IDs first
            response = self.supabase.from_('johnny_transcripts') \
                .select('video_id') \
                .execute()

            video_ids = list(set(
                item['video_id'] for item in response.data if item.get('video_id')
            ))

            if not video_ids:
                return []

            # Sample from different videos to ensure diversity
            chunks = []
            chunks_per_video = max(1, n // len(video_ids))

            # Shuffle video IDs for randomness
            random.shuffle(video_ids)

            for video_id in video_ids[:min(len(video_ids), n)]:
                try:
                    # Take a few chunks from this video (limit returns the first rows)
                    video_response = self.supabase.from_('johnny_transcripts') \
                        .select('video_id, video_url, title, chunk_text, chunk_index, total_chunks') \
                        .eq('video_id', video_id) \
                        .limit(chunks_per_video) \
                        .execute()

                    for item in video_response.data:
                        chunk = TranscriptChunk(
                            chunk_text=item.get('chunk_text') or '',
                            metadata={
                                'video_id': item.get('video_id'),
                                'video_url': item.get('video_url'),
                                'title': item.get('title', ''),
                                'chunk_index': item.get('chunk_index'),
                                'total_chunks': item.get('total_chunks'),
                                'similarity': 0.0  # Random selection, no similarity score
                            }
                        )
                        chunks.append(chunk)

                    if len(chunks) >= n:
                        break

                except Exception:
                    continue

            return chunks[:n]

        except Exception as e:
            raise Exception(f"Error fetching random chunks: {str(e)}")

    def get_bulk_style_context(
        self,
        topic_query: str,
        max_chunks: int = 100,
        topic_relevant_ratio: float = 0.3
    ) -> List[TranscriptChunk]:
        """
        Retrieve maximum context from knowledge base for script generation (Tab 2)

        This method combines:
        1. Topic-relevant chunks (found via similarity search)
        2. Diverse random samples from across the archive

        The entire knowledge base serves as the style reference.

        Args:
            topic_query: User's topic/bullet points to find relevant content
            max_chunks: Maximum number of chunks to retrieve
            topic_relevant_ratio: Ratio of chunks that should be topic-relevant (0.0 to 1.0)

        Returns:
            List of TranscriptChunk objects (topic-relevant + diverse samples)
        """
        topic_relevant_count = int(max_chunks * topic_relevant_ratio)
        diverse_count = max_chunks - topic_relevant_count

        # Get topic-relevant chunks
        topic_chunks = self.similarity_search(
            query=topic_query,
            k=topic_relevant_count,
            match_threshold=0.3  # Lower threshold to get more results
        )

        # Get diverse random chunks for style variety
        diverse_chunks = self.get_random_diverse_chunks(n=diverse_count)

        # Combine and deduplicate by video_id + chunk_index
        seen = set()
        combined = []
        for chunk in topic_chunks + diverse_chunks:
            key = (chunk.video_id, chunk.chunk_index)
            if key not in seen:
                seen.add(key)
                combined.append(chunk)

        return combined[:max_chunks]

    def get_all_chunks(self, limit: int = 500) -> List[TranscriptChunk]:
        """
        Fetch all chunks from the database (up to limit)

        Args:
            limit: Maximum number of chunks to fetch

        Returns:
            List of TranscriptChunk objects
        """
        try:
            response = self.supabase.from_('johnny_transcripts') \
                .select('video_id, video_url, title, chunk_text, chunk_index, total_chunks') \
                .limit(limit) \
                .execute()

            chunks = []
            for item in response.data:
                chunk = TranscriptChunk(
                    chunk_text=item.get('chunk_text') or '',
                    metadata={
                        'video_id': item.get('video_id'),
                        'video_url': item.get('video_url'),
                        'title': item.get('title', ''),
                        'chunk_index': item.get('chunk_index'),
                        'total_chunks': item.get('total_chunks'),
                        'similarity': 0.0
                    }
                )
                chunks.append(chunk)

            return chunks

        except Exception as e:
            raise Exception(f"Error fetching all chunks: {str(e)}")

    def format_results_for_display(self, chunks: List[TranscriptChunk]) -> str:
        """
        Format search results for Tab 1 display

        Args:
            chunks: List of TranscriptChunk objects

        Returns:
            Formatted markdown string for display
        """
        if not chunks:
            return "No matching content found."

        # Group by video
        videos = {}
        for chunk in chunks:
            video_id = chunk.video_id
            if video_id not in videos:
                videos[video_id] = {
                    'title': chunk.title,
                    'url': chunk.video_url,
                    'chunks': [],
                    'max_similarity': 0.0
                }
            videos[video_id]['chunks'].append(chunk)
            videos[video_id]['max_similarity'] = max(
                videos[video_id]['max_similarity'],
                chunk.similarity
            )

        # Sort by max similarity
        sorted_videos = sorted(
            videos.items(),
            key=lambda x: x[1]['max_similarity'],
            reverse=True
        )

        # Format output
        output = []
        for video_id, data in sorted_videos:
            similarity_pct = int(data['max_similarity'] * 100)
            output.append(f"### [{data['title']}]({data['url']})")
            output.append(f"**Relevance:** {similarity_pct}%\n")

            # Show top excerpt
            top_chunk = max(data['chunks'], key=lambda c: c.similarity)
            excerpt = (
                top_chunk.chunk_text[:500] + "..."
                if len(top_chunk.chunk_text) > 500
                else top_chunk.chunk_text
            )
            output.append(f"> {excerpt}\n")

        return "\n".join(output)

    def format_context_for_llm(self, chunks: List[TranscriptChunk]) -> str:
        """
        Format chunks as context for LLM script generation (Tab 2)

        Args:
            chunks: List of TranscriptChunk objects

        Returns:
            Formatted string with transcript excerpts for LLM context
        """
        if not chunks:
            return ""

        formatted = []
        for i, chunk in enumerate(chunks, 1):
            formatted.append(f"[Excerpt {i} - {chunk.title}]\n{chunk.chunk_text}")

        return "\n\n---\n\n".join(formatted)


def create_vectorstore() -> TranscriptVectorStore:
    """Factory function to create and return a configured vector store"""
    return TranscriptVectorStore()
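

# Example usage (illustrative sketch, not part of the library itself). It assumes
# SUPABASE_URL, SUPABASE_KEY and JINA_API_KEY are set in the environment, and that
# the `match_transcripts` RPC and `johnny_transcripts` table exist in the Supabase
# project. The query string below is a hypothetical placeholder.
if __name__ == "__main__":
    store = create_vectorstore()

    # Tab 1-style topic search: direct hits vs. loosely related videos
    direct, related = store.tiered_similarity_search("why borders change")
    print(store.format_results_for_display(direct))

    # Tab 2-style context gathering for script generation
    context_chunks = store.get_bulk_style_context("why borders change", max_chunks=50)
    print(store.format_context_for_llm(context_chunks)[:1000])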