Spaces:
Sleeping
Sleeping
| # core/ingestion/ingestion_service.py | |
| import os | |
| from typing import List, Optional, Callable | |
| from utils.logger import logger | |
| from qdrant_client import QdrantClient | |
| from core.data_processing.text_processor import TextProcessor | |
| from core.data_processing.audio_processor import AudioProcessor | |
| from core.data_processing.image_processor import ImageProcessor | |
| from core.embeddings.text_embedding_model import TextEmbeddingModel | |
| from core.embeddings.image_embedding_model import ImageEmbeddingModel | |
| from core.embeddings.audio_embedding_model import AudioEmbeddingModel | |
| from core.retrieval.vector_db_manager import VectorDBManager | |
| class IngestionService: | |
| def __init__(self, client: QdrantClient): | |
| logger.info("Initializing IngestionService (Stateless)...") | |
| self.client = client | |
| self.text_processor = TextProcessor() | |
| self.image_processor = ImageProcessor() | |
| self.audio_processor = AudioProcessor() | |
| self.text_embedder = TextEmbeddingModel() | |
| self.image_embedder = ImageEmbeddingModel() | |
| self.audio_embedder = AudioEmbeddingModel() | |
| text_embedding_dim = self.text_embedder.model.get_sentence_embedding_dimension() | |
| self.text_db_manager = VectorDBManager( | |
| client=self.client, | |
| collection_name="text_collection", | |
| embedding_dim=text_embedding_dim | |
| ) | |
| image_embedding_dim = self.image_embedder.model.config.hidden_size | |
| self.image_vector_db_manager = VectorDBManager( | |
| client=self.client, | |
| collection_name="image_collection", | |
| embedding_dim=image_embedding_dim | |
| ) | |
| audio_embedding_dim = self.audio_embedder.model.config.projection_dim | |
| self.audio_vector_db_manager = VectorDBManager( | |
| client=self.client, | |
| collection_name="audio_collection", | |
| embedding_dim=audio_embedding_dim | |
| ) | |
| logger.info("IngestionService initialized successfully.") | |
| def ingest_files(self, file_paths: List[str]): | |
| '''Ingest files without displaying progress bar''' | |
| return self.ingest_files_with_progress(file_paths, None) | |
| def ingest_files_with_progress(self, file_paths: List[str], progress_callback: Optional[Callable] = None): | |
| """ | |
| Turn on progress bar for tracking | |
| """ | |
| logger.info(f"Starting ingestion for {len(file_paths)} files...") | |
| # Kiểm tra và xử lý progress_callback an toàn | |
| def safe_progress(value, desc=""): | |
| try: | |
| if progress_callback is not None: | |
| progress_callback(value, desc=desc) | |
| except Exception as e: | |
| logger.warning(f"Progress callback error: {e}") | |
| safe_progress(0.4, desc="Starting file processing...") | |
| all_chunks_to_process = [] | |
| # 1. Walk through files to split chunks | |
| for i, file_path in enumerate(file_paths): | |
| try: | |
| base_progress = 0.4 + (i / len(file_paths)) * 0.3 # 40% -> 70% | |
| file_name = os.path.basename(file_path) | |
| safe_progress(base_progress, desc=f"Processing file {i+1}/{len(file_paths)}: {file_name}") | |
| file_ext = os.path.splitext(file_path)[1].lower() | |
| chunks = [] | |
| safe_progress(base_progress + 0.01, desc=f"Reading {file_name}...") | |
| if file_ext in ['.txt']: | |
| chunks = self.text_processor.process(file_path) | |
| elif file_ext in ['.png', '.jpg', '.jpeg', '.bmp', '.gif']: | |
| chunks = self.image_processor.process(file_path) | |
| elif file_ext in ['.wav', '.mp3']: | |
| chunks = self.audio_processor.process(file_path) | |
| else: | |
| logger.warning(f"Unsupported file type '{file_ext}' for file: {file_path}. Skipping.") | |
| continue | |
| # Kiểm tra chunks có hợp lệ không | |
| if not chunks or len(chunks) == 0: | |
| logger.warning(f"No chunks generated from file: {file_path}") | |
| continue | |
| safe_progress(base_progress + 0.02, desc=f"Generated {len(chunks)} chunks from {file_name}") | |
| all_chunks_to_process.extend(chunks) | |
| except Exception as e: | |
| logger.error(f"Error processing file {file_path}: {e}") | |
| continue | |
| if not all_chunks_to_process: | |
| logger.warning("No processable chunks were generated from the provided files.") | |
| safe_progress(1.0, desc="No chunks to process") | |
| return | |
| logger.info(f"Generated {len(all_chunks_to_process)} total chunks. Now generating embeddings...") | |
| safe_progress(0.7, desc=f"Generated {len(all_chunks_to_process)} chunks. Starting embeddings...") | |
| # 2. Create embeddings and add to batch | |
| text_embeddings_batch, text_metadatas_batch = [], [] | |
| audio_embeddings_batch, audio_metadatas_batch = [], [] | |
| image_embeddings_batch, image_metadatas_batch = [], [] | |
| BATCH_SIZE = 32 | |
| for i, chunk_data in enumerate(all_chunks_to_process): | |
| try: | |
| base_progress = 0.7 + (i / len(all_chunks_to_process)) * 0.25 # 70% -> 95% | |
| # Kiểm tra chunk_data có hợp lệ không | |
| if not chunk_data or 'metadata' not in chunk_data or 'content' not in chunk_data: | |
| logger.warning(f"Invalid chunk data at index {i}, skipping...") | |
| continue | |
| chunk_type = chunk_data['metadata'].get('type', 'unknown') | |
| content = chunk_data['content'] | |
| chunk_id = chunk_data['metadata'].get('chunk_id', f'chunk_{i}') | |
| # Kiểm tra content có hợp lệ không | |
| if not content: | |
| logger.warning(f"Empty content for chunk {chunk_id}, skipping...") | |
| continue | |
| safe_progress(base_progress, desc=f"Processing chunk {i+1}/{len(all_chunks_to_process)} ({chunk_type})") | |
| if chunk_type == "text": | |
| safe_progress(base_progress + 0.001, desc=f"Creating text embedding for chunk {i+1}") | |
| embeddings = self.text_embedder.get_embeddings([content]) | |
| if embeddings and len(embeddings) > 0: | |
| text_embeddings_batch.append(embeddings[0]) | |
| text_metadatas_batch.append(chunk_data) | |
| else: | |
| logger.warning(f"Failed to generate text embedding for chunk {chunk_id}") | |
| elif chunk_type == "audio": | |
| safe_progress(base_progress + 0.001, desc=f"Creating audio embedding for chunk {i+1}") | |
| embeddings = self.audio_embedder.get_embeddings([content]) | |
| if embeddings and len(embeddings) > 0: | |
| audio_embeddings_batch.append(embeddings[0]) | |
| audio_metadatas_batch.append(chunk_data) | |
| else: | |
| logger.warning(f"Failed to generate audio embedding for chunk {chunk_id}") | |
| elif chunk_type == "image": | |
| safe_progress(base_progress + 0.001, desc=f"Creating image embedding for chunk {i+1}") | |
| embeddings = self.image_embedder.get_embeddings([content]) | |
| if embeddings and len(embeddings) > 0: | |
| image_embeddings_batch.append(embeddings[0]) | |
| image_metadatas_batch.append(chunk_data) | |
| else: | |
| logger.warning(f"Failed to generate image embedding for chunk {chunk_id}") | |
| # add batch when reaching BATCH_SIZE | |
| if len(text_embeddings_batch) >= BATCH_SIZE: | |
| safe_progress(base_progress + 0.002, desc=f"Saving batch of {len(text_embeddings_batch)} text embeddings...") | |
| self.text_db_manager.add_vectors(text_embeddings_batch, text_metadatas_batch) | |
| text_embeddings_batch, text_metadatas_batch = [], [] | |
| if len(audio_embeddings_batch) >= BATCH_SIZE: | |
| safe_progress(base_progress + 0.002, desc=f"Saving batch of {len(audio_embeddings_batch)} audio embeddings...") | |
| self.audio_vector_db_manager.add_vectors(audio_embeddings_batch, audio_metadatas_batch) | |
| audio_embeddings_batch, audio_metadatas_batch = [], [] | |
| if len(image_embeddings_batch) >= BATCH_SIZE: | |
| safe_progress(base_progress + 0.002, desc=f"Saving batch of {len(image_embeddings_batch)} image embeddings...") | |
| self.image_vector_db_manager.add_vectors(image_embeddings_batch, image_metadatas_batch) | |
| image_embeddings_batch, image_metadatas_batch = [], [] | |
| except Exception as e: | |
| logger.error(f"Error ingesting chunk {i}: {e}") | |
| continue | |
| safe_progress(0.95, desc="Saving final batches...") | |
| # adding maintaining embeddings | |
| final_operations = [] | |
| if text_embeddings_batch: | |
| final_operations.append(("text", len(text_embeddings_batch))) | |
| if audio_embeddings_batch: | |
| final_operations.append(("audio", len(audio_embeddings_batch))) | |
| if image_embeddings_batch: | |
| final_operations.append(("image", len(image_embeddings_batch))) | |
| # Tránh chia cho 0 | |
| total_operations = len(final_operations) | |
| if total_operations == 0: | |
| safe_progress(1.0, desc="No final batches to save") | |
| else: | |
| for i, (batch_type, count) in enumerate(final_operations): | |
| try: | |
| current_progress = 0.95 + (i / total_operations) * 0.04 # 95% -> 99% | |
| if batch_type == "text" and text_embeddings_batch: | |
| safe_progress(current_progress, desc=f"Saving final {count} text embeddings...") | |
| self.text_db_manager.add_vectors(text_embeddings_batch, text_metadatas_batch) | |
| elif batch_type == "audio" and audio_embeddings_batch: | |
| safe_progress(current_progress, desc=f"Saving final {count} audio embeddings...") | |
| self.audio_vector_db_manager.add_vectors(audio_embeddings_batch, audio_metadatas_batch) | |
| elif batch_type == "image" and image_embeddings_batch: | |
| safe_progress(current_progress, desc=f"Saving final {count} image embeddings...") | |
| self.image_vector_db_manager.add_vectors(image_embeddings_batch, image_metadatas_batch) | |
| except Exception as e: | |
| logger.error(f"Error saving final batch {batch_type}: {e}") | |
| safe_progress(1.0, desc=f"✅ Successfully ingested {len(file_paths)} files with {len(all_chunks_to_process)} chunks!") | |
| logger.success(f"Successfully completed ingestion for {len(file_paths)} files.") |