Spaces:
Sleeping
Sleeping
| import logging | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from flask import Blueprint, jsonify, request | |
| from flask_pydantic import validate | |
| from lpm_kernel.api.common.responses import APIResponse | |
| from lpm_kernel.configs.config import Config | |
| from lpm_kernel.file_data.chunker import DocumentChunker | |
| from lpm_kernel.file_data.document_service import document_service | |
| from lpm_kernel.kernel.chunk_service import ChunkService | |
| logger = logging.getLogger(__name__) | |
| document_bp = Blueprint("documents", __name__, url_prefix="/api") | |
| # Ensure .env file is loaded | |
| load_dotenv() | |
| def list_documents(): | |
| """ | |
| List all documents | |
| Query Parameters: | |
| include_l0 (bool): Whether to include L0 data (chunks and embeddings) | |
| """ | |
| try: | |
| # get query params | |
| include_l0 = request.args.get("include_l0", "").lower() == "true" | |
| if include_l0: | |
| documents = document_service.list_documents_with_l0() | |
| return jsonify(APIResponse.success(data=documents)) | |
| else: | |
| documents = document_service.list_documents() | |
| return jsonify( | |
| APIResponse.success(data=[doc.to_dict() for doc in documents]) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error listing documents: {str(e)}", exc_info=True) | |
| return jsonify(APIResponse.error(message=f"Error listing documents: {str(e)}")) | |
| def scan_documents(): | |
| """Scan documents from configured directory and store them in database""" | |
| try: | |
| # 2. Get project root directory and construct the full path | |
| config = Config.from_env() | |
| relative_path = config.get("USER_RAW_CONTENT_DIR").lstrip("/") | |
| project_root = Path(__file__).parent.parent.parent.parent.parent | |
| full_path = project_root / relative_path | |
| # 3. Scan and process files | |
| processed_doc_dtos = document_service.scan_directory( | |
| directory_path=str(full_path), recursive=True | |
| ) | |
| logger.info(f"Scan completed. Processed {len(processed_doc_dtos)} documents") | |
| # 4. Return processing results | |
| return jsonify( | |
| APIResponse.success(data=[doc_dto.dict() for doc_dto in processed_doc_dtos]) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error in scan_documents: {str(e)}", exc_info=True) | |
| return jsonify( | |
| APIResponse.error(message=f"Unexpected error in scan_documents: {str(e)}") | |
| ) | |
| def analyze_documents(): | |
| """Analyze all unanalyzed documents""" | |
| try: | |
| analyzed_doc_dtos = document_service.analyze_all_documents() | |
| return jsonify( | |
| APIResponse.success( | |
| data={ | |
| "total": len(analyzed_doc_dtos), | |
| "documents": [doc.dict() for doc in analyzed_doc_dtos], | |
| } | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error analyzing documents: {str(e)}", exc_info=True) | |
| return jsonify( | |
| APIResponse.error(message=f"Error analyzing documents: {str(e)}") | |
| ) | |
| def get_document_l0(document_id: int): | |
| """Get document L0 data including chunks and embeddings""" | |
| try: | |
| l0_data = document_service.get_document_l0(document_id) | |
| return jsonify(APIResponse.success(data=l0_data)) | |
| except Exception as e: | |
| logger.error(f"Error getting document L0 data: {str(e)}", exc_info=True) | |
| return jsonify( | |
| APIResponse.error(message=f"Error getting document L0 data: {str(e)}") | |
| ) | |
| def get_document_chunks(document_id: int): | |
| """Get chunks for the specified document""" | |
| try: | |
| logger.info(f"Attempting to retrieve chunks for document_id: {document_id}") | |
| chunks = document_service.get_document_chunks(document_id) | |
| if not chunks: | |
| logger.warning(f"No chunks found for document_id: {document_id}") | |
| return jsonify( | |
| APIResponse.error(message=f"No chunks found for document {document_id}") | |
| ) | |
| return jsonify( | |
| APIResponse.success( | |
| data={ | |
| "document_id": document_id, | |
| "total_chunks": len(chunks), | |
| "chunks": chunks, | |
| } | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| f"Error getting document chunks for document_id {document_id}: {str(e)}", | |
| exc_info=True, | |
| ) | |
| return jsonify( | |
| APIResponse.error( | |
| message=f"Error getting document chunks for document_id {document_id}: {str(e)}" | |
| ) | |
| ) | |
| def process_all_chunks(): | |
| """Process chunks for all documents in batch""" | |
| try: | |
| config = Config.from_env() | |
| chunker = DocumentChunker( | |
| chunk_size=int(config.get("DOCUMENT_CHUNK_SIZE")), | |
| overlap=int(config.get("DOCUMENT_CHUNK_OVERLAP")), | |
| ) | |
| documents = document_service.list_documents() | |
| processed, failed = 0, 0 | |
| chunk_service = ChunkService() | |
| for doc in documents: | |
| try: | |
| if not doc.raw_content: | |
| logger.warning(f"Document {doc.id} has no content, skipping...") | |
| failed += 1 | |
| continue | |
| # Split into chunks and save | |
| chunks = chunker.split(doc.raw_content) | |
| for chunk in chunks: | |
| chunk.document_id = doc.id | |
| chunk_service.save_chunk(chunk) | |
| processed += 1 | |
| logger.info( | |
| f"Document {doc.id} processed: {len(chunks)} chunks created" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to process document {doc.id}: {str(e)}") | |
| failed += 1 | |
| return jsonify( | |
| APIResponse.success( | |
| data={ | |
| "total": len(documents), | |
| "processed": processed, | |
| "failed": failed, | |
| } | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Chunk processing failed: {str(e)}") | |
| return jsonify(APIResponse.error(message=f"Chunk processing failed: {str(e)}")) | |
| def process_document_embeddings(document_id: int): | |
| """Process embeddings for all chunks of the specified document""" | |
| try: | |
| # Call service to process embeddings | |
| processed_chunks = document_service.generate_document_chunk_embeddings( | |
| document_id | |
| ) | |
| if not processed_chunks: | |
| logger.warning(f"No chunks found for document {document_id}") | |
| return jsonify( | |
| APIResponse.error(message=f"No chunks found for document {document_id}") | |
| ) | |
| return jsonify( | |
| APIResponse.success( | |
| data={ | |
| "document_id": document_id, | |
| "total_chunks": len(processed_chunks), | |
| "processed_chunks": len( | |
| [c for c in processed_chunks if c.has_embedding] | |
| ), | |
| } | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| f"Error processing embeddings for document {document_id}: {str(e)}", | |
| exc_info=True, | |
| ) | |
| return jsonify( | |
| APIResponse.error( | |
| message=f"Error processing embeddings for document {document_id}: {str(e)}" | |
| ) | |
| ) | |
| def get_document_embeddings(document_id: int): | |
| """Get embeddings status for all chunks of the specified document""" | |
| try: | |
| # Get query parameters, determine whether to return complete embedding vectors | |
| include_vectors = request.args.get("include_vectors", "").lower() == "true" | |
| chunks = document_service.get_document_chunks(document_id) | |
| if not chunks: | |
| return jsonify( | |
| APIResponse.error(message=f"No chunks found for document {document_id}") | |
| ) | |
| # Get embeddings from ChromaDB | |
| chunk_embeddings = document_service.get_chunk_embeddings_by_document_id( | |
| document_id | |
| ) | |
| chunks_info = [ | |
| { | |
| "id": chunk.id, | |
| "content": chunk.content[:100] + "..." | |
| if len(chunk.content) > 100 | |
| else chunk.content, | |
| "has_embedding": chunk.has_embedding, | |
| "embedding_length": len(chunk_embeddings.get(chunk.id, [])) | |
| if chunk_embeddings.get(chunk.id) | |
| else 0, | |
| "embedding_vector": chunk_embeddings.get(chunk.id) | |
| if include_vectors | |
| else None, # Decide whether to include vectors based on parameters | |
| "tags": chunk.tags, | |
| "topic": chunk.topic, | |
| } | |
| for chunk in chunks | |
| ] | |
| return jsonify( | |
| APIResponse.success( | |
| data={ | |
| "document_id": document_id, | |
| "total_chunks": len(chunks), | |
| "chunks_with_embeddings": len( | |
| [c for c in chunks if c.has_embedding] | |
| ), | |
| "chunks": chunks_info, | |
| } | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| f"Error getting embeddings for document {document_id}: {str(e)}", | |
| exc_info=True, | |
| ) | |
| return jsonify( | |
| APIResponse.error( | |
| message=f"Error getting embeddings for document {document_id}: {str(e)}" | |
| ) | |
| ) | |
| def process_document_embedding(document_id: int): | |
| """Process document-level embedding""" | |
| try: | |
| embedding = document_service.process_document_embedding(document_id) | |
| if embedding is None: | |
| return jsonify( | |
| APIResponse.error( | |
| message=f"Failed to process embedding for document {document_id}" | |
| ) | |
| ) | |
| return jsonify( | |
| APIResponse.success( | |
| data={"document_id": document_id, "embedding_length": len(embedding)} | |
| ) | |
| ) | |
| except ValueError as e: | |
| logger.error(f"Document not found: {str(e)}") | |
| return jsonify(APIResponse.error(message=f"Document not found: {str(e)}")) | |
| except Exception as e: | |
| logger.error(f"Error processing document embedding: {str(e)}", exc_info=True) | |
| return jsonify( | |
| APIResponse.error(message=f"Error processing document embedding: {str(e)}") | |
| ) | |
| def get_document_embedding(document_id: int): | |
| """Get document-level embedding""" | |
| try: | |
| # Get query parameters, determine whether to return complete embedding vector | |
| include_vector = request.args.get("include_vector", "").lower() == "true" | |
| embedding = document_service.get_document_embedding(document_id) | |
| if embedding is None: | |
| return jsonify( | |
| APIResponse.error( | |
| message=f"No embedding found for document {document_id}" | |
| ) | |
| ), 404 | |
| return jsonify( | |
| APIResponse.success( | |
| data={ | |
| "document_id": document_id, | |
| "embedding_length": len(embedding), | |
| "embedding_vector": embedding if include_vector else None, | |
| } | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error getting document embedding: {str(e)}", exc_info=True) | |
| return jsonify( | |
| APIResponse.error(message=f"Error getting document embedding: {str(e)}") | |
| ) | |
| def verify_document_embeddings(): | |
| """Verify all document embeddings and return statistics""" | |
| try: | |
| verbose = request.args.get("verbose", "").lower() == "true" | |
| results = document_service.verify_document_embeddings(verbose=verbose) | |
| return jsonify(APIResponse.success(data=results)) | |
| except Exception as e: | |
| logger.error(f"Error verifying document embeddings: {str(e)}", exc_info=True) | |
| return jsonify(APIResponse.error(message=f"Error verifying document embeddings: {str(e)}")) | |
| def repair_documents(): | |
| """Repair documents with missing analysis and embeddings""" | |
| try: | |
| # First, fix missing document analysis (summaries and insights) | |
| fixed_analysis_count = document_service.fix_missing_document_analysis() | |
| # Get verification results after fixing analysis | |
| verification_results = document_service.verify_document_embeddings(verbose=False) | |
| # Process documents with missing embeddings | |
| documents_fixed = 0 | |
| for doc in document_service._repository.list(): | |
| embedding = document_service.get_document_embedding(doc.id) | |
| if doc.raw_content and embedding is None: | |
| try: | |
| document_service.process_document_embedding(doc.id) | |
| # Also process chunk embeddings | |
| document_service.generate_document_chunk_embeddings(doc.id) | |
| documents_fixed += 1 | |
| except Exception as e: | |
| logger.error(f"Error processing document {doc.id} embedding: {str(e)}") | |
| # Get final verification results | |
| final_results = document_service.verify_document_embeddings(verbose=False) | |
| return jsonify(APIResponse.success( | |
| data={ | |
| "analysis_fixed": fixed_analysis_count, | |
| "embeddings_fixed": documents_fixed, | |
| "initial_state": verification_results, | |
| "final_state": final_results | |
| } | |
| )) | |
| except Exception as e: | |
| logger.error(f"Error repairing documents: {str(e)}", exc_info=True) | |
| return jsonify(APIResponse.error(message=f"Error repairing documents: {str(e)}")) | |