# DeepSolanaCoder/venv/lib/python3.12/site-packages/langchain/smith/evaluation/runner_utils.py
| """Utilities for running language models or Chains over datasets.""" | |
| from __future__ import annotations | |
| import concurrent.futures | |
| import dataclasses | |
| import functools | |
| import inspect | |
| import logging | |
| import uuid | |
| from datetime import datetime, timezone | |
| from typing import ( | |
| TYPE_CHECKING, | |
| Any, | |
| Callable, | |
| Dict, | |
| List, | |
| Optional, | |
| Tuple, | |
| Union, | |
| cast, | |
| ) | |
| from langchain_core._api import warn_deprecated | |
| from langchain_core.callbacks.manager import Callbacks | |
| from langchain_core.language_models import BaseLanguageModel | |
| from langchain_core.messages import BaseMessage, messages_from_dict | |
| from langchain_core.outputs import ChatResult, LLMResult | |
| from langchain_core.runnables import Runnable, RunnableConfig, RunnableLambda | |
| from langchain_core.runnables import config as runnable_config | |
| from langchain_core.runnables import utils as runnable_utils | |
| from langchain_core.tracers.evaluation import ( | |
| EvaluatorCallbackHandler, | |
| wait_for_all_evaluators, | |
| ) | |
| from langchain_core.tracers.langchain import LangChainTracer | |
| from langsmith.client import Client | |
| from langsmith.env import get_git_info, get_langchain_env_var_metadata | |
| from langsmith.evaluation import ( | |
| EvaluationResult, | |
| RunEvaluator, | |
| ) | |
| from langsmith.evaluation import ( | |
| run_evaluator as run_evaluator_dec, | |
| ) | |
| from langsmith.run_helpers import as_runnable, is_traceable_function | |
| from langsmith.schemas import Dataset, DataType, Example, Run, TracerSession | |
| from langsmith.utils import LangSmithError | |
| from requests import HTTPError | |
| from typing_extensions import TypedDict | |
| from langchain.chains.base import Chain | |
| from langchain.evaluation.loading import load_evaluator | |
| from langchain.evaluation.schema import ( | |
| EvaluatorType, | |
| PairwiseStringEvaluator, | |
| StringEvaluator, | |
| ) | |
| from langchain.smith import evaluation as smith_eval | |
| from langchain.smith.evaluation import config as smith_eval_config | |
| from langchain.smith.evaluation import name_generation, progress | |
| if TYPE_CHECKING: | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
| MODEL_OR_CHAIN_FACTORY = Union[ | |
| Callable[[], Union[Chain, Runnable]], | |
| BaseLanguageModel, | |
| Callable[[dict], Any], | |
| Runnable, | |
| Chain, | |
| ] | |
| MCF = Union[Callable[[], Union[Chain, Runnable]], BaseLanguageModel] | |
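
# Note (editor): an illustrative sketch of values that satisfy MODEL_OR_CHAIN_FACTORY,
# using hypothetical `my_llm` / `build_chain` / `my_runnable` objects; not part of the
# library API:
#
#     run_on_dataset(client, "ds", my_llm)                 # a BaseLanguageModel
#     run_on_dataset(client, "ds", lambda: build_chain())  # a zero-arg chain factory
#     run_on_dataset(client, "ds", my_runnable)            # a Runnable
#
# MCF is the narrower type produced by _wrap_in_chain_factory below: either a
# zero-argument constructor or a bare language model.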


class InputFormatError(Exception):
    """Raised when the input format is invalid."""


## Shared Utilities


class TestResult(dict):
    """A dictionary of the results of a single test run."""

    def get_aggregate_feedback(
        self,
    ) -> pd.DataFrame:
        """Return quantiles for the feedback scores.

        This method calculates and returns the quantiles for the feedback scores
        across all feedback keys.

        Returns:
            A DataFrame containing the quantiles for each feedback key.
        """
        df = self.to_dataframe()
        # Drop all things starting with inputs., outputs., and reference
        to_drop = [
            col
            for col in df.columns
            if col.startswith("inputs.")
            or col.startswith("outputs.")
            or col in {"input", "output"}
            or col.startswith("reference")
        ]
        return df.describe(include="all").drop(to_drop, axis=1)

    def to_dataframe(self) -> pd.DataFrame:
        """Convert the results to a dataframe."""
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError(
                "Pandas is required to convert the results to a dataframe."
                " To install pandas, run `pip install pandas`."
            ) from e

        indices = []
        records = []
        for example_id, result in self["results"].items():
            feedback = result["feedback"]
            output_ = result.get("output")
            if isinstance(output_, dict):
                output = {f"outputs.{k}": v for k, v in output_.items()}
            elif output_ is None:
                output = {}
            else:
                output = {"output": output_}
            r = {
                **{f"inputs.{k}": v for k, v in result["input"].items()},
                **output,
            }
            if "reference" in result:
                if isinstance(result["reference"], dict):
                    r.update(
                        {f"reference.{k}": v for k, v in result["reference"].items()}
                    )
                else:
                    r["reference"] = result["reference"]
            r.update(
                {
                    **{f"feedback.{f.key}": f.score for f in feedback},
                    "error": result.get("Error"),
                    "execution_time": result["execution_time"],
                    "run_id": result.get("run_id"),
                }
            )
            records.append(r)
            indices.append(example_id)
        return pd.DataFrame(records, index=indices)
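
# Note (editor): a minimal sketch of how these results are typically consumed,
# assuming the value returned by run_on_dataset(...) is the TestResult built in
# _DatasetRunContainer.finish(); names are illustrative:
#
#     results = run_on_dataset(client, "my-dataset", construct_chain, ...)
#     df = results.to_dataframe()               # one row per dataset example
#     summary = results.get_aggregate_feedback()
#     # columns look like: inputs.<key>, output / outputs.<key>, reference,
#     # feedback.<evaluator key>, error, execution_time, run_id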


class EvalError(dict):
    """Your architecture raised an error."""

    def __init__(self, Error: BaseException, **kwargs: Any) -> None:
        super().__init__(Error=Error, **kwargs)

    def __getattr__(self, name: str) -> Any:
        try:
            return self[name]
        except KeyError:
            raise AttributeError(f"'EvalError' object has no attribute '{name}'")


def _wrap_in_chain_factory(
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    dataset_name: str = "<my_dataset>",
) -> MCF:
    """Forgive the user if they pass in a chain without memory instead of a chain
    factory. It's a common mistake. Raise a more helpful error message as well."""
    if isinstance(llm_or_chain_factory, Chain):
        chain = llm_or_chain_factory
        chain_class = chain.__class__.__name__
        if llm_or_chain_factory.memory is not None:
            memory_class = chain.memory.__class__.__name__
            raise ValueError(
                "Cannot directly evaluate a chain with stateful memory."
                " To evaluate this chain, pass in a chain constructor"
                " that initializes fresh memory each time it is called."
                " This will safeguard against information"
                " leakage between dataset examples."
                "\nFor example:\n\n"
                "def chain_constructor():\n"
                f"    new_memory = {memory_class}(...)\n"
                f"    return {chain_class}"
                "(memory=new_memory, ...)\n\n"
                f'run_on_dataset("{dataset_name}", chain_constructor, ...)'
            )
        return lambda: chain
    elif isinstance(llm_or_chain_factory, BaseLanguageModel):
        return llm_or_chain_factory
    elif isinstance(llm_or_chain_factory, Runnable):
        # Memory may exist here, but it's not elegant to check all those cases.
        lcf = llm_or_chain_factory
        return lambda: lcf
    elif callable(llm_or_chain_factory):
        if is_traceable_function(llm_or_chain_factory):
            runnable_ = as_runnable(cast(Callable, llm_or_chain_factory))
            return lambda: runnable_
        try:
            _model = llm_or_chain_factory()  # type: ignore[call-arg]
        except TypeError:
            # It's an arbitrary function, wrap it in a RunnableLambda
            user_func = cast(Callable, llm_or_chain_factory)
            sig = inspect.signature(user_func)
            logger.info(f"Wrapping function {sig} as RunnableLambda.")
            wrapped = RunnableLambda(user_func)
            return lambda: wrapped
        constructor = cast(Callable, llm_or_chain_factory)
        if isinstance(_model, BaseLanguageModel):
            # It's not uncommon to do an LLM constructor instead of raw LLM,
            # so we'll unpack it for the user.
            return _model
        elif is_traceable_function(cast(Callable, _model)):
            runnable_ = as_runnable(cast(Callable, _model))
            return lambda: runnable_
        elif not isinstance(_model, Runnable):
            # This is unlikely to happen - a constructor for a model function
            return lambda: RunnableLambda(constructor)
        else:
            # Typical correct case
            return constructor
    return llm_or_chain_factory
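
# Note (editor): an illustrative sketch of the constructor pattern the error message
# above asks for, using hypothetical `ConversationChain` / `ConversationBufferMemory`
# style classes; not part of this module:
#
#     def chain_constructor():
#         new_memory = ConversationBufferMemory()
#         return ConversationChain(llm=my_llm, memory=new_memory)
#
#     run_on_dataset(client, "<my_dataset>", chain_constructor, ...)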


def _get_prompt(inputs: Dict[str, Any]) -> str:
    """Get prompt from inputs.

    Args:
        inputs: The input dictionary.

    Returns:
        A string prompt.

    Raises:
        InputFormatError: If the input format is invalid.
    """
    if not inputs:
        raise InputFormatError("Inputs should not be empty.")
    prompts = []
    if "prompt" in inputs:
        if not isinstance(inputs["prompt"], str):
            raise InputFormatError(
                "Expected string for 'prompt', got"
                f" {type(inputs['prompt']).__name__}"
            )
        prompts = [inputs["prompt"]]
    elif "prompts" in inputs:
        if not isinstance(inputs["prompts"], list) or not all(
            isinstance(i, str) for i in inputs["prompts"]
        ):
            raise InputFormatError(
                "Expected list of strings for 'prompts',"
                f" got {type(inputs['prompts']).__name__}"
            )
        prompts = inputs["prompts"]
    elif len(inputs) == 1:
        prompt_ = next(iter(inputs.values()))
        if isinstance(prompt_, str):
            prompts = [prompt_]
        elif isinstance(prompt_, list) and all(isinstance(i, str) for i in prompt_):
            prompts = prompt_
        else:
            raise InputFormatError(f"LLM Run expects string prompt input. Got {inputs}")
    else:
        raise InputFormatError(
            f"LLM Run expects 'prompt' or 'prompts' in inputs. Got {inputs}"
        )
    if len(prompts) == 1:
        return prompts[0]
    else:
        raise InputFormatError(
            f"LLM Run expects single prompt input. Got {len(prompts)} prompts."
        )
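
# Note (editor): example input shapes _get_prompt accepts, written as a reference
# sketch (not exhaustive):
#
#     _get_prompt({"prompt": "Hello"})       -> "Hello"
#     _get_prompt({"prompts": ["Hello"]})    -> "Hello"
#     _get_prompt({"question": "Hello"})     -> "Hello"   (single-key fallback)
#     _get_prompt({"prompts": ["a", "b"]})   -> raises InputFormatError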


class ChatModelInput(TypedDict):
    """Input for a chat model.

    Parameters:
        messages: List of chat messages.
    """

    messages: List[BaseMessage]


def _get_messages(inputs: Dict[str, Any]) -> dict:
    """Get Chat Messages from inputs.

    Args:
        inputs: The input dictionary.

    Returns:
        The input dictionary with the serialized messages converted to chat
        messages under the "input" key.

    Raises:
        InputFormatError: If the input format is invalid.
    """
    if not inputs:
        raise InputFormatError("Inputs should not be empty.")
    input_copy = inputs.copy()
    if "messages" in inputs:
        input_copy["input"] = input_copy.pop("messages")
    elif len(inputs) == 1:
        input_copy["input"] = next(iter(inputs.values()))
    if "input" in input_copy:
        raw_messages = input_copy["input"]
        if isinstance(raw_messages, list) and all(
            isinstance(i, dict) for i in raw_messages
        ):
            raw_messages = [raw_messages]
        if len(raw_messages) == 1:
            input_copy["input"] = messages_from_dict(raw_messages[0])
        else:
            raise InputFormatError(
                "Batch messages not supported. Please provide a"
                " single list of messages."
            )
        return input_copy
    else:
        raise InputFormatError(
            "Chat Run expects single List[dict] or List[List[dict]] 'messages'"
            f" input. Got {inputs}"
        )
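
# Note (editor): example message payloads _get_messages accepts, shown as a sketch
# (dicts in the serialized format understood by messages_from_dict):
#
#     _get_messages({"messages": [{"type": "human", "data": {"content": "hi"}}]})
#     _get_messages({"input": [{"type": "human", "data": {"content": "hi"}}]})
#
# A list of message *lists* (a batch) raises InputFormatError.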


## Shared data validation utilities


def _validate_example_inputs_for_language_model(
    first_example: Example,
    input_mapper: Optional[Callable[[Dict], Any]],
) -> None:
    if input_mapper:
        prompt_input = input_mapper(first_example.inputs)
        if not isinstance(prompt_input, str) and not (
            isinstance(prompt_input, list)
            and all(isinstance(msg, BaseMessage) for msg in prompt_input)
        ):
            raise InputFormatError(
                "When using an input_mapper to prepare dataset example inputs"
                " for an LLM or chat model, the output must be a single string or"
                " a list of chat messages."
                f"\nGot: {prompt_input} of type {type(prompt_input)}."
            )
    else:
        try:
            _get_prompt(first_example.inputs)
        except InputFormatError:
            try:
                _get_messages(first_example.inputs)
            except InputFormatError:
                raise InputFormatError(
                    "Example inputs do not match language model input format. "
                    "Expected a dictionary with messages or a single prompt."
                    f" Got: {first_example.inputs}"
                    " Please update your dataset OR provide an input_mapper"
                    " to convert the example.inputs to a compatible format"
                    " for the llm or chat model you wish to evaluate."
                )


def _validate_example_inputs_for_chain(
    first_example: Example,
    chain: Chain,
    input_mapper: Optional[Callable[[Dict], Any]],
) -> None:
    """Validate that the example inputs match the chain input keys."""
    if input_mapper:
        first_inputs = input_mapper(first_example.inputs)
        missing_keys = set(chain.input_keys).difference(first_inputs)
        if not isinstance(first_inputs, dict):
            raise InputFormatError(
                "When using an input_mapper to prepare dataset example"
                " inputs for a chain, the mapped value must be a dictionary."
                f"\nGot: {first_inputs} of type {type(first_inputs)}."
            )
        if missing_keys:
            raise InputFormatError(
                "Missing keys after loading example using input_mapper."
                f"\nExpected: {chain.input_keys}. Got: {first_inputs.keys()}"
            )
    else:
        first_inputs = first_example.inputs
        missing_keys = set(chain.input_keys).difference(first_inputs)
        if len(first_inputs) == 1 and len(chain.input_keys) == 1:
            # We can pass this through the run method.
            # Refrain from calling to validate.
            pass
        elif missing_keys:
            raise InputFormatError(
                "Example inputs missing expected chain input keys."
                " Please provide an input_mapper to convert the example.inputs"
                " to a compatible format for the chain you wish to evaluate."
                f"Expected: {chain.input_keys}. "
                f"Got: {first_inputs.keys()}"
            )


def _validate_example_inputs(
    example: Example,
    llm_or_chain_factory: MCF,
    input_mapper: Optional[Callable[[Dict], Any]],
) -> None:
    """Validate that the example inputs are valid for the model."""
    if isinstance(llm_or_chain_factory, BaseLanguageModel):
        _validate_example_inputs_for_language_model(example, input_mapper)
    else:
        chain = llm_or_chain_factory()
        if isinstance(chain, Chain):
            # Otherwise it's a runnable
            _validate_example_inputs_for_chain(example, chain, input_mapper)
        elif isinstance(chain, Runnable):
            logger.debug(f"Skipping input validation for {chain}")


## Shared Evaluator Setup Utilities


def _setup_evaluation(
    llm_or_chain_factory: MCF,
    examples: List[Example],
    evaluation: Optional[smith_eval.RunEvalConfig],
    data_type: DataType,
) -> Optional[List[RunEvaluator]]:
    """Configure the evaluators to run on the results of the chain."""
    if evaluation:
        if isinstance(llm_or_chain_factory, BaseLanguageModel):
            run_inputs, run_outputs = None, None
            run_type = "llm"
        else:
            run_type = "chain"
            chain = llm_or_chain_factory()
            run_inputs = chain.input_keys if isinstance(chain, Chain) else None
            run_outputs = chain.output_keys if isinstance(chain, Chain) else None
        run_evaluators = _load_run_evaluators(
            evaluation,
            run_type,
            data_type,
            list(examples[0].outputs) if examples[0].outputs else None,
            run_inputs,
            run_outputs,
        )
    else:
        # TODO: Create a default helpfulness evaluator
        run_evaluators = None
    return run_evaluators
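
# Note (editor): a minimal RunEvalConfig sketch that _setup_evaluation would turn
# into RunEvaluator instances; the evaluator names are common off-the-shelf ones and
# `my_run_evaluator` is a hypothetical RunEvaluator, so treat this as illustrative:
#
#     evaluation = smith_eval.RunEvalConfig(
#         evaluators=["qa", "embedding_distance"],
#         custom_evaluators=[my_run_evaluator],
#     )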


def _determine_input_key(
    config: smith_eval.RunEvalConfig,
    run_inputs: Optional[List[str]],
) -> Optional[str]:
    input_key = None
    if config.input_key:
        input_key = config.input_key
        if run_inputs and input_key not in run_inputs:
            logger.warning(
                f"Input key {input_key} not in chain's specified"
                f" input keys {run_inputs}. Evaluation behavior may be undefined."
            )
    elif run_inputs and len(run_inputs) == 1:
        input_key = run_inputs[0]
    elif run_inputs is not None and len(run_inputs) > 1:
        logger.warning(
            f"Chain expects multiple input keys: {run_inputs}."
            " The evaluator is likely to fail, and evaluation behavior may be"
            " undefined. Specify an input_key in the RunEvalConfig to avoid"
            " this warning."
        )
    return input_key


def _determine_prediction_key(
    config: smith_eval.RunEvalConfig,
    run_outputs: Optional[List[str]],
) -> Optional[str]:
    prediction_key = None
    if config.prediction_key:
        prediction_key = config.prediction_key
        if run_outputs and prediction_key not in run_outputs:
            logger.warning(
                f"Prediction key {prediction_key} not in chain's specified"
                f" output keys {run_outputs}. Evaluation behavior may be undefined."
            )
    elif run_outputs and len(run_outputs) == 1:
        prediction_key = run_outputs[0]
    elif run_outputs is not None and len(run_outputs) > 1:
        logger.warning(
            f"Chain expects multiple output keys: {run_outputs}."
            " Evaluation behavior may be undefined. Specify a prediction_key"
            " in the RunEvalConfig to avoid this warning."
        )
    return prediction_key


def _determine_reference_key(
    config: smith_eval.RunEvalConfig,
    example_outputs: Optional[List[str]],
) -> Optional[str]:
    if config.reference_key:
        reference_key = config.reference_key
        if example_outputs and reference_key not in example_outputs:
            raise ValueError(
                f"Reference key {reference_key} not in Dataset"
                f" example outputs: {example_outputs}"
            )
    elif example_outputs and len(example_outputs) == 1:
        reference_key = list(example_outputs)[0]
    else:
        reference_key = None
    return reference_key


def _construct_run_evaluator(
    eval_config: Union[
        smith_eval_config.SINGLE_EVAL_CONFIG_TYPE,
        smith_eval_config.CUSTOM_EVALUATOR_TYPE,
    ],
    eval_llm: Optional[BaseLanguageModel],
    run_type: str,
    data_type: DataType,
    example_outputs: Optional[List[str]],
    reference_key: Optional[str],
    input_key: Optional[str],
    prediction_key: Optional[str],
) -> RunEvaluator:
    if isinstance(eval_config, RunEvaluator):
        return eval_config
    if isinstance(eval_config, (EvaluatorType, str)):
        if not isinstance(eval_config, EvaluatorType):
            eval_config = EvaluatorType(eval_config)
        evaluator_ = load_evaluator(eval_config, llm=eval_llm)
        eval_type_tag = eval_config.value
    elif isinstance(eval_config, smith_eval_config.EvalConfig):
        kwargs = {"llm": eval_llm, **eval_config.get_kwargs()}
        evaluator_ = load_evaluator(eval_config.evaluator_type, **kwargs)
        eval_type_tag = eval_config.evaluator_type.value
        # Override keys if specified in the config
        if isinstance(eval_config, smith_eval_config.SingleKeyEvalConfig):
            input_key = eval_config.input_key or input_key
            prediction_key = eval_config.prediction_key or prediction_key
            reference_key = eval_config.reference_key or reference_key
    elif callable(eval_config):
        # Assume we can decorate
        return run_evaluator_dec(eval_config)
    else:
        raise ValueError(f"Unknown evaluator type: {type(eval_config)}")

    if isinstance(evaluator_, StringEvaluator):
        if evaluator_.requires_reference and reference_key is None:
            raise ValueError(
                f"Must specify reference_key in smith_eval.RunEvalConfig to use"
                f" evaluator of type {eval_type_tag} with"
                f" dataset with multiple output keys: {example_outputs}."
            )
        run_evaluator = smith_eval.StringRunEvaluatorChain.from_run_and_data_type(
            evaluator_,
            run_type,
            data_type,
            input_key=input_key,
            prediction_key=prediction_key,
            reference_key=reference_key,
            tags=[eval_type_tag],
        )
    elif isinstance(evaluator_, PairwiseStringEvaluator):
        raise NotImplementedError(
            f"Run evaluator for {eval_type_tag} is not implemented."
            " PairwiseStringEvaluators compare the outputs of two different models"
            " rather than the output of a single model."
            " Did you mean to use a StringEvaluator instead?"
            "\nSee: https://python.langchain.com/docs/guides/evaluation/string/"
        )
    else:
        raise NotImplementedError(
            f"Run evaluator for {eval_type_tag} is not implemented"
        )
    return run_evaluator
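
# Note (editor): eval_config may therefore take any of the following forms (a sketch;
# `my_run_evaluator` and `my_func` are hypothetical):
#
#     "qa"                                              # EvaluatorType name as a string
#     EvaluatorType.QA                                  # the enum itself
#     smith_eval.RunEvalConfig.Criteria("helpfulness")  # an EvalConfig instance
#     my_run_evaluator                                  # an existing RunEvaluator
#     my_func                                           # a callable, wrapped by run_evaluator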


def _get_keys(
    config: smith_eval.RunEvalConfig,
    run_inputs: Optional[List[str]],
    run_outputs: Optional[List[str]],
    example_outputs: Optional[List[str]],
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    input_key = _determine_input_key(config, run_inputs)
    prediction_key = _determine_prediction_key(config, run_outputs)
    reference_key = _determine_reference_key(config, example_outputs)
    return input_key, prediction_key, reference_key


def _load_run_evaluators(
    config: smith_eval.RunEvalConfig,
    run_type: str,
    data_type: DataType,
    example_outputs: Optional[List[str]],
    run_inputs: Optional[List[str]],
    run_outputs: Optional[List[str]],
) -> List[RunEvaluator]:
    """
    Load run evaluators from a configuration.

    Args:
        config: Configuration for the run evaluators.

    Returns:
        A list of run evaluators.
    """
    run_evaluators = []
    input_key, prediction_key, reference_key = None, None, None
    if config.evaluators or (
        config.custom_evaluators
        and any([isinstance(e, StringEvaluator) for e in config.custom_evaluators])
    ):
        input_key, prediction_key, reference_key = _get_keys(
            config, run_inputs, run_outputs, example_outputs
        )
    for eval_config in config.evaluators:
        run_evaluator = _construct_run_evaluator(
            eval_config,
            config.eval_llm,
            run_type,
            data_type,
            example_outputs,
            reference_key,
            input_key,
            prediction_key,
        )
        run_evaluators.append(run_evaluator)
    custom_evaluators = config.custom_evaluators or []
    for custom_evaluator in custom_evaluators:
        if isinstance(custom_evaluator, RunEvaluator):
            run_evaluators.append(custom_evaluator)
        elif isinstance(custom_evaluator, StringEvaluator):
            run_evaluators.append(
                smith_eval.StringRunEvaluatorChain.from_run_and_data_type(
                    custom_evaluator,
                    run_type,
                    data_type,
                    input_key=input_key,
                    prediction_key=prediction_key,
                    reference_key=reference_key,
                )
            )
        elif callable(custom_evaluator):
            run_evaluators.append(run_evaluator_dec(custom_evaluator))
        else:
            raise ValueError(
                f"Unsupported custom evaluator: {custom_evaluator}."
                f" Expected RunEvaluator or StringEvaluator."
            )
    return run_evaluators
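
# Note (editor): a sketch of the "callable custom evaluator" branch above; the
# function name and feedback key are hypothetical, and the dict-return form is one
# shape the langsmith run_evaluator decorator can wrap:
#
#     def exact_match(run, example):
#         prediction = next(iter(run.outputs.values()))
#         reference = next(iter(example.outputs.values()))
#         return {"key": "exact_match", "score": int(prediction == reference)}
#
#     smith_eval.RunEvalConfig(custom_evaluators=[exact_match])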


### Async Helpers


async def _arun_llm(
    llm: BaseLanguageModel,
    inputs: Dict[str, Any],
    *,
    tags: Optional[List[str]] = None,
    callbacks: Callbacks = None,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Union[str, BaseMessage]:
    """Asynchronously run the language model.

    Args:
        llm: The language model to run.
        inputs: The input dictionary.
        tags: Optional tags to add to the run.
        callbacks: Optional callbacks to use during the run.
        input_mapper: Optional function to map inputs to the expected format.

    Returns:
        The model output: a string or a chat message.

    Raises:
        ValueError: If the LLM type is unsupported.
        InputFormatError: If the input format is invalid.
    """
    if input_mapper is not None:
        prompt_or_messages = input_mapper(inputs)
        if (
            isinstance(prompt_or_messages, str)
            or isinstance(prompt_or_messages, list)
            and all(isinstance(msg, BaseMessage) for msg in prompt_or_messages)
        ):
            return await llm.ainvoke(
                prompt_or_messages,
                config=RunnableConfig(
                    callbacks=callbacks, tags=tags or [], metadata=metadata or {}
                ),
            )
        else:
            raise InputFormatError(
                "Input mapper returned invalid format"
                f" {prompt_or_messages}"
                "\nExpected a single string or list of chat messages."
            )
    else:
        try:
            prompt = _get_prompt(inputs)
            llm_output: Union[str, BaseMessage] = await llm.ainvoke(
                prompt,
                config=RunnableConfig(
                    callbacks=callbacks, tags=tags or [], metadata=metadata or {}
                ),
            )
        except InputFormatError:
            llm_inputs = _get_messages(inputs)
            llm_output = await llm.ainvoke(
                **llm_inputs,
                config=RunnableConfig(
                    callbacks=callbacks, tags=tags or [], metadata=metadata or {}
                ),
            )
    return llm_output


async def _arun_chain(
    chain: Union[Chain, Runnable],
    inputs: Dict[str, Any],
    callbacks: Callbacks,
    *,
    tags: Optional[List[str]] = None,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Union[dict, str]:
    """Run a chain asynchronously on inputs."""
    inputs_ = inputs if input_mapper is None else input_mapper(inputs)
    if (
        isinstance(chain, Chain)
        and isinstance(inputs_, dict)
        and len(inputs_) == 1
        and chain.input_keys
    ):
        val = next(iter(inputs_.values()))
        output = await chain.ainvoke(
            val,
            config=RunnableConfig(
                callbacks=callbacks, tags=tags or [], metadata=metadata or {}
            ),
        )
    else:
        runnable_config = RunnableConfig(
            tags=tags or [], callbacks=callbacks, metadata=metadata or {}
        )
        output = await chain.ainvoke(inputs_, config=runnable_config)
    return output


async def _arun_llm_or_chain(
    example: Example,
    config: RunnableConfig,
    *,
    llm_or_chain_factory: MCF,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
) -> Union[dict, str, LLMResult, ChatResult]:
    """Asynchronously run the Chain or language model.

    Args:
        example: The example to run.
        config: The RunnableConfig (callbacks, tags, metadata) for this run.
        llm_or_chain_factory: The Chain or language model constructor to run.
        input_mapper: Optional function to map the input to the expected format.

    Returns:
        The output of the model or chain, or an EvalError on failure.
    """
    chain_or_llm = (
        "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain"
    )
    result = None
    try:
        if isinstance(llm_or_chain_factory, BaseLanguageModel):
            output: Any = await _arun_llm(
                llm_or_chain_factory,
                example.inputs,
                tags=config["tags"],
                callbacks=config["callbacks"],
                input_mapper=input_mapper,
                metadata=config.get("metadata"),
            )
        else:
            chain = llm_or_chain_factory()
            output = await _arun_chain(
                chain,
                example.inputs,
                tags=config["tags"],
                callbacks=config["callbacks"],
                input_mapper=input_mapper,
                metadata=config.get("metadata"),
            )
        result = output
    except Exception as e:
        logger.warning(
            f"{chain_or_llm} failed for example {example.id} "
            f"with inputs {example.inputs}"
            f"\n{repr(e)}"
        )
        result = EvalError(Error=e)
    return result


## Sync Utilities


def _run_llm(
    llm: BaseLanguageModel,
    inputs: Dict[str, Any],
    callbacks: Callbacks,
    *,
    tags: Optional[List[str]] = None,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Union[str, BaseMessage]:
    """
    Run the language model on the example.

    Args:
        llm: The language model to run.
        inputs: The input dictionary.
        callbacks: The callbacks to use during the run.
        tags: Optional tags to add to the run.
        input_mapper: Optional function to map an Example's inputs to the
            format expected by the model.

    Returns:
        The model output: a string or a chat message.

    Raises:
        ValueError: If the LLM type is unsupported.
        InputFormatError: If the input format is invalid.
    """
    # Most of this is legacy code; we could probably remove a lot of it.
    if input_mapper is not None:
        prompt_or_messages = input_mapper(inputs)
        if (
            isinstance(prompt_or_messages, str)
            or isinstance(prompt_or_messages, list)
            and all(isinstance(msg, BaseMessage) for msg in prompt_or_messages)
        ):
            llm_output: Union[str, BaseMessage] = llm.invoke(
                prompt_or_messages,
                config=RunnableConfig(
                    callbacks=callbacks, tags=tags or [], metadata=metadata or {}
                ),
            )
        else:
            raise InputFormatError(
                "Input mapper returned invalid format: "
                f" {prompt_or_messages}"
                "\nExpected a single string or list of chat messages."
            )
    else:
        try:
            llm_prompts = _get_prompt(inputs)
            llm_output = llm.invoke(
                llm_prompts,
                config=RunnableConfig(
                    callbacks=callbacks, tags=tags or [], metadata=metadata or {}
                ),
            )
        except InputFormatError:
            llm_inputs = _get_messages(inputs)
            llm_output = llm.invoke(
                **llm_inputs,
                config=RunnableConfig(callbacks=callbacks, metadata=metadata or {}),
            )
    return llm_output


def _run_chain(
    chain: Union[Chain, Runnable],
    inputs: Dict[str, Any],
    callbacks: Callbacks,
    *,
    tags: Optional[List[str]] = None,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Union[Dict, str]:
    """Run a chain on inputs."""
    inputs_ = inputs if input_mapper is None else input_mapper(inputs)
    if (
        isinstance(chain, Chain)
        and isinstance(inputs_, dict)
        and len(inputs_) == 1
        and chain.input_keys
    ):
        val = next(iter(inputs_.values()))
        output = chain.invoke(
            val,
            config=RunnableConfig(
                callbacks=callbacks, tags=tags or [], metadata=metadata or {}
            ),
        )
    else:
        runnable_config = RunnableConfig(
            tags=tags or [], callbacks=callbacks, metadata=metadata or {}
        )
        output = chain.invoke(inputs_, config=runnable_config)
    return output


def _run_llm_or_chain(
    example: Example,
    config: RunnableConfig,
    *,
    llm_or_chain_factory: MCF,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
) -> Union[dict, str, LLMResult, ChatResult]:
    """
    Run the Chain or language model synchronously.

    Args:
        example: The example to run.
        config: The RunnableConfig (callbacks, tags, metadata) for this run.
        llm_or_chain_factory: The Chain or language model constructor to run.
        input_mapper: Optional function to map the input to the expected format.

    Returns:
        The output of the model or chain, or an EvalError on failure.
    """
    chain_or_llm = (
        "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain"
    )
    result = None
    try:
        if isinstance(llm_or_chain_factory, BaseLanguageModel):
            output: Any = _run_llm(
                llm_or_chain_factory,
                example.inputs,
                config["callbacks"],
                tags=config["tags"],
                input_mapper=input_mapper,
                metadata=config.get("metadata"),
            )
        else:
            chain = llm_or_chain_factory()
            output = _run_chain(
                chain,
                example.inputs,
                config["callbacks"],
                tags=config["tags"],
                input_mapper=input_mapper,
                metadata=config.get("metadata"),
            )
        result = output
    except Exception as e:
        error_type = type(e).__name__
        logger.warning(
            f"{chain_or_llm} failed for example {example.id} "
            f"with inputs {example.inputs}"
            f"\nError Type: {error_type}, Message: {e}"
        )
        result = EvalError(Error=e)
    return result


def _prepare_eval_run(
    client: Client,
    dataset_name: str,
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    project_name: str,
    project_metadata: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    dataset_version: Optional[Union[str, datetime]] = None,
) -> Tuple[MCF, TracerSession, Dataset, List[Example]]:
    wrapped_model = _wrap_in_chain_factory(llm_or_chain_factory, dataset_name)
    dataset = client.read_dataset(dataset_name=dataset_name)
    examples = list(client.list_examples(dataset_id=dataset.id, as_of=dataset_version))
    if not examples:
        raise ValueError(f"Dataset {dataset_name} has no example rows.")
    modified_at = [ex.modified_at for ex in examples if ex.modified_at]
    # Should always be defined in practice when fetched,
    # but the typing permits None
    max_modified_at = max(modified_at) if modified_at else None
    inferred_version = max_modified_at.isoformat() if max_modified_at else None

    try:
        project_metadata = project_metadata or {}
        git_info = get_git_info()
        if git_info:
            project_metadata = {
                **project_metadata,
                "git": git_info,
            }
        project_metadata["dataset_version"] = inferred_version
        project = client.create_project(
            project_name,
            reference_dataset_id=dataset.id,
            project_extra={"tags": tags} if tags else {},
            metadata=project_metadata,
        )
    except (HTTPError, ValueError, LangSmithError) as e:
        if "already exists " not in str(e):
            raise e
        uid = uuid.uuid4()
        example_msg = f"""
run_on_dataset(
    ...
    project_name="{project_name} - {uid}",  # Update since {project_name} already exists
)
"""
        raise ValueError(
            f"Test project {project_name} already exists. Please use a different name:"
            f"\n\n{example_msg}"
        )
    comparison_url = dataset.url + f"/compare?selectedSessions={project.id}"
    print(  # noqa: T201
        f"View the evaluation results for project '{project_name}'"
        f" at:\n{comparison_url}\n\n"
        f"View all tests for Dataset {dataset_name} at:\n{dataset.url}",
        flush=True,
    )
    return wrapped_model, project, dataset, examples


class _RowResult(TypedDict, total=False):
    """A dictionary of the results for a single example row."""

    feedback: Optional[List[EvaluationResult]]
    execution_time: Optional[float]
    run_id: Optional[str]


@dataclasses.dataclass
class _DatasetRunContainer:
    """A container to help manage the state of an eval run."""

    client: Client
    project: TracerSession
    wrapped_model: MCF
    examples: List[Example]
    configs: List[RunnableConfig]
    batch_evaluators: Optional[List[smith_eval_config.BATCH_EVALUATOR_LIKE]] = None

    def _merge_test_outputs(
        self,
        batch_results: list,
        all_eval_results: Dict[str, _RowResult],
    ) -> dict:
        results: dict = {}
        for example, output in zip(self.examples, batch_results):
            row_result = cast(_RowResult, all_eval_results.get(str(example.id), {}))
            results[str(example.id)] = {
                "input": example.inputs,
                "feedback": row_result.get("feedback", []),
                "execution_time": row_result.get("execution_time"),
                "run_id": row_result.get("run_id"),
            }
            if isinstance(output, EvalError):
                results[str(example.id)]["Error"] = output.Error
            else:
                results[str(example.id)]["output"] = output
            if example.outputs:
                results[str(example.id)]["reference"] = example.outputs
        return results
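
    # Note (editor): shape of the merged per-example record produced above
    # (illustrative; "output"/"Error" and "reference" are conditional):
    #
    #     {
    #         "<example-uuid>": {
    #             "input": {...},
    #             "feedback": [EvaluationResult, ...],
    #             "execution_time": 1.23,
    #             "run_id": "<run-uuid>",
    #             "output": {...},        # or "Error": <exception>
    #             "reference": {...},
    #         },
    #     }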

    def _run_batch_evaluators(self, runs: Dict[str, Run]) -> List[dict]:
        evaluators = self.batch_evaluators
        if not evaluators:
            return []
        runs_list = [runs[str(example.id)] for example in self.examples]
        aggregate_feedback = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for evaluator in evaluators:
                try:
                    result = evaluator(runs_list, self.examples)
                    if isinstance(result, EvaluationResult):
                        result = result.dict()
                    aggregate_feedback.append(cast(dict, result))
                    executor.submit(
                        self.client.create_feedback,
                        **result,
                        run_id=None,
                        project_id=self.project.id,
                    )
                except Exception as e:
                    logger.error(
                        f"Error running batch evaluator {repr(evaluator)}: {e}"
                    )
        return aggregate_feedback

    def _collect_metrics(self) -> Tuple[Dict[str, _RowResult], Dict[str, Run]]:
        all_eval_results: dict = {}
        all_runs: dict = {}
        for c in self.configs:
            for callback in cast(list, c["callbacks"]):
                if isinstance(callback, EvaluatorCallbackHandler):
                    eval_results = callback.logged_eval_results
                    for (_, example_id), v in eval_results.items():
                        all_eval_results.setdefault(str(example_id), {}).update(
                            {"feedback": v}
                        )
                elif isinstance(callback, LangChainTracer):
                    run = callback.latest_run
                    execution_time = (
                        (run.end_time - run.start_time).total_seconds()
                        if run and run.end_time
                        else None
                    )
                    run_id = str(run.id) if run else None
                    all_eval_results.setdefault(str(callback.example_id), {}).update(
                        {
                            "execution_time": execution_time,
                            "run_id": run_id,
                            "run": run,
                        }
                    )
                    all_runs[str(callback.example_id)] = run
        return cast(Dict[str, _RowResult], all_eval_results), all_runs

    def _collect_test_results(
        self,
        batch_results: List[Union[dict, str, LLMResult, ChatResult]],
    ) -> TestResult:
        logger.info("Waiting for evaluators to complete.")
        wait_for_all_evaluators()
        all_eval_results, all_runs = self._collect_metrics()
        aggregate_feedback = None
        if self.batch_evaluators:
            logger.info("Running session evaluators.")
            aggregate_feedback = self._run_batch_evaluators(all_runs)
        results = self._merge_test_outputs(batch_results, all_eval_results)
        return TestResult(
            project_name=self.project.name,
            results=results,
            aggregate_metrics=aggregate_feedback,
        )

    def finish(self, batch_results: list, verbose: bool = False) -> TestResult:
        results = self._collect_test_results(batch_results)
        if verbose:
            try:
                agg_feedback = results.get_aggregate_feedback()
                _display_aggregate_results(agg_feedback)
            except Exception as e:
                logger.debug(f"Failed to print aggregate feedback: {repr(e)}")
        try:
            # Closing the project permits name changing and metric optimizations
            self.client.update_project(
                self.project.id, end_time=datetime.now(timezone.utc)
            )
        except Exception as e:
            logger.debug(f"Failed to close project: {repr(e)}")
        return results

    @classmethod
    def prepare(
        cls,
        client: Client,
        dataset_name: str,
        llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
        project_name: Optional[str],
        evaluation: Optional[smith_eval.RunEvalConfig] = None,
        tags: Optional[List[str]] = None,
        input_mapper: Optional[Callable[[Dict], Any]] = None,
        concurrency_level: int = 5,
        project_metadata: Optional[Dict[str, Any]] = None,
        revision_id: Optional[str] = None,
        dataset_version: Optional[Union[datetime, str]] = None,
    ) -> _DatasetRunContainer:
        project_name = project_name or name_generation.random_name()
        if revision_id:
            if not project_metadata:
                project_metadata = {}
            project_metadata.update({"revision_id": revision_id})
        wrapped_model, project, dataset, examples = _prepare_eval_run(
            client,
            dataset_name,
            llm_or_chain_factory,
            project_name,
            project_metadata=project_metadata,
            tags=tags,
            dataset_version=dataset_version,
        )
        tags = tags or []
        for k, v in (project.metadata.get("git") or {}).items():
            tags.append(f"git:{k}={v}")
        run_metadata = {"dataset_version": project.metadata["dataset_version"]}
        if revision_id:
            run_metadata["revision_id"] = revision_id
        wrapped_model = _wrap_in_chain_factory(llm_or_chain_factory)
        run_evaluators = _setup_evaluation(
            wrapped_model, examples, evaluation, dataset.data_type or DataType.kv
        )
        _validate_example_inputs(examples[0], wrapped_model, input_mapper)
        progress_bar = progress.ProgressBarCallback(len(examples))
        configs = [
            RunnableConfig(
                callbacks=[
                    LangChainTracer(
                        project_name=project.name,
                        client=client,
                        example_id=example.id,
                    ),
                    EvaluatorCallbackHandler(
                        evaluators=run_evaluators or [],
                        client=client,
                        example_id=example.id,
                        max_concurrency=0,
                    ),
                    progress_bar,
                ],
                tags=tags,
                max_concurrency=concurrency_level,
                metadata=run_metadata,
            )
            for example in examples
        ]
        return cls(
            client=client,
            project=project,
            wrapped_model=wrapped_model,
            examples=examples,
            configs=configs,
            batch_evaluators=evaluation.batch_evaluators if evaluation else None,
        )


def _is_jupyter_environment() -> bool:
    try:
        from IPython import get_ipython

        res = get_ipython()
        return get_ipython() is not None and "zmqshell" in str(type(res))
    except ImportError:
        return False


def _display_aggregate_results(aggregate_results: pd.DataFrame) -> None:
    if _is_jupyter_environment():
        from IPython.display import HTML, display

        display(HTML("<h3>Experiment Results:</h3>"))
        display(aggregate_results)
    else:
        formatted_string = aggregate_results.to_string(
            float_format=lambda x: f"{x:.2f}", justify="right"
        )
        print("\n Experiment Results:")  # noqa: T201
        print(formatted_string)  # noqa: T201


_INPUT_MAPPER_DEP_WARNING = (
    "The input_mapper argument is deprecated and "
    "will be removed in a future release. Please add a "
    " RunnableLambda to your chain to map inputs to the expected format"
    " instead. Example:\n"
    "def construct_chain():\n"
    "    my_chain = ...\n"
    "    input_mapper = {'other_key': 'MyOtherInput', 'my_input_key': x}\n"
    "    return input_mapper | my_chain\n"
    "run_on_dataset(..., llm_or_chain_factory=construct_chain)\n"
    "(See https://api.python.langchain.com/en/latest/schema/"
    "langchain.schema.runnable.base.RunnableLambda.html)"
)
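
# Note (editor): a sketch of the migration the warning above suggests; the key names
# are hypothetical and the lambda stands in for whatever mapping the input_mapper did:
#
#     from langchain_core.runnables import RunnableLambda
#
#     def construct_chain():
#         my_chain = ...
#         map_inputs = RunnableLambda(lambda ex: {"my_input_key": ex["question"]})
#         return map_inputs | my_chain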


## Public API


async def arun_on_dataset(
    client: Optional[Client],
    dataset_name: str,
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    *,
    evaluation: Optional[smith_eval.RunEvalConfig] = None,
    dataset_version: Optional[Union[datetime, str]] = None,
    concurrency_level: int = 5,
    project_name: Optional[str] = None,
    project_metadata: Optional[Dict[str, Any]] = None,
    verbose: bool = False,
    revision_id: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    input_mapper = kwargs.pop("input_mapper", None)
    if input_mapper:
        warn_deprecated("0.0.305", message=_INPUT_MAPPER_DEP_WARNING, pending=True)
    if revision_id is None:
        revision_id = get_langchain_env_var_metadata().get("revision_id")
    tags = kwargs.pop("tags", None)
    if tags:
        warn_deprecated(
            "0.1.9",
            message="The tags argument is deprecated and will be"
            " removed in a future release. Please specify project_metadata instead.",
            pending=True,
        )
    if kwargs:
        warn_deprecated(
            "0.0.305",
            message="The following arguments are deprecated and "
            "will be removed in a future release: "
            f"{kwargs.keys()}.",
            removal="0.0.305",
        )
    client = client or Client()
    container = _DatasetRunContainer.prepare(
        client,
        dataset_name,
        llm_or_chain_factory,
        project_name,
        evaluation,
        tags,
        input_mapper,
        concurrency_level,
        project_metadata=project_metadata,
        revision_id=revision_id,
        dataset_version=dataset_version,
    )
    batch_results = await runnable_utils.gather_with_concurrency(
        container.configs[0].get("max_concurrency"),
        *map(
            functools.partial(
                _arun_llm_or_chain,
                llm_or_chain_factory=container.wrapped_model,
                input_mapper=input_mapper,
            ),
            container.examples,
            container.configs,
        ),
    )
    return container.finish(batch_results, verbose=verbose)


def run_on_dataset(
    client: Optional[Client],
    dataset_name: str,
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    *,
    evaluation: Optional[smith_eval.RunEvalConfig] = None,
    dataset_version: Optional[Union[datetime, str]] = None,
    concurrency_level: int = 5,
    project_name: Optional[str] = None,
    project_metadata: Optional[Dict[str, Any]] = None,
    verbose: bool = False,
    revision_id: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    input_mapper = kwargs.pop("input_mapper", None)
    if input_mapper:
        warn_deprecated("0.0.305", message=_INPUT_MAPPER_DEP_WARNING, pending=True)
    tags = kwargs.pop("tags", None)
    if tags:
        warn_deprecated(
            "0.1.9",
            message="The tags argument is deprecated and will be"
            " removed in a future release. Please specify project_metadata instead.",
            pending=True,
        )
    if revision_id is None:
        revision_id = get_langchain_env_var_metadata().get("revision_id")
    if kwargs:
        warn_deprecated(
            "0.0.305",
            message="The following arguments are deprecated and "
            "will be removed in a future release: "
            f"{kwargs.keys()}.",
            removal="0.0.305",
        )
    client = client or Client()
    container = _DatasetRunContainer.prepare(
        client,
        dataset_name,
        llm_or_chain_factory,
        project_name,
        evaluation,
        tags,
        input_mapper,
        concurrency_level,
        project_metadata=project_metadata,
        revision_id=revision_id,
        dataset_version=dataset_version,
    )
    if concurrency_level == 0:
        batch_results = [
            _run_llm_or_chain(
                example,
                config,
                llm_or_chain_factory=container.wrapped_model,
                input_mapper=input_mapper,
            )
            for example, config in zip(container.examples, container.configs)
        ]
    else:
        with runnable_config.get_executor_for_config(container.configs[0]) as executor:
            batch_results = list(
                executor.map(
                    functools.partial(
                        _run_llm_or_chain,
                        llm_or_chain_factory=container.wrapped_model,
                        input_mapper=input_mapper,
                    ),
                    container.examples,
                    container.configs,
                )
            )
    return container.finish(batch_results, verbose=verbose)


_RUN_ON_DATASET_DOCSTRING = """
Run the Chain or language model on a dataset and store traces
to the specified project name.

Args:
    dataset_name: Name of the dataset to run the chain on.
    llm_or_chain_factory: Language model or Chain constructor to run
        over the dataset. The Chain constructor is used to permit
        independent calls on each example without carrying over state.
    evaluation: Configuration for evaluators to run on the
        results of the chain.
    concurrency_level: The number of async tasks to run concurrently.
    project_name: Name of the project to store the traces in.
        Defaults to {dataset_name}-{chain class name}-{datetime}.
    project_metadata: Optional metadata to add to the project.
        Useful for storing information about the test variant
        (prompt version, model version, etc.)
    client: LangSmith client to use to access the dataset and to
        log feedback and run traces.
    verbose: Whether to print progress.
    tags: Tags to add to each run in the project.
    revision_id: Optional revision identifier to assign this test run to
        track the performance of different versions of your system.

Returns:
    A dictionary containing the run's project name and the resulting model outputs.

For the (usually faster) async version of this function, see :func:`arun_on_dataset`.

Examples
--------

.. code-block:: python

    from langsmith import Client
    from langchain_openai import ChatOpenAI
    from langchain.chains import LLMChain
    from langchain.smith import RunEvalConfig, run_on_dataset

    # Chains may have memory. Passing in a constructor function lets the
    # evaluation framework avoid cross-contamination between runs.
    def construct_chain():
        llm = ChatOpenAI(temperature=0)
        chain = LLMChain.from_string(
            llm,
            "What's the answer to {your_input_key}"
        )
        return chain

    # Load off-the-shelf evaluators via config or the EvaluatorType (string or enum)
    evaluation_config = RunEvalConfig(
        evaluators=[
            "qa",  # "Correctness" against a reference answer
            "embedding_distance",
            RunEvalConfig.Criteria("helpfulness"),
            RunEvalConfig.Criteria({
                "fifth-grader-score": "Do you have to be smarter than a fifth grader to answer this question?"
            }),
        ]
    )

    client = Client()
    run_on_dataset(
        client,
        dataset_name="<my_dataset_name>",
        llm_or_chain_factory=construct_chain,
        evaluation=evaluation_config,
    )

You can also create custom evaluators by subclassing the
:class:`StringEvaluator <langchain.evaluation.schema.StringEvaluator>`
or LangSmith's `RunEvaluator` classes.

.. code-block:: python

    from typing import Optional
    from langchain.evaluation import StringEvaluator

    class MyStringEvaluator(StringEvaluator):

        @property
        def requires_input(self) -> bool:
            return False

        @property
        def requires_reference(self) -> bool:
            return True

        @property
        def evaluation_name(self) -> str:
            return "exact_match"

        def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> dict:
            return {"score": prediction == reference}

    evaluation_config = RunEvalConfig(
        custom_evaluators=[MyStringEvaluator()],
    )

    run_on_dataset(
        client,
        dataset_name="<my_dataset_name>",
        llm_or_chain_factory=construct_chain,
        evaluation=evaluation_config,
    )
"""  # noqa: E501
run_on_dataset.__doc__ = _RUN_ON_DATASET_DOCSTRING
arun_on_dataset.__doc__ = _RUN_ON_DATASET_DOCSTRING.replace(
    "run_on_dataset(", "await arun_on_dataset("
)