import os
import json
import logging
from typing import List, Dict, Any

# Haystack imports
from haystack.utils import Secret
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.embedders import OpenAITextEmbedder

# Ragas imports
from ragas import EvaluationDataset, SingleTurnSample
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    # context_relevancy
)
from ragas.llms.haystack_wrapper import HaystackLLMWrapper
from ragas.embeddings.haystack_wrapper import HaystackEmbeddingsWrapper
from ragas import evaluate

# Import the existing RAG pipeline
from rag_pipeline import RAGPipeline

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


class RAGEvaluator:
    def __init__(
        self,
        embedding_model_name: str = "BAAI/bge-en-icl",
        llm_model_name: str = "meta-llama/Llama-3.3-70B-Instruct",
        qdrant_path: str = "./qdrant_data",
        api_base_url: str = "https://api.studio.nebius.com/v1/",
        collection_name: str = "ltu_documents"
    ):
        self.embedding_model_name = embedding_model_name
        self.llm_model_name = llm_model_name
        self.qdrant_path = qdrant_path
        self.api_base_url = api_base_url
        self.collection_name = collection_name

        # Load the API key from the environment (the same Nebius key used in testset_generation.py)
        self.api_key = Secret.from_token(os.getenv("NEBIUS_API_KEY"))

        # Initialize the existing RAG pipeline and the Ragas wrappers
        self.init_components()

    def init_components(self):
        """Initialize the existing RAG pipeline and the Ragas components."""
        logger.info("Initializing components...")

        # Initialize the existing RAG pipeline
        self.rag_pipeline = RAGPipeline(
            embedding_model_name=self.embedding_model_name,
            llm_model_name=self.llm_model_name,
            qdrant_path=self.qdrant_path
        )

        # Wrap the Haystack generator and embedder so Ragas can use them
        # as the judge LLM and the embedding model
        self.llm_wrapper = HaystackLLMWrapper(
            OpenAIGenerator(
                api_base_url=self.api_base_url,
                model=self.llm_model_name,
                api_key=self.api_key,
                generation_kwargs={
                    "max_tokens": 1024,
                    "temperature": 0.1,
                    "top_p": 0.95,
                }
            )
        )
        self.embedding_wrapper = HaystackEmbeddingsWrapper(
            OpenAITextEmbedder(
                api_base_url=self.api_base_url,
                model=self.embedding_model_name,
                api_key=self.api_key,
            )
        )

        logger.info("Components initialized successfully")

    def load_testset(self, testset_path: str) -> List[Dict[str, Any]]:
        """Load the test set from a JSONL file."""
        logger.info(f"Loading test set from {testset_path}...")

        test_data = []
        with open(testset_path, 'r', encoding='utf-8') as f:
            for line in f:
                test_data.append(json.loads(line))

        logger.info(f"Loaded {len(test_data)} test samples")
        return test_data

    def prepare_ragas_dataset(
        self,
        test_data: List[Dict[str, Any]],
        results: List[Dict[str, Any]]
    ) -> EvaluationDataset:
        """Build a Ragas EvaluationDataset from the test samples and pipeline results."""
        logger.info("Preparing data for Ragas evaluation...")

        eval_data = []
        for test_sample, result in zip(test_data, results):
            question = test_sample["user_input"]
            reference_answer = test_sample["reference"]

            # Get the generated answer and retrieved contexts from the pipeline result
            generated_answer = result["answer"]
            contexts = [doc.content for doc in result["documents"]]

            # Get the reference contexts
            reference_contexts = test_sample.get("reference_contexts", [])

            eval_data.append(SingleTurnSample(
                user_input=question,
                response=generated_answer,
                retrieved_contexts=contexts,
                reference=reference_answer,
                reference_contexts=reference_contexts
            ))

        # print(eval_data[0])
        return EvaluationDataset(eval_data)

    def run_evaluation(self, testset_path: str = "testset.jsonl") -> Dict[str, float]:
        """Run the full evaluation process."""
        logger.info("Starting RAG pipeline evaluation...")

        # Load the test set
        test_data = self.load_testset(testset_path)

        # Run the pipeline for each test sample
        results = []
        for i, test_sample in enumerate(test_data):
            logger.info(f"Processing test sample {i+1}/{len(test_data)}")
            question = test_sample["user_input"]

            # Run the existing RAG pipeline
            result = self.rag_pipeline.query(question)
            results.append(result)

        # Prepare data for Ragas
        eval_ds = self.prepare_ragas_dataset(test_data, results)

        # Run the Ragas evaluation (default metrics; judge LLM and embeddings
        # are the wrapped Haystack components)
        logger.info("Running Ragas evaluation...")
        evaluation_result = evaluate(
            eval_ds,
            # metrics=[
            #     faithfulness,
            #     answer_relevancy,
            #     context_precision,
            #     context_recall,
            #     # context_relevancy
            # ],
            llm=self.llm_wrapper,
            embeddings=self.embedding_wrapper,
        )

        # Print and return the results
        logger.info("Evaluation complete!")
        logger.info(f"Results: {evaluation_result}")
        return evaluation_result


if __name__ == "__main__":
    # Create and run the evaluator
    evaluator = RAGEvaluator()
    results = evaluator.run_evaluation()
    print(repr(results))

    # Save results to file
    # with open("ragas_evaluation_results.json", "w") as f:
    #     json.dump(results.to_dict(), f, indent=2)
    # print("\nEvaluation results saved to ragas_evaluation_results.json")

# INFO:__main__:Results: {
#     'answer_relevancy': 0.8558,
#     'context_precision': 0.9033,
#     'faithfulness': 0.8000,
#     'context_recall': 0.9417
# }
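
# Expected testset.jsonl format: one JSON object per line, with at least the keys
# read above ("user_input", "reference", "reference_contexts"). The sketch below is
# illustrative only (the question and answer text are placeholders, not real test data)
# and is kept commented out so it never overwrites an existing test set.
#
# import json
# sample = {
#     "user_input": "What topics does the introductory programming course cover?",
#     "reference": "It covers basic Python programming and problem solving.",
#     "reference_contexts": ["The introductory programming course covers Python basics ..."],
# }
# with open("testset.jsonl", "w", encoding="utf-8") as f:
#     f.write(json.dumps(sample) + "\n")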