Metrics

MetricType

Bases: Enum

Enumeration of metric types in Ragas.

Attributes:

Name Type Description
SINGLE_TURN str

Represents a single-turn metric type.

MULTI_TURN str

Represents a multi-turn metric type.
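
For orientation, a minimal sketch of how these enum members are typically used when declaring a metric's required columns (the dictionary shape mirrors the Metric signature below; the column names are illustrative):

from ragas.metrics.base import MetricType

# Enum members act as keys when declaring which columns a metric needs.
required = {MetricType.SINGLE_TURN: {"user_input", "response"}}
print(MetricType.SINGLE_TURN.name)  # "SINGLE_TURN"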

Metric dataclass

Metric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: ABC

Abstract base class for metrics in Ragas.

Attributes:

Name Type Description
name str

The name of the metric.

required_columns Dict[str, Set[str]]

A dictionary mapping metric type names to sets of required column names. This is a property and raises ValueError if columns are not in VALID_COLUMNS.

init abstractmethod

init(run_config: RunConfig) -> None

Initialize the metric with the given run configuration.

Parameters:

Name Type Description Default
run_config RunConfig

Configuration for the metric run including timeouts and other settings.

required
Source code in src/ragas/metrics/base.py
@abstractmethod
def init(self, run_config: RunConfig) -> None:
    """
    Initialize the metric with the given run configuration.

    Parameters
    ----------
    run_config : RunConfig
        Configuration for the metric run including timeouts and other settings.
    """
    ...

score

score(row: Dict, callbacks: Callbacks = None) -> float

Calculates the score for a single row of data.

Note

This method is deprecated and will be removed in 0.3. Please use single_turn_ascore or multi_turn_ascore instead.

Source code in src/ragas/metrics/base.py
@deprecated("0.2", removal="0.3", alternative="single_turn_ascore")
def score(self, row: t.Dict, callbacks: Callbacks = None) -> float:
    """
    Calculates the score for a single row of data.

    Note
    ----
    This method is deprecated and will be removed in 0.3. Please use `single_turn_ascore` or `multi_turn_ascore` instead.
    """
    callbacks = callbacks or []
    rm, group_cm = new_group(
        self.name,
        inputs=row,
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._ascore(row=row, callbacks=group_cm)
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    # Apply nest_asyncio logic to ensure compatibility in notebook/Jupyter environments.
    apply_nest_asyncio()
    return run(_async_wrapper)

ascore async

ascore(row: Dict, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Asynchronously calculates the score for a single row of data.

Note

This method is deprecated and will be removed in 0.3. Please use single_turn_ascore instead.

Source code in src/ragas/metrics/base.py
@deprecated("0.2", removal="0.3", alternative="single_turn_ascore")
async def ascore(
    self,
    row: t.Dict,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Asynchronously calculates the score for a single row of data.

    Note
    ----
    This method is deprecated and will be removed in 0.3. Please use `single_turn_ascore` instead.
    """
    callbacks = callbacks or []
    rm, group_cm = new_group(
        self.name,
        inputs=row,
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._ascore(row=row, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})
    return score

MetricWithLLM dataclass

MetricWithLLM(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: Metric, PromptMixin

A metric class that uses a language model for evaluation.

Attributes:

Name Type Description
llm Optional[BaseRagasLLM]

The language model used for the metric. Both BaseRagasLLM and InstructorBaseRagasLLM are accepted at runtime via duck typing (both have compatible methods).

init

init(run_config: RunConfig) -> None

Initialize the metric with run configuration and validate LLM is present.

Parameters:

Name Type Description Default
run_config RunConfig

Configuration for the metric run.

required

Raises:

Type Description
ValueError

If no LLM is provided to the metric.

Source code in src/ragas/metrics/base.py
def init(self, run_config: RunConfig) -> None:
    """
    Initialize the metric with run configuration and validate LLM is present.

    Parameters
    ----------
    run_config : RunConfig
        Configuration for the metric run.

    Raises
    ------
    ValueError
        If no LLM is provided to the metric.
    """
    if self.llm is None:
        raise ValueError(
            f"Metric '{self.name}' has no valid LLM provided (self.llm is None). Please instantiate the metric with an LLM to run."
        )
    # Only BaseRagasLLM has set_run_config method, not InstructorBaseRagasLLM
    if isinstance(self.llm, BaseRagasLLM):
        self.llm.set_run_config(run_config)
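
A minimal sketch of wiring an LLM into a metric before calling init; the model name and wrapper choice are illustrative, not prescriptive:

from langchain_openai import ChatOpenAI
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import Faithfulness
from ragas.run_config import RunConfig

# Wrap any LangChain chat model as a BaseRagasLLM (the model name is an example).
evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))

metric = Faithfulness(llm=evaluator_llm)
metric.init(RunConfig(timeout=60))  # raises ValueError if no LLM had been provided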

train

train(path: str, demonstration_config: Optional[DemonstrationConfig] = None, instruction_config: Optional[InstructionConfig] = None, callbacks: Optional[Callbacks] = None, run_config: Optional[RunConfig] = None, batch_size: Optional[int] = None, with_debugging_logs=False, raise_exceptions: bool = True) -> None

Train the metric using local JSON data

Parameters:

Name Type Description Default
path str

Path to local JSON training data file

required
demonstration_config DemonstrationConfig

Configuration for demonstration optimization

None
instruction_config InstructionConfig

Configuration for instruction optimization

None
callbacks Callbacks

List of callback functions

None
run_config RunConfig

Run configuration

None
batch_size int

Batch size for training

None
with_debugging_logs bool

Enable debugging logs

False
raise_exceptions bool

Whether to raise exceptions during training

True

Raises:

Type Description
ValueError

If path is not provided or not a JSON file

Source code in src/ragas/metrics/base.py
def train(
    self,
    path: str,
    demonstration_config: t.Optional[DemonstrationConfig] = None,
    instruction_config: t.Optional[InstructionConfig] = None,
    callbacks: t.Optional[Callbacks] = None,
    run_config: t.Optional[RunConfig] = None,
    batch_size: t.Optional[int] = None,
    with_debugging_logs=False,
    raise_exceptions: bool = True,
) -> None:
    """
    Train the metric using local JSON data

    Parameters
    ----------
    path : str
        Path to local JSON training data file
    demonstration_config : DemonstrationConfig, optional
        Configuration for demonstration optimization
    instruction_config : InstructionConfig, optional
        Configuration for instruction optimization
    callbacks : Callbacks, optional
        List of callback functions
    run_config : RunConfig, optional
        Run configuration
    batch_size : int, optional
        Batch size for training
    with_debugging_logs : bool, default=False
        Enable debugging logs
    raise_exceptions : bool, default=True
        Whether to raise exceptions during training

    Raises
    ------
    ValueError
        If path is not provided or not a JSON file
    """
    # Validate input parameters
    if not path:
        raise ValueError("Path to training data file must be provided")

    if not path.endswith(".json"):
        raise ValueError("Train data must be in json format")

    run_config = run_config or RunConfig()
    callbacks = callbacks or []

    # Load the dataset from JSON file
    dataset = MetricAnnotation.from_json(path, metric_name=self.name)

    # only optimize the instruction if instruction_config is provided
    if instruction_config is not None:
        self._optimize_instruction(
            instruction_config=instruction_config,
            dataset=dataset,
            callbacks=callbacks,
            run_config=run_config,
            batch_size=batch_size,
            with_debugging_logs=with_debugging_logs,
            raise_exceptions=raise_exceptions,
        )

    # if demonstration_config is provided, optimize the demonstrations
    if demonstration_config is not None:
        self._optimize_demonstration(
            demonstration_config=demonstration_config,
            dataset=dataset,
        )
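
A hedged sketch of the expected call shape; the InstructionConfig import path and constructor arguments are assumptions that may differ between ragas versions, and annotations.json is a hypothetical annotation export:

from ragas.config import InstructionConfig  # import path assumed; verify for your version

metric.train(
    path="./annotations.json",  # must be a .json file, otherwise ValueError is raised
    instruction_config=InstructionConfig(llm=evaluator_llm),  # constructor signature assumed
)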

SingleTurnMetric dataclass

SingleTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: Metric

A metric class for evaluating single-turn interactions.

This class provides methods to score single-turn samples, both synchronously and asynchronously.

single_turn_score

single_turn_score(sample: SingleTurnSample, callbacks: Callbacks = None) -> float

Synchronously score a single-turn sample.

May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.

Source code in src/ragas/metrics/base.py
def single_turn_score(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Synchronously score a single-turn sample.

    May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._single_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score

single_turn_ascore async

single_turn_ascore(sample: SingleTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Asynchronously score a single-turn sample with an optional timeout.

May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.

Source code in src/ragas/metrics/base.py
async def single_turn_ascore(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Asynchronously score a single-turn sample with an optional timeout.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._single_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score
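
A small usage sketch; the sample contents are illustrative and evaluator_llm is assumed to be a BaseRagasLLM wrapper created as shown earlier:

import asyncio

from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import Faithfulness

sample = SingleTurnSample(
    user_input="When was the Eiffel Tower completed?",
    response="The Eiffel Tower was completed in 1889.",
    retrieved_contexts=["The Eiffel Tower was completed in 1889 in Paris."],
)
metric = Faithfulness(llm=evaluator_llm)

# Synchronous path (handles the event loop internally):
print(metric.single_turn_score(sample))

# Asynchronous path with an explicit timeout:
print(asyncio.run(metric.single_turn_ascore(sample, timeout=60)))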

MultiTurnMetric dataclass

MultiTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: Metric

A metric class for evaluating multi-turn conversations.

This class extends the base Metric class to provide functionality for scoring multi-turn conversation samples.

multi_turn_score

multi_turn_score(sample: MultiTurnSample, callbacks: Callbacks = None) -> float

Score a multi-turn conversation sample synchronously.

May raise ImportError if nest_asyncio is not installed in Jupyter-like environments.

Source code in src/ragas/metrics/base.py
def multi_turn_score(
    self,
    sample: MultiTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Score a multi-turn conversation sample synchronously.

    May raise ImportError if nest_asyncio is not installed in Jupyter-like environments.
    """
    callbacks = callbacks or []
    sample = self._only_required_columns_multi_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._multi_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.MULTI_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score

multi_turn_ascore async

multi_turn_ascore(sample: MultiTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Score a multi-turn conversation sample asynchronously.

May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.

Source code in src/ragas/metrics/base.py
async def multi_turn_ascore(
    self,
    sample: MultiTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Score a multi-turn conversation sample asynchronously.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    sample = self._only_required_columns_multi_turn(sample)

    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._multi_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.MULTI_TURN.name,
            language=get_metric_language(self),
        )
    )

    return score
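
A sketch for the multi-turn path; AspectCritic is used because it implements MultiTurnMetric, and the conversation content is illustrative:

import asyncio

from ragas.dataset_schema import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage
from ragas.metrics import AspectCritic

sample = MultiTurnSample(
    user_input=[
        HumanMessage(content="Can you cancel my flight and book a train instead?"),
        AIMessage(content="Your flight is cancelled and a train ticket is booked for tomorrow."),
    ],
    reference="The agent should cancel the flight and book a train.",
)
critic = AspectCritic(
    name="task_completion",
    definition="Did the AI complete the task requested by the user?",
    llm=evaluator_llm,  # a BaseRagasLLM wrapper created elsewhere
)
print(asyncio.run(critic.multi_turn_ascore(sample, timeout=60)))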

Ensember

Combine multiple LLM outputs for the same input (n > 1) into a single output.

from_discrete

from_discrete(inputs: list[list[Dict]], attribute: str) -> List[Dict]

Simple majority voting for binary values, e.g. [0, 0, 1] -> 0.

inputs: a list of lists of dicts, each dict containing the verdict for a single input.

Source code in src/ragas/metrics/base.py
def from_discrete(
    self, inputs: list[list[t.Dict]], attribute: str
) -> t.List[t.Dict]:
    """
    Simple majority voting for binary values, ie [0,0,1] -> 0
    inputs: list of list of dicts each containing verdict for a single input
    """

    if not isinstance(inputs, list):
        inputs = [inputs]

    if not all(len(item) == len(inputs[0]) for item in inputs):
        logger.warning("All inputs must have the same length")
        return inputs[0]

    if not all(attribute in item for input in inputs for item in input):
        logger.warning(f"All inputs must have {attribute} attribute")
        return inputs[0]

    if len(inputs) == 1:
        return inputs[0]

    verdict_agg = []
    for i in range(len(inputs[0])):
        item = inputs[0][i]
        verdicts = [inputs[k][i][attribute] for k in range(len(inputs))]
        verdict_counts = dict(Counter(verdicts).most_common())
        item[attribute] = list(verdict_counts.keys())[0]
        verdict_agg.append(item)

    return verdict_agg
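
An illustrative call showing the majority vote; the module-level ensembler instance is assumed to be exposed by ragas.metrics.base alongside the class above:

from ragas.metrics.base import ensembler  # module-level Ensember instance (assumed export)

runs = [  # three LLM runs, each with verdicts for the same two statements
    [{"statement": "s1", "verdict": 1}, {"statement": "s2", "verdict": 0}],
    [{"statement": "s1", "verdict": 1}, {"statement": "s2", "verdict": 1}],
    [{"statement": "s1", "verdict": 0}, {"statement": "s2", "verdict": 1}],
]
print(ensembler.from_discrete(runs, "verdict"))
# -> [{'statement': 's1', 'verdict': 1}, {'statement': 's2', 'verdict': 1}]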

SimpleBaseMetric dataclass

SimpleBaseMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])())

Bases: ABC

Base class for simple metrics that return MetricResult objects.

This class provides the foundation for metrics that evaluate inputs and return structured MetricResult objects containing scores and reasoning.

Attributes:

Name Type Description
name str

The name of the metric.

allowed_values AllowedValuesType

Allowed values for the metric output. Can be a list of strings for discrete metrics, a tuple of floats for numeric metrics, or an integer for ranking metrics.

Examples:

>>> from ragas.metrics import discrete_metric
>>>
>>> @discrete_metric(name="sentiment", allowed_values=["positive", "negative"])
>>> def sentiment_metric(user_input: str, response: str) -> str:
...     return "positive" if "good" in response else "negative"
>>>
>>> result = sentiment_metric(user_input="How are you?", response="I'm good!")
>>> print(result.value)  # "positive"

score abstractmethod

score(**kwargs) -> 'MetricResult'

Synchronously calculate the metric score.

Parameters:

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns:

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
def score(self, **kwargs) -> "MetricResult":
    """
    Synchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

ascore abstractmethod async

ascore(**kwargs) -> 'MetricResult'

Asynchronously calculate the metric score.

Parameters:

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns:

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
async def ascore(self, **kwargs) -> "MetricResult":
    """
    Asynchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

batch_score

batch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Synchronously calculate scores for a batch of inputs.

Parameters:

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns:

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
def batch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Synchronously calculate scores for a batch of inputs.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    return [self.score(**input_dict) for input_dict in inputs]

abatch_score async

abatch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Asynchronously calculate scores for a batch of inputs in parallel.

Parameters:

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns:

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
async def abatch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Asynchronously calculate scores for a batch of inputs in parallel.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    async_tasks = []
    for input_dict in inputs:
        # Process input asynchronously
        async_tasks.append(self.ascore(**input_dict))

    # Run all tasks concurrently and return results
    return await asyncio.gather(*async_tasks)
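
Reusing the sentiment_metric defined in the SimpleBaseMetric example above (an assumption: the decorator-created metric accepts these keyword arguments), a batch call is simply a list of keyword dictionaries:

results = sentiment_metric.batch_score(
    [
        {"user_input": "How are you?", "response": "I'm good!"},
        {"user_input": "How was the hotel?", "response": "It was awful."},
    ]
)
print([r.value for r in results])  # e.g. ['positive', 'negative']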

SimpleLLMMetric dataclass

SimpleLLMMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])(), prompt: Optional[Union[str, 'Prompt']] = None)

Bases: SimpleBaseMetric

LLM-based metric that uses prompts to generate structured responses.

save

save(path: Optional[str] = None) -> None

Save the metric configuration to a JSON file.

Parameters:

path : str, optional
    File path to save to. If not provided, saves to "./{metric.name}.json". Use the .gz extension for compression.

Note:

If the metric has a response_model, its schema will be saved for reference but the model itself cannot be serialized. You'll need to provide it when loading.

Examples:

All these work:

metric.save()                      # → ./response_quality.json
metric.save("custom.json")         # → ./custom.json
metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
metric.save("no_extension")        # → ./no_extension.json
metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)

Source code in src/ragas/metrics/base.py
def save(self, path: t.Optional[str] = None) -> None:
    """
    Save the metric configuration to a JSON file.

    Parameters:
    -----------
    path : str, optional
        File path to save to. If not provided, saves to "./{metric.name}.json"
        Use .gz extension for compression.

    Note:
    -----
    If the metric has a response_model, its schema will be saved for reference
    but the model itself cannot be serialized. You'll need to provide it when loading.

    Examples:
    ---------
    All these work:
    >>> metric.save()                      # → ./response_quality.json
    >>> metric.save("custom.json")         # → ./custom.json
    >>> metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
    >>> metric.save("no_extension")        # → ./no_extension.json
    >>> metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)
    """
    import gzip
    import json
    import warnings
    from pathlib import Path

    # Handle default path
    if path is None:
        # Default to current directory with metric name as filename
        file_path = Path(f"./{self.name}.json")
    else:
        file_path = Path(path)

        # If path is a directory, append the metric name as filename
        if file_path.is_dir():
            file_path = file_path / f"{self.name}.json"
        # If path has no extension, add .json
        elif not file_path.suffix:
            file_path = file_path.with_suffix(".json")

    # Collect warning messages for data loss
    warning_messages = []

    if hasattr(self, "_response_model") and self._response_model:
        # Only warn for custom response models, not auto-generated ones
        if not getattr(self._response_model, "__ragas_auto_generated__", False):
            warning_messages.append(
                "- Custom response_model will be lost (set it manually after loading)"
            )

    # Serialize the prompt (may add embedding_model warning)
    prompt_data = self._serialize_prompt(warning_messages)

    # Determine the metric type
    metric_type = self.__class__.__name__

    # Get metric-specific config
    config = self._get_metric_config()

    # Emit consolidated warning if there's data loss
    if warning_messages:
        warnings.warn(
            "Some metric components cannot be saved and will be lost:\n"
            + "\n".join(warning_messages)
            + "\n\nYou'll need to provide these when loading the metric."
        )

    data = {
        "format_version": "1.0",
        "metric_type": metric_type,
        "name": self.name,
        "prompt": prompt_data,
        "config": config,
        "response_model_info": self._serialize_response_model_info(),
    }
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        else:
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
    except (OSError, IOError) as e:
        raise ValueError(f"Cannot save metric to {file_path}: {e}")

load classmethod

load(path: str, response_model: Optional[Type['BaseModel']] = None, embedding_model: Optional['EmbeddingModelType'] = None) -> 'SimpleLLMMetric'

Load a metric from a JSON file.

Parameters:

path : str
    File path to load from. Supports .gz compressed files.
response_model : Optional[Type[BaseModel]]
    Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
embedding_model : Optional[Any]
    Embedding model for DynamicFewShotPrompt. Required if the original used one.

Returns:

SimpleLLMMetric
    Loaded metric instance.

Raises:

ValueError
    If the file cannot be loaded, is invalid, or is missing required models.

Source code in src/ragas/metrics/base.py
@classmethod
def load(
    cls,
    path: str,
    response_model: t.Optional[t.Type["BaseModel"]] = None,
    embedding_model: t.Optional["EmbeddingModelType"] = None,
) -> "SimpleLLMMetric":
    """
    Load a metric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    response_model : Optional[Type[BaseModel]]
        Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    SimpleLLMMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded, is invalid, or missing required models
    """
    import gzip
    import json
    from pathlib import Path

    file_path = Path(path)

    # Load JSON data
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "rt", encoding="utf-8") as f:
                data = json.load(f)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
        raise ValueError(f"Cannot load metric from {path}: {e}")

    # Validate format
    if data.get("format_version") != "1.0":
        import warnings

        warnings.warn(
            f"Loading metric with format version {data.get('format_version')}, expected 1.0"
        )

    # Reconstruct the prompt
    prompt = cls._deserialize_prompt(data["prompt"], embedding_model)

    # Get config
    config = data.get("config", {})

    # Create the metric instance
    metric = cls(name=data["name"], prompt=prompt, **config)

    # Set response model if provided
    if response_model:
        metric._response_model = response_model

    return metric
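
A hedged round-trip sketch; my_metric is assumed to be a SimpleLLMMetric instance created elsewhere, and MyResponseModel stands in for whatever custom Pydantic model it was built with:

# Save the configuration, then restore it later, supplying the custom response model.
my_metric.save("./response_quality.json")

restored = type(my_metric).load(
    "./response_quality.json",
    response_model=MyResponseModel,  # hypothetical custom model; omit if auto-generated
)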

get_correlation abstractmethod

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

Calculate the correlation between gold scores and predicted scores. This is a placeholder method and should be implemented based on the specific metric.

Source code in src/ragas/metrics/base.py
@abstractmethod
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold scores and predicted scores.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    pass

align_and_validate

align_and_validate(dataset: 'Dataset', embedding_model: 'EmbeddingModelType', llm: 'BaseRagasLLM', test_size: float = 0.2, random_state: int = 42, **kwargs: Dict[str, Any])

Args:
    dataset: The experiment dataset to align the metric with.
    embedding_model: The embedding model used for dynamic few-shot prompting.
    llm: The LLM instance to use for scoring.

Align the metric with the specified experiments and validate it against a gold standard experiment. This method combines alignment and validation into a single step.

Source code in src/ragas/metrics/base.py
def align_and_validate(
    self,
    dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    llm: "BaseRagasLLM",
    test_size: float = 0.2,
    random_state: int = 42,
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        dataset: experiment to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.
        llm: The LLM instance to use for scoring.

    Align the metric with the specified experiments and validate it against a gold standard experiment.
    This method combines alignment and validation into a single step.
    """
    train_dataset, test_dataset = dataset.train_test_split(
        test_size=test_size, random_state=random_state
    )

    self.align(train_dataset, embedding_model, **kwargs)  # type: ignore
    return self.validate_alignment(llm, test_dataset)  # type: ignore

align

align(train_dataset: 'Dataset', embedding_model: 'EmbeddingModelType', **kwargs: Dict[str, Any])

Args:
    train_dataset: The training dataset to align the metric with.
    embedding_model: The embedding model used for dynamic few-shot prompting.

Align the metric with the specified experiments by different optimization methods.

Source code in src/ragas/metrics/base.py
def align(
    self,
    train_dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        train_dataset: train_dataset to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.

    Align the metric with the specified experiments by different optimization methods.
    """

    # get prompt
    if not self.prompt:
        raise Exception("prompt not passed")
    from ragas.prompt.simple_prompt import Prompt

    self.prompt = (
        self.prompt if isinstance(self.prompt, Prompt) else Prompt(self.prompt)
    )
    # Extract specific parameters for from_prompt method
    max_similar_examples_val = kwargs.get("max_similar_examples", 3)
    similarity_threshold_val = kwargs.get("similarity_threshold", 0.7)
    max_similar_examples = (
        int(max_similar_examples_val)
        if isinstance(max_similar_examples_val, (int, str))
        else 3
    )
    similarity_threshold = (
        float(similarity_threshold_val)
        if isinstance(similarity_threshold_val, (int, float, str))
        else 0.7
    )
    # Convert BaseRagasEmbeddings to BaseRagasEmbedding if needed
    if hasattr(embedding_model, "embed_query"):
        # For legacy BaseRagasEmbeddings, we need to wrap it
        # Create a wrapper that implements BaseRagasEmbedding interface
        class EmbeddingWrapper:
            def __init__(self, legacy_embedding):
                self.legacy_embedding = legacy_embedding

            def embed_text(self, text: str, **kwargs) -> t.List[float]:
                return self.legacy_embedding.embed_query(text)

            async def aembed_text(self, text: str, **kwargs) -> t.List[float]:
                return await self.legacy_embedding.aembed_query(text)

        actual_embedding_model = EmbeddingWrapper(embedding_model)
    else:
        # Already BaseRagasEmbedding
        actual_embedding_model = embedding_model

    from ragas.prompt.dynamic_few_shot import DynamicFewShotPrompt

    self.prompt = DynamicFewShotPrompt.from_prompt(
        self.prompt,
        actual_embedding_model,  # type: ignore[arg-type]
        max_similar_examples,
        similarity_threshold,
    )
    train_dataset.reload()
    total_items = len(train_dataset)
    input_vars = self.get_variables()
    output_vars = [self.name, f"{self.name}_reason"]

    from rich.progress import Progress

    with Progress() as progress:
        task = progress.add_task("Processing examples", total=total_items)
        for row in train_dataset:
            inputs = {
                var: train_dataset.get_row_value(row, var) for var in input_vars
            }
            inputs = {k: v for k, v in inputs.items() if v is not None}
            output = {
                var: train_dataset.get_row_value(row, var) for var in output_vars
            }
            output = {k: v for k, v in output.items() if v is not None}

            if output:
                self.prompt.add_example(inputs, output)
            progress.update(task, advance=1)

validate_alignment

validate_alignment(llm: 'BaseRagasLLM', test_dataset: 'Dataset', mapping: Dict[str, str] = {})

Args:
    llm: The LLM instance to use for scoring.
    test_dataset: A Dataset instance containing the gold standard scores.
    mapping: A dictionary mapping variable names expected by the metric to their corresponding names in the gold experiment.

Validate the alignment of the metric by comparing the scores against a gold standard experiment. This method computes the Cohen's Kappa score and agreement rate between the gold standard scores and the predicted scores from the metric.

Source code in src/ragas/metrics/base.py
def validate_alignment(
    self,
    llm: "BaseRagasLLM",
    test_dataset: "Dataset",
    mapping: t.Dict[str, str] = {},
):
    """
    Args:
        llm: The LLM instance to use for scoring.
        test_dataset: An Dataset instance containing the gold standard scores.
        mapping: A dictionary mapping variable names expected by metrics to their corresponding names in the gold experiment.

    Validate the alignment of the metric by comparing the scores against a gold standard experiment.
    This method computes the Cohen's Kappa score and agreement rate between the gold standard scores and
    the predicted scores from the metric.
    """

    test_dataset.reload()
    gold_scores_raw = [
        test_dataset.get_row_value(row, self.name) for row in test_dataset
    ]
    pred_scores = []
    for row in test_dataset:
        values = {
            v: (
                test_dataset.get_row_value(row, v)
                if v not in mapping
                else test_dataset.get_row_value(row, mapping.get(v, v))
            )
            for v in self.get_variables()
        }
        score = self.score(llm=llm, **values)
        pred_scores.append(score.value)

    # Convert to strings for correlation calculation, filtering out None values
    gold_scores = [str(score) for score in gold_scores_raw if score is not None]
    pred_scores_str = [str(score) for score in pred_scores if score is not None]

    df = test_dataset.to_pandas()
    df[f"{self.name}_pred"] = pred_scores
    correlation = self.get_correlation(gold_scores, pred_scores_str)
    agreement_rate = sum(
        x == y for x, y in zip(gold_scores, pred_scores_str)
    ) / len(gold_scores)
    return {
        "correlation": correlation,
        "agreement_rate": agreement_rate,
        "df": df,
    }
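
A sketch of the combined workflow; dataset, embedding_model, and evaluator_llm are assumed to be a ragas Dataset and model wrappers constructed elsewhere:

report = my_metric.align_and_validate(
    dataset,
    embedding_model=embedding_model,
    llm=evaluator_llm,
    test_size=0.2,
)
print(report["correlation"], report["agreement_rate"])  # report["df"] holds the predictions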

create_auto_response_model

create_auto_response_model(name: str, **fields) -> Type['BaseModel']

Create a response model and mark it as auto-generated by Ragas.

This function creates a Pydantic model using create_model and marks it with a special attribute to indicate it was auto-generated. This allows the save() method to distinguish between auto-generated models (which are recreated on load) and custom user models.

Parameters:

Name Type Description Default
name str

Name for the model class

required
**fields

Field definitions in create_model format. Each field is specified as: field_name=(type, default_or_field_info)

{}

Returns:

Type Description
Type[BaseModel]

Pydantic model class marked as auto-generated

Examples:

>>> from pydantic import Field
>>> # Simple model with required fields
>>> ResponseModel = create_auto_response_model(
...     "ResponseModel",
...     value=(str, ...),
...     reason=(str, ...)
... )
>>>
>>> # Model with Field validators and descriptions
>>> ResponseModel = create_auto_response_model(
...     "ResponseModel",
...     value=(str, Field(..., description="The predicted value")),
...     reason=(str, Field(..., description="Reasoning for the prediction"))
... )
Source code in src/ragas/metrics/base.py
def create_auto_response_model(name: str, **fields) -> t.Type["BaseModel"]:
    """
    Create a response model and mark it as auto-generated by Ragas.

    This function creates a Pydantic model using create_model and marks it
    with a special attribute to indicate it was auto-generated. This allows
    the save() method to distinguish between auto-generated models (which
    are recreated on load) and custom user models.

    Parameters
    ----------
    name : str
        Name for the model class
    **fields
        Field definitions in create_model format.
        Each field is specified as: field_name=(type, default_or_field_info)

    Returns
    -------
    Type[BaseModel]
        Pydantic model class marked as auto-generated

    Examples
    --------
    >>> from pydantic import Field
    >>> # Simple model with required fields
    >>> ResponseModel = create_auto_response_model(
    ...     "ResponseModel",
    ...     value=(str, ...),
    ...     reason=(str, ...)
    ... )
    >>>
    >>> # Model with Field validators and descriptions
    >>> ResponseModel = create_auto_response_model(
    ...     "ResponseModel",
    ...     value=(str, Field(..., description="The predicted value")),
    ...     reason=(str, Field(..., description="Reasoning for the prediction"))
    ... )
    """
    from pydantic import create_model

    model = create_model(name, **fields)
    setattr(model, "__ragas_auto_generated__", True)  # type: ignore[attr-defined]
    return model

AnswerCorrectness dataclass

AnswerCorrectness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'reference'}})(), name: str = 'answer_correctness', embeddings: Optional[Union[BaseRagasEmbeddings, BaseRagasEmbedding]] = None, llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, correctness_prompt: PydanticPrompt = CorrectnessClassifier(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), weights: list[float] = (lambda: [0.75, 0.25])(), beta: float = 1.0, answer_similarity: Optional[AnswerSimilarity] = None, max_retries: int = 1)

Bases: MetricWithLLM, MetricWithEmbeddings, SingleTurnMetric

Measures answer correctness compared to ground truth as a combination of factuality and semantic similarity.

Attributes:

Name Type Description
name string

The name of the metric.

weights list[float]

A list of two weights corresponding to factuality and semantic similarity. Defaults to [0.75, 0.25].

answer_similarity Optional[AnswerSimilarity]

The AnswerSimilarity object
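
A brief instantiation sketch; the LLM and embedding wrappers are assumed to be created elsewhere:

from ragas.metrics import AnswerCorrectness

scorer = AnswerCorrectness(
    llm=evaluator_llm,
    embeddings=evaluator_embeddings,
    weights=[0.6, 0.4],  # shift the balance between factuality and semantic similarity
)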

ResponseRelevancy dataclass

ResponseRelevancy(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response'}})(), name: str = 'answer_relevancy', embeddings: Optional[Union[BaseRagasEmbeddings, BaseRagasEmbedding]] = None, llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, question_generation: PydanticPrompt = ResponseRelevancePrompt(), strictness: int = 3)

Bases: MetricWithLLM, MetricWithEmbeddings, SingleTurnMetric

Scores the relevancy of the answer according to the given question. Answers with incomplete, redundant, or unnecessary information are penalized. Scores range from 0 to 1, with 1 being the best.

Attributes:

Name Type Description
name string

The name of the metric.

strictness int

The number of questions generated per answer. The ideal range is 3 to 5.

embeddings Embedding

The LangChain wrapper of the embedding object, e.g. HuggingFaceEmbeddings('BAAI/bge-base-en').

SemanticSimilarity dataclass

SemanticSimilarity(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'reference', 'response'}})(), name: str = 'semantic_similarity', embeddings: Optional[Union[BaseRagasEmbeddings, BaseRagasEmbedding]] = None, is_cross_encoder: bool = False, threshold: Optional[float] = None)

Bases: MetricWithEmbeddings, SingleTurnMetric

Scores the semantic similarity of the ground truth with the generated answer. A cross-encoder score is used to quantify semantic similarity. SAS paper: https://arxiv.org/pdf/2108.06130.pdf

Attributes:

Name Type Description
name str
model_name

The model to be used for calculating semantic similarity. Defaults to OpenAI embeddings; select a cross-encoder model for best results (https://huggingface.co/spaces/mteb/leaderboard).

threshold Optional[float]

The threshold, if given, used to map the output to a binary value. Default 0.5.

AspectCritic

AspectCritic(name: str, definition: str, llm: Optional[BaseRagasLLM] = None, required_columns: Optional[Dict[MetricType, Set[str]]] = None, output_type: Optional[MetricOutputType] = BINARY, single_turn_prompt: Optional[PydanticPrompt] = None, multi_turn_prompt: Optional[PydanticPrompt] = None, strictness: int = 1, max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric, MultiTurnMetric

Judges the submission to give binary results using the criteria specified in the metric definition.

Attributes:

Name Type Description
name str

The name of the metric.

definition str

The criteria used to judge the submission, e.g. "Is the submission spreading fake information?"

strictness int

The number of times self-consistency checks are made. The final judgement is made using a majority vote.

Source code in src/ragas/metrics/_aspect_critic.py
def __init__(
    self,
    name: str,
    definition: str,
    llm: t.Optional[BaseRagasLLM] = None,
    required_columns: t.Optional[t.Dict[MetricType, t.Set[str]]] = None,
    output_type: t.Optional[MetricOutputType] = MetricOutputType.BINARY,
    single_turn_prompt: t.Optional[PydanticPrompt] = None,
    multi_turn_prompt: t.Optional[PydanticPrompt] = None,
    strictness: int = 1,
    max_retries: int = 1,
):
    self._required_columns = required_columns or {
        MetricType.SINGLE_TURN: {
            "user_input:optional",
            "response:optional",
            "retrieved_contexts:optional",
            "reference:optional",
            "reference_contexts:optional",
        },
        MetricType.MULTI_TURN: {
            "user_input:optional",
            "reference:optional",
        },
    }
    super().__init__(
        name=name,
        _required_columns=self._required_columns,
        llm=llm,
        output_type=output_type,
    )

    self._definition = definition
    self.single_turn_prompt = single_turn_prompt or SingleTurnAspectCriticPrompt()
    self.multi_turn_prompt = multi_turn_prompt or MultiTurnAspectCriticPrompt()
    self.max_retries = max_retries

    # update the instruction for the prompts with the definition
    instruction = f"Evaluate the Input based on the criterial defined. Use only 'Yes' (1) and 'No' (0) as verdict.\nCriteria Definition: {self._definition}"
    self.single_turn_prompt.instruction = instruction
    self.multi_turn_prompt.instruction = instruction

    # ensure odd number of checks to avoid tie in majority vote.
    self.strictness = strictness
    self.strictness = (
        self.strictness if self.strictness % 2 != 0 else self.strictness + 1
    )
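
A short usage sketch; the definition text is illustrative, and evaluator_llm and sample are assumed to exist as in the earlier examples:

from ragas.metrics import AspectCritic

harmfulness = AspectCritic(
    name="harmfulness",
    definition="Does the submission cause or risk harm to individuals or groups?",
    llm=evaluator_llm,   # BaseRagasLLM wrapper created elsewhere
    strictness=4,        # bumped to 5 internally to keep the majority vote odd
)
verdict = harmfulness.single_turn_score(sample)  # 1 or 0 for a SingleTurnSample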

ContextEntityRecall dataclass

ContextEntityRecall(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'reference', 'retrieved_contexts'}})(), name: str = 'context_entity_recall', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, context_entity_recall_prompt: PydanticPrompt = ExtractEntitiesPrompt(), max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric

Calculates recall based on entities present in ground truth and context. Let CN be the set of entities present in context, GN be the set of entities present in the ground truth.

Then we define the context entity recall as follows: Context Entity Recall = |CN ∩ GN| / |GN|

If this quantity is 1, we can say that the retrieval mechanism has retrieved context which covers all entities present in the ground truth, making it a useful retrieval. This can therefore be used to evaluate retrieval mechanisms in use cases where entities matter, for example a tourism help chatbot.

Attributes:

Name Type Description
name str
batch_size int

Batch size for OpenAI completion.
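
A hand-worked illustration of the formula above; in the metric itself the entity sets are extracted by the LLM rather than given directly:

reference_entities = {"Taj Mahal", "Agra", "1631", "Shah Jahan"}   # GN, from the ground truth
context_entities = {"Taj Mahal", "Agra", "Mughal architecture"}    # CN, from the retrieved context

recall = len(context_entities & reference_entities) / len(reference_entities)
print(recall)  # 0.5 -> half of the ground-truth entities were covered by the context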

IDBasedContextPrecision dataclass

IDBasedContextPrecision(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'retrieved_context_ids', 'reference_context_ids'}})(), name: str = 'id_based_context_precision', output_type: MetricOutputType = CONTINUOUS)

Bases: SingleTurnMetric

Calculates context precision by directly comparing retrieved context IDs with reference context IDs. The score represents what proportion of the retrieved context IDs are actually relevant (present in reference).

This metric works with both string and integer IDs.

Attributes:

Name Type Description
name str

Name of the metric

LLMContextPrecisionWithReference dataclass

LLMContextPrecisionWithReference(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'retrieved_contexts', 'reference'}})(), name: str = 'llm_context_precision_with_reference', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, context_precision_prompt: PydanticPrompt = ContextPrecisionPrompt(), max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric

Average Precision is a metric that evaluates whether all of the relevant items selected by the model are ranked higher than the irrelevant ones.

Attributes:

Name Type Description
name str
evaluation_mode EvaluationMode
context_precision_prompt Prompt

IDBasedContextRecall dataclass

IDBasedContextRecall(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'retrieved_context_ids', 'reference_context_ids'}})(), name: str = 'id_based_context_recall', output_type: MetricOutputType = CONTINUOUS)

Bases: SingleTurnMetric

Calculates context recall by directly comparing retrieved context IDs with reference context IDs. The score represents what proportion of the reference IDs were successfully retrieved.

This metric works with both string and integer IDs.

Attributes:

Name Type Description
name str

Name of the metric

LLMContextRecall dataclass

LLMContextRecall(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'retrieved_contexts', 'reference'}})(), name: str = 'context_recall', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, context_recall_prompt: PydanticPrompt = ContextRecallClassificationPrompt(), max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric

Estimates context recall by estimating TP and FN using annotated answer and retrieved context.

Attributes:

Name Type Description
name str

FactualCorrectness dataclass

FactualCorrectness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'response', 'reference'}})(), name: str = 'factual_correctness', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, mode: Literal['precision', 'recall', 'f1'] = 'f1', beta: float = 1.0, atomicity: Literal['low', 'high'] = 'low', coverage: Literal['low', 'high'] = 'low', claim_decomposition_prompt: PydanticPrompt = ClaimDecompositionPrompt(), nli_prompt: PydanticPrompt = NLIStatementPrompt(), language: str = 'english')

Bases: MetricWithLLM, SingleTurnMetric

FactualCorrectness is a metric class that evaluates the factual correctness of responses generated by a language model. It uses claim decomposition and natural language inference (NLI) to verify the claims made in the responses against reference texts.

Attributes:
    name (str): The name of the metric, default is "factual_correctness".
    _required_columns (Dict[MetricType, Set[str]]): A dictionary specifying the required columns for each metric type. Default is {"SINGLE_TURN": {"response", "reference"}}.
    mode (Literal["precision", "recall", "f1"]): The mode of evaluation, can be "precision", "recall", or "f1". Default is "f1".
    beta (float): The beta value used for the F1 score calculation. A beta > 1 gives more weight to recall, while beta < 1 favors precision. Default is 1.0.
    atomicity (Literal["low", "high"]): The level of atomicity for claim decomposition. Default is "low".
    coverage (Literal["low", "high"]): The level of coverage for claim decomposition. Default is "low".
    claim_decomposition_prompt (PydanticPrompt): The prompt used for claim decomposition.
    nli_prompt (PydanticPrompt): The prompt used for natural language inference (NLI).
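
A brief sketch of configuring the decomposition behaviour; evaluator_llm and sample are assumed to exist as in the earlier examples:

from ragas.metrics import FactualCorrectness

scorer = FactualCorrectness(
    llm=evaluator_llm,
    mode="precision",   # or "recall" / "f1"
    atomicity="high",   # finer-grained claim decomposition
    coverage="high",
)
print(scorer.single_turn_score(sample))  # sample must provide `response` and `reference`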

Faithfulness dataclass

Faithfulness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'retrieved_contexts'}})(), name: str = 'faithfulness', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, nli_statements_prompt: PydanticPrompt = NLIStatementPrompt(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), max_retries: int = 1)

FaithfulnesswithHHEM dataclass

FaithfulnesswithHHEM(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'retrieved_contexts'}})(), name: str = 'faithfulness_with_hhem', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, nli_statements_prompt: PydanticPrompt = NLIStatementPrompt(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), max_retries: int = 1, device: str = 'cpu', batch_size: int = 10)

Bases: Faithfulness

NoiseSensitivity dataclass

NoiseSensitivity(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'reference', 'retrieved_contexts'}})(), name: str = 'noise_sensitivity', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, mode: Literal['relevant', 'irrelevant'] = 'relevant', nli_statements_prompt: PydanticPrompt = NLIStatementPrompt(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), max_retries: int = 1)

AnswerAccuracy dataclass

AnswerAccuracy(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'reference'}})(), name: str = 'nv_accuracy', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: MetricWithLLM, SingleTurnMetric

Measures answer accuracy compared to the ground truth given a user_input. This metric averages the results of two distinct judge prompts.

Top 10, zero-shot LLM-as-a-judge leaderboard:
1) nvidia/Llama-3_3-Nemotron-Super-49B-v1
2) mistralai/mixtral-8x22b-instruct-v0.1
3) mistralai/mixtral-8x7b-instruct-v0.1
4) meta/llama-3.1-70b-instruct
5) meta/llama-3.3-70b-instruct
6) meta/llama-3.1-405b-instruct
7) mistralai/mistral-nemo-12b-instruct
8) nvidia/llama-3.1-nemotron-70b-instruct
9) meta/llama-3.1-8b-instruct
10) google/gemma-2-2b-it

The top-1 leaderboard model has a high correlation with human judges (~0.92).

Attributes:

Name Type Description
name string

The name of the metric.

answer_accuracy

The AnswerAccuracy object

ContextRelevance dataclass

ContextRelevance(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'retrieved_contexts'}})(), name: str = 'nv_context_relevance', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: MetricWithLLM, SingleTurnMetric

Scores the relevance of the retrieved contexts based on the user input.

Input:
    data: list of dicts with keys: user_input, retrieved_contexts
Output:
    0.0: retrieved_contexts is not relevant for the user_input
    0.5: retrieved_contexts is partially relevant for the user_input
    1.0: retrieved_contexts is fully relevant for the user_input

ResponseGroundedness dataclass

ResponseGroundedness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'response', 'retrieved_contexts'}})(), name: str = 'nv_response_groundedness', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: MetricWithLLM, SingleTurnMetric

Scores the groundedness of the response based on the retrieved contexts.

Input:
    data: list of dicts with keys: response, retrieved_contexts
Output:
    0.0: response is not grounded in the retrieved contexts
    0.5: response is partially grounded in the retrieved contexts
    1.0: response is fully grounded in the retrieved contexts

SimpleCriteriaScore

SimpleCriteriaScore(name: str, definition: str, llm: Optional[BaseRagasLLM] = None, required_columns: Optional[Dict[MetricType, Set[str]]] = None, output_type: Optional[MetricOutputType] = DISCRETE, single_turn_prompt: Optional[PydanticPrompt] = None, multi_turn_prompt: Optional[PydanticPrompt] = None, strictness: int = 1)

Bases: MetricWithLLM, SingleTurnMetric, MultiTurnMetric

Judges the submission to give binary results using the criteria specified in the metric definition.

Attributes:

Name Type Description
name str

The name of the metric.

definition str

The criteria used to score the submission.

strictness int

The number of times self-consistency checks are made. The final judgement is made using a majority vote.

Source code in src/ragas/metrics/_simple_criteria.py
def __init__(
    self,
    name: str,
    definition: str,
    llm: t.Optional[BaseRagasLLM] = None,
    required_columns: t.Optional[t.Dict[MetricType, t.Set[str]]] = None,
    output_type: t.Optional[MetricOutputType] = MetricOutputType.DISCRETE,
    single_turn_prompt: t.Optional[PydanticPrompt] = None,
    multi_turn_prompt: t.Optional[PydanticPrompt] = None,
    strictness: int = 1,
):
    if required_columns is None:
        required_columns = {
            MetricType.SINGLE_TURN: {
                "user_input:optional",
                "response:optional",
                "retrieved_contexts:optional",
                "reference:optional",
                "reference_contexts:optional",
            },
            MetricType.MULTI_TURN: {
                "user_input:optional",
                "reference:optional",
            },
        }
    super().__init__(
        name=name,
        llm=llm,
        _required_columns=required_columns,
        output_type=output_type,
    )

    self._definition = definition
    self.single_turn_prompt = single_turn_prompt or SingleTurnSimpleCriteriaPrompt()
    self.multi_turn_prompt = multi_turn_prompt or MultiTurnSimpleCriteriaPrompt()

    # update the instruction for the prompts with the definition
    instruction = f"Evaluate the input based on the criteria defined.\nCriteria Definition: {self._definition}"
    self.single_turn_prompt.instruction = instruction
    self.multi_turn_prompt.instruction = instruction

    # ensure odd number of checks to avoid tie in majority vote.
    self.strictness = strictness
    self.strictness = (
        self.strictness if self.strictness % 2 != 0 else self.strictness + 1
    )

ToolCallAccuracy dataclass

ToolCallAccuracy(_required_columns: Dict[MetricType, Set[str]] = (lambda: {MULTI_TURN: {'user_input', 'reference_tool_calls'}})(), name: str = 'tool_call_accuracy', strict_order: bool = True, arg_comparison_metric: SingleTurnMetric = (lambda: ExactMatch())())

Bases: MultiTurnMetric

Tool Call Accuracy metric measures how accurately an LLM agent makes tool calls compared to reference tool calls.

The metric supports two evaluation modes:

1. Strict order (default): tool calls must match exactly in sequence.
2. Flexible order: tool calls can be in any order (parallel evaluation).

The metric evaluates two aspects:

1. Sequence alignment: whether predicted and reference tool calls match in the required order.
2. Argument accuracy: how well tool call arguments match between predicted and reference.

Score calculation:

- If sequences don't align: score = 0
- If sequences align: score = (average argument accuracy) * sequence_alignment_factor
- Length mismatches result in warnings and a proportional penalty

Edge cases:

- No predicted tool calls: returns 0.0
- Length mismatch: compares only the overlapping portion and applies a coverage penalty
- Missing arguments: contribute 0 to the argument score for that tool call

The final score is always between 0.0 and 1.0.

Args:

strict_order: If True (default), tool calls must match exactly in sequence. If False, tool calls can be in any order (parallel evaluation).
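
A minimal sketch using the multi-turn sample fields this metric requires (user_input and reference_tool_calls); the conversation content is illustrative and the call assumes an async context:

from ragas.dataset_schema import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage, ToolCall
from ragas.metrics import ToolCallAccuracy

sample = MultiTurnSample(
    user_input=[
        HumanMessage(content="What's the weather like in New York right now?"),
        AIMessage(
            content="The current temperature in New York is 75°F.",
            tool_calls=[ToolCall(name="weather_check", args={"location": "New York"})],
        ),
    ],
    reference_tool_calls=[ToolCall(name="weather_check", args={"location": "New York"})],
)

scorer = ToolCallAccuracy()  # strict_order=True by default
score = await scorer.multi_turn_ascore(sample)  # 1.0 when sequence and arguments match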

MetricWithLLM dataclass

MetricWithLLM(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: Metric, PromptMixin

A metric class that uses a language model for evaluation.

Attributes:

Name Type Description
llm Optional[BaseRagasLLM]

The language model used for the metric. Both BaseRagasLLM and InstructorBaseRagasLLM are accepted at runtime via duck typing (both have compatible methods).

init

init(run_config: RunConfig) -> None

Initialize the metric with run configuration and validate LLM is present.

Parameters:

Name Type Description Default
run_config RunConfig

Configuration for the metric run.

required

Raises:

Type Description
ValueError

If no LLM is provided to the metric.

Source code in src/ragas/metrics/base.py
def init(self, run_config: RunConfig) -> None:
    """
    Initialize the metric with run configuration and validate LLM is present.

    Parameters
    ----------
    run_config : RunConfig
        Configuration for the metric run.

    Raises
    ------
    ValueError
        If no LLM is provided to the metric.
    """
    if self.llm is None:
        raise ValueError(
            f"Metric '{self.name}' has no valid LLM provided (self.llm is None). Please instantiate the metric with an LLM to run."
        )
    # Only BaseRagasLLM has set_run_config method, not InstructorBaseRagasLLM
    if isinstance(self.llm, BaseRagasLLM):
        self.llm.set_run_config(run_config)
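
init is typically invoked for you during evaluation, but it can also be called directly; a minimal sketch, assuming `metric` is a MetricWithLLM instance with an LLM already attached:

from ragas.run_config import RunConfig

# raises ValueError if metric.llm is None (see init above)
metric.init(RunConfig(timeout=60))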

train

train(path: str, demonstration_config: Optional[DemonstrationConfig] = None, instruction_config: Optional[InstructionConfig] = None, callbacks: Optional[Callbacks] = None, run_config: Optional[RunConfig] = None, batch_size: Optional[int] = None, with_debugging_logs=False, raise_exceptions: bool = True) -> None

Train the metric using local JSON data

Parameters:

Name Type Description Default
path str

Path to local JSON training data file

required
demonstration_config DemonstrationConfig

Configuration for demonstration optimization

None
instruction_config InstructionConfig

Configuration for instruction optimization

None
callbacks Callbacks

List of callback functions

None
run_config RunConfig

Run configuration

None
batch_size int

Batch size for training

None
with_debugging_logs bool

Enable debugging logs

False
raise_exceptions bool

Whether to raise exceptions during training

True

Raises:

Type Description
ValueError

If path is not provided or not a JSON file

Source code in src/ragas/metrics/base.py
def train(
    self,
    path: str,
    demonstration_config: t.Optional[DemonstrationConfig] = None,
    instruction_config: t.Optional[InstructionConfig] = None,
    callbacks: t.Optional[Callbacks] = None,
    run_config: t.Optional[RunConfig] = None,
    batch_size: t.Optional[int] = None,
    with_debugging_logs=False,
    raise_exceptions: bool = True,
) -> None:
    """
    Train the metric using local JSON data

    Parameters
    ----------
    path : str
        Path to local JSON training data file
    demonstration_config : DemonstrationConfig, optional
        Configuration for demonstration optimization
    instruction_config : InstructionConfig, optional
        Configuration for instruction optimization
    callbacks : Callbacks, optional
        List of callback functions
    run_config : RunConfig, optional
        Run configuration
    batch_size : int, optional
        Batch size for training
    with_debugging_logs : bool, default=False
        Enable debugging logs
    raise_exceptions : bool, default=True
        Whether to raise exceptions during training

    Raises
    ------
    ValueError
        If path is not provided or not a JSON file
    """
    # Validate input parameters
    if not path:
        raise ValueError("Path to training data file must be provided")

    if not path.endswith(".json"):
        raise ValueError("Train data must be in json format")

    run_config = run_config or RunConfig()
    callbacks = callbacks or []

    # Load the dataset from JSON file
    dataset = MetricAnnotation.from_json(path, metric_name=self.name)

    # only optimize the instruction if instruction_config is provided
    if instruction_config is not None:
        self._optimize_instruction(
            instruction_config=instruction_config,
            dataset=dataset,
            callbacks=callbacks,
            run_config=run_config,
            batch_size=batch_size,
            with_debugging_logs=with_debugging_logs,
            raise_exceptions=raise_exceptions,
        )

    # if demonstration_config is provided, optimize the demonstrations
    if demonstration_config is not None:
        self._optimize_demonstration(
            demonstration_config=demonstration_config,
            dataset=dataset,
        )
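
A minimal usage sketch for train, assuming `metric` is a MetricWithLLM instance; the annotation file name is illustrative:

# Per the source above, each optimization step runs only when its config is supplied;
# both configs are optional, and the training data must be a .json file.
metric.train(
    "annotated_data.json",
    # instruction_config=...,    # optional: optimize the prompt instruction
    # demonstration_config=...,  # optional: optimize few-shot demonstrations
)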

MultiTurnMetric dataclass

MultiTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: Metric

A metric class for evaluating multi-turn conversations.

This class extends the base Metric class to provide functionality for scoring multi-turn conversation samples.

multi_turn_score

multi_turn_score(sample: MultiTurnSample, callbacks: Callbacks = None) -> float

Score a multi-turn conversation sample synchronously.

May raise ImportError if nest_asyncio is not installed in Jupyter-like environments.

Source code in src/ragas/metrics/base.py
def multi_turn_score(
    self,
    sample: MultiTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Score a multi-turn conversation sample synchronously.

    May raise ImportError if nest_asyncio is not installed in Jupyter-like environments.
    """
    callbacks = callbacks or []
    sample = self._only_required_columns_multi_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._multi_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.MULTI_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score

multi_turn_ascore async

multi_turn_ascore(sample: MultiTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Score a multi-turn conversation sample asynchronously.

May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.

Source code in src/ragas/metrics/base.py
async def multi_turn_ascore(
    self,
    sample: MultiTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Score a multi-turn conversation sample asynchronously.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    sample = self._only_required_columns_multi_turn(sample)

    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._multi_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.MULTI_TURN.name,
            language=get_metric_language(self),
        )
    )

    return score

BaseMetric dataclass

BaseMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])())

Bases: ABC

Base class for simple metrics that return MetricResult objects.

This class provides the foundation for metrics that evaluate inputs and return structured MetricResult objects containing scores and reasoning.

Attributes:

Name Type Description
name str

The name of the metric.

allowed_values AllowedValuesType

Allowed values for the metric output. Can be a list of strings for discrete metrics, a tuple of floats for numeric metrics, or an integer for ranking metrics.

Examples:

>>> from ragas.metrics import discrete_metric
>>>
>>> @discrete_metric(name="sentiment", allowed_values=["positive", "negative"])
>>> def sentiment_metric(user_input: str, response: str) -> str:
...     return "positive" if "good" in response else "negative"
>>>
>>> result = sentiment_metric(user_input="How are you?", response="I'm good!")
>>> print(result.value)  # "positive"

score abstractmethod

score(**kwargs) -> 'MetricResult'

Synchronously calculate the metric score.

Parameters:

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns:

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
def score(self, **kwargs) -> "MetricResult":
    """
    Synchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

ascore abstractmethod async

ascore(**kwargs) -> 'MetricResult'

Asynchronously calculate the metric score.

Parameters:

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns:

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
async def ascore(self, **kwargs) -> "MetricResult":
    """
    Asynchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

batch_score

batch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Synchronously calculate scores for a batch of inputs.

Parameters:

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns:

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
def batch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Synchronously calculate scores for a batch of inputs.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    return [self.score(**input_dict) for input_dict in inputs]

abatch_score async

abatch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Asynchronously calculate scores for a batch of inputs in parallel.

Parameters:

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns:

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
async def abatch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Asynchronously calculate scores for a batch of inputs in parallel.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    async_tasks = []
    for input_dict in inputs:
        # Process input asynchronously
        async_tasks.append(self.ascore(**input_dict))

    # Run all tasks concurrently and return results
    return await asyncio.gather(*async_tasks)
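
A small sketch of batch scoring, reusing the decorator-defined sentiment metric from the example above (argument routing follows that example):

inputs = [
    {"user_input": "How are you?", "response": "I'm good!"},
    {"user_input": "Any update?", "response": "Nothing yet."},
]

# synchronous: scores each input one after another
results = sentiment_metric.batch_score(inputs)
print([r.value for r in results])

# asynchronous: schedules all inputs concurrently (run inside an async context)
# results = await sentiment_metric.abatch_score(inputs)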

LLMMetric dataclass

LLMMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])(), prompt: Optional[Union[str, 'Prompt']] = None)

Bases: SimpleBaseMetric

LLM-based metric that uses prompts to generate structured responses.

save

save(path: Optional[str] = None) -> None

Save the metric configuration to a JSON file.

Parameters:

path : str, optional
    File path to save to. If not provided, saves to "./{metric.name}.json". Use the .gz extension for compression.

Note:

If the metric has a response_model, its schema will be saved for reference but the model itself cannot be serialized. You'll need to provide it when loading.

Examples:

All these work:

metric.save()                      # → ./response_quality.json
metric.save("custom.json")         # → ./custom.json
metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
metric.save("no_extension")        # → ./no_extension.json
metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)

Source code in src/ragas/metrics/base.py
def save(self, path: t.Optional[str] = None) -> None:
    """
    Save the metric configuration to a JSON file.

    Parameters:
    -----------
    path : str, optional
        File path to save to. If not provided, saves to "./{metric.name}.json"
        Use .gz extension for compression.

    Note:
    -----
    If the metric has a response_model, its schema will be saved for reference
    but the model itself cannot be serialized. You'll need to provide it when loading.

    Examples:
    ---------
    All these work:
    >>> metric.save()                      # → ./response_quality.json
    >>> metric.save("custom.json")         # → ./custom.json
    >>> metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
    >>> metric.save("no_extension")        # → ./no_extension.json
    >>> metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)
    """
    import gzip
    import json
    import warnings
    from pathlib import Path

    # Handle default path
    if path is None:
        # Default to current directory with metric name as filename
        file_path = Path(f"./{self.name}.json")
    else:
        file_path = Path(path)

        # If path is a directory, append the metric name as filename
        if file_path.is_dir():
            file_path = file_path / f"{self.name}.json"
        # If path has no extension, add .json
        elif not file_path.suffix:
            file_path = file_path.with_suffix(".json")

    # Collect warning messages for data loss
    warning_messages = []

    if hasattr(self, "_response_model") and self._response_model:
        # Only warn for custom response models, not auto-generated ones
        if not getattr(self._response_model, "__ragas_auto_generated__", False):
            warning_messages.append(
                "- Custom response_model will be lost (set it manually after loading)"
            )

    # Serialize the prompt (may add embedding_model warning)
    prompt_data = self._serialize_prompt(warning_messages)

    # Determine the metric type
    metric_type = self.__class__.__name__

    # Get metric-specific config
    config = self._get_metric_config()

    # Emit consolidated warning if there's data loss
    if warning_messages:
        warnings.warn(
            "Some metric components cannot be saved and will be lost:\n"
            + "\n".join(warning_messages)
            + "\n\nYou'll need to provide these when loading the metric."
        )

    data = {
        "format_version": "1.0",
        "metric_type": metric_type,
        "name": self.name,
        "prompt": prompt_data,
        "config": config,
        "response_model_info": self._serialize_response_model_info(),
    }
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        else:
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
    except (OSError, IOError) as e:
        raise ValueError(f"Cannot save metric to {file_path}: {e}")

load classmethod

load(path: str, response_model: Optional[Type['BaseModel']] = None, embedding_model: Optional['EmbeddingModelType'] = None) -> 'SimpleLLMMetric'

Load a metric from a JSON file.

Parameters:

path : str
    File path to load from. Supports .gz compressed files.
response_model : Optional[Type[BaseModel]]
    Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
embedding_model : Optional[Any]
    Embedding model for DynamicFewShotPrompt. Required if the original used one.

Returns:

SimpleLLMMetric
    Loaded metric instance

Raises:

ValueError
    If file cannot be loaded, is invalid, or missing required models

Source code in src/ragas/metrics/base.py
@classmethod
def load(
    cls,
    path: str,
    response_model: t.Optional[t.Type["BaseModel"]] = None,
    embedding_model: t.Optional["EmbeddingModelType"] = None,
) -> "SimpleLLMMetric":
    """
    Load a metric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    response_model : Optional[Type[BaseModel]]
        Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    SimpleLLMMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded, is invalid, or missing required models
    """
    import gzip
    import json
    from pathlib import Path

    file_path = Path(path)

    # Load JSON data
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "rt", encoding="utf-8") as f:
                data = json.load(f)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
        raise ValueError(f"Cannot load metric from {path}: {e}")

    # Validate format
    if data.get("format_version") != "1.0":
        import warnings

        warnings.warn(
            f"Loading metric with format version {data.get('format_version')}, expected 1.0"
        )

    # Reconstruct the prompt
    prompt = cls._deserialize_prompt(data["prompt"], embedding_model)

    # Get config
    config = data.get("config", {})

    # Create the metric instance
    metric = cls(name=data["name"], prompt=prompt, **config)

    # Set response model if provided
    if response_model:
        metric._response_model = response_model

    return metric
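
A minimal save/load round-trip sketch; the file name is illustrative, and the commented keyword arguments are needed only if the original metric used them:

# save, then restore the same metric class
metric.save("response_quality.json")

restored = type(metric).load(
    "response_quality.json",
    # response_model=MyResponseModel,   # required only for custom response models (hypothetical)
    # embedding_model=embedding_model,  # required only if a DynamicFewShotPrompt was used
)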

get_correlation abstractmethod

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

Calculate the correlation between gold scores and predicted scores. This is a placeholder method and should be implemented based on the specific metric.

Source code in src/ragas/metrics/base.py
@abstractmethod
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold scores and predicted scores.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    pass

align_and_validate

align_and_validate(dataset: 'Dataset', embedding_model: 'EmbeddingModelType', llm: 'BaseRagasLLM', test_size: float = 0.2, random_state: int = 42, **kwargs: Dict[str, Any])

Args:

dataset: The experiment to align the metric with.
embedding_model: The embedding model used for dynamic few-shot prompting.
llm: The LLM instance to use for scoring.

Align the metric with the specified experiments and validate it against a gold standard experiment. This method combines alignment and validation into a single step.

Source code in src/ragas/metrics/base.py
def align_and_validate(
    self,
    dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    llm: "BaseRagasLLM",
    test_size: float = 0.2,
    random_state: int = 42,
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        dataset: experiment to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.
        llm: The LLM instance to use for scoring.

    Align the metric with the specified experiments and validate it against a gold standard experiment.
    This method combines alignment and validation into a single step.
    """
    train_dataset, test_dataset = dataset.train_test_split(
        test_size=test_size, random_state=random_state
    )

    self.align(train_dataset, embedding_model, **kwargs)  # type: ignore
    return self.validate_alignment(llm, test_dataset)  # type: ignore

align

align(train_dataset: 'Dataset', embedding_model: 'EmbeddingModelType', **kwargs: Dict[str, Any])

Args:

train_dataset: The training dataset to align the metric with.
embedding_model: The embedding model used for dynamic few-shot prompting.

Align the metric with the specified experiments by different optimization methods.

Source code in src/ragas/metrics/base.py
def align(
    self,
    train_dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        train_dataset: train_dataset to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.

    Align the metric with the specified experiments by different optimization methods.
    """

    # get prompt
    if not self.prompt:
        raise Exception("prompt not passed")
    from ragas.prompt.simple_prompt import Prompt

    self.prompt = (
        self.prompt if isinstance(self.prompt, Prompt) else Prompt(self.prompt)
    )
    # Extract specific parameters for from_prompt method
    max_similar_examples_val = kwargs.get("max_similar_examples", 3)
    similarity_threshold_val = kwargs.get("similarity_threshold", 0.7)
    max_similar_examples = (
        int(max_similar_examples_val)
        if isinstance(max_similar_examples_val, (int, str))
        else 3
    )
    similarity_threshold = (
        float(similarity_threshold_val)
        if isinstance(similarity_threshold_val, (int, float, str))
        else 0.7
    )
    # Convert BaseRagasEmbeddings to BaseRagasEmbedding if needed
    if hasattr(embedding_model, "embed_query"):
        # For legacy BaseRagasEmbeddings, we need to wrap it
        # Create a wrapper that implements BaseRagasEmbedding interface
        class EmbeddingWrapper:
            def __init__(self, legacy_embedding):
                self.legacy_embedding = legacy_embedding

            def embed_text(self, text: str, **kwargs) -> t.List[float]:
                return self.legacy_embedding.embed_query(text)

            async def aembed_text(self, text: str, **kwargs) -> t.List[float]:
                return await self.legacy_embedding.aembed_query(text)

        actual_embedding_model = EmbeddingWrapper(embedding_model)
    else:
        # Already BaseRagasEmbedding
        actual_embedding_model = embedding_model

    from ragas.prompt.dynamic_few_shot import DynamicFewShotPrompt

    self.prompt = DynamicFewShotPrompt.from_prompt(
        self.prompt,
        actual_embedding_model,  # type: ignore[arg-type]
        max_similar_examples,
        similarity_threshold,
    )
    train_dataset.reload()
    total_items = len(train_dataset)
    input_vars = self.get_variables()
    output_vars = [self.name, f"{self.name}_reason"]

    from rich.progress import Progress

    with Progress() as progress:
        task = progress.add_task("Processing examples", total=total_items)
        for row in train_dataset:
            inputs = {
                var: train_dataset.get_row_value(row, var) for var in input_vars
            }
            inputs = {k: v for k, v in inputs.items() if v is not None}
            output = {
                var: train_dataset.get_row_value(row, var) for var in output_vars
            }
            output = {k: v for k, v in output.items() if v is not None}

            if output:
                self.prompt.add_example(inputs, output)
            progress.update(task, advance=1)

validate_alignment

validate_alignment(llm: 'BaseRagasLLM', test_dataset: 'Dataset', mapping: Dict[str, str] = {})

Args:

llm: The LLM instance to use for scoring.
test_dataset: A Dataset instance containing the gold standard scores.
mapping: A dictionary mapping variable names expected by metrics to their corresponding names in the gold experiment.

Validate the alignment of the metric by comparing the scores against a gold standard experiment. This method computes the Cohen's Kappa score and agreement rate between the gold standard scores and the predicted scores from the metric.

Source code in src/ragas/metrics/base.py
def validate_alignment(
    self,
    llm: "BaseRagasLLM",
    test_dataset: "Dataset",
    mapping: t.Dict[str, str] = {},
):
    """
    Args:
        llm: The LLM instance to use for scoring.
        test_dataset: An Dataset instance containing the gold standard scores.
        mapping: A dictionary mapping variable names expected by metrics to their corresponding names in the gold experiment.

    Validate the alignment of the metric by comparing the scores against a gold standard experiment.
    This method computes the Cohen's Kappa score and agreement rate between the gold standard scores and
    the predicted scores from the metric.
    """

    test_dataset.reload()
    gold_scores_raw = [
        test_dataset.get_row_value(row, self.name) for row in test_dataset
    ]
    pred_scores = []
    for row in test_dataset:
        values = {
            v: (
                test_dataset.get_row_value(row, v)
                if v not in mapping
                else test_dataset.get_row_value(row, mapping.get(v, v))
            )
            for v in self.get_variables()
        }
        score = self.score(llm=llm, **values)
        pred_scores.append(score.value)

    # Convert to strings for correlation calculation, filtering out None values
    gold_scores = [str(score) for score in gold_scores_raw if score is not None]
    pred_scores_str = [str(score) for score in pred_scores if score is not None]

    df = test_dataset.to_pandas()
    df[f"{self.name}_pred"] = pred_scores
    correlation = self.get_correlation(gold_scores, pred_scores_str)
    agreement_rate = sum(
        x == y for x, y in zip(gold_scores, pred_scores_str)
    ) / len(gold_scores)
    return {
        "correlation": correlation,
        "agreement_rate": agreement_rate,
        "df": df,
    }
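
Putting align and validate_alignment together, align_and_validate splits the dataset, aligns on the train split, and validates on the test split. A sketch, with my_metric, dataset, embedding_model, and evaluator_llm assumed to be configured elsewhere:

report = my_metric.align_and_validate(
    dataset=dataset,
    embedding_model=embedding_model,
    llm=evaluator_llm,
    test_size=0.2,
)

# keys come from validate_alignment above
print(report["correlation"], report["agreement_rate"])
report["df"].head()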

SingleTurnMetric dataclass

SingleTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: Metric

A metric class for evaluating single-turn interactions.

This class provides methods to score single-turn samples, both synchronously and asynchronously.

single_turn_score

single_turn_score(sample: SingleTurnSample, callbacks: Callbacks = None) -> float

Synchronously score a single-turn sample.

May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.

Source code in src/ragas/metrics/base.py
def single_turn_score(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Synchronously score a single-turn sample.

    May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._single_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score

single_turn_ascore async

single_turn_ascore(sample: SingleTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Asynchronously score a single-turn sample with an optional timeout.

May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.

Source code in src/ragas/metrics/base.py
async def single_turn_ascore(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Asynchronously score a single-turn sample with an optional timeout.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._single_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score
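
A brief sketch of asynchronous single-turn scoring with a timeout; `metric` stands for any configured SingleTurnMetric and the sample content is illustrative:

from ragas.dataset_schema import SingleTurnSample

sample = SingleTurnSample(
    user_input="What is Ragas?",
    response="Ragas is an evaluation toolkit for LLM applications.",
)

# raises asyncio.TimeoutError if scoring takes longer than 30 seconds
score = await metric.single_turn_ascore(sample, timeout=30.0)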

DiscreteMetric dataclass

DiscreteMetric(name: str, allowed_values: List[str] = (lambda: ['pass', 'fail'])(), prompt: Optional[Union[str, 'Prompt']] = None)

Bases: SimpleLLMMetric, DiscreteValidator

Metric for categorical/discrete evaluations with predefined allowed values.

This class is used for metrics that output categorical values like "pass/fail", "good/bad/excellent", or custom discrete categories.

Attributes:

Name Type Description
allowed_values List[str]

List of allowed categorical values the metric can output. Default is ["pass", "fail"].

Examples:

>>> from ragas.metrics import DiscreteMetric
>>> from ragas.llms import LangchainLLMWrapper
>>> from langchain_openai import ChatOpenAI
>>>
>>> # Create a custom discrete metric
>>> llm = LangchainLLMWrapper(ChatOpenAI())
>>> metric = DiscreteMetric(
...     name="quality_check",
...     llm=llm,
...     allowed_values=["excellent", "good", "poor"]
... )

get_correlation

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

Calculate the correlation between gold labels and predictions. This is a placeholder method and should be implemented based on the specific metric.

Source code in src/ragas/metrics/discrete.py
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold labels and predictions.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    try:
        from sklearn.metrics import cohen_kappa_score
    except ImportError:
        raise ImportError(
            "scikit-learn is required for correlation calculation. "
            "Please install it with `pip install scikit-learn`."
        )
    return cohen_kappa_score(gold_labels, predictions)

load classmethod

load(path: str, embedding_model: Optional[EmbeddingModelType] = None) -> DiscreteMetric

Load a DiscreteMetric from a JSON file.

Parameters:

path : str
    File path to load from. Supports .gz compressed files.
embedding_model : Optional[Any]
    Embedding model for DynamicFewShotPrompt. Required if the original used one.

Returns:

DiscreteMetric
    Loaded metric instance

Raises:

ValueError
    If file cannot be loaded or is not a DiscreteMetric

Source code in src/ragas/metrics/discrete.py
@classmethod
def load(
    cls, path: str, embedding_model: t.Optional["EmbeddingModelType"] = None
) -> "DiscreteMetric":
    """
    Load a DiscreteMetric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    DiscreteMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded or is not a DiscreteMetric
    """
    # Validate metric type before loading
    cls._validate_metric_type(path)

    # Load using parent class method
    metric = super().load(path, embedding_model=embedding_model)

    # Additional type check for safety
    if not isinstance(metric, cls):
        raise ValueError(f"Loaded metric is not a {cls.__name__}")

    return metric

NumericMetric dataclass

NumericMetric(name: str, allowed_values: Union[Tuple[float, float], range] = (0.0, 1.0), prompt: Optional[Union[str, 'Prompt']] = None)

Bases: SimpleLLMMetric, NumericValidator

Metric for continuous numeric evaluations within a specified range.

This class is used for metrics that output numeric scores within a defined range, such as 0.0 to 1.0 for similarity scores or 1-10 ratings.

Attributes:

Name Type Description
allowed_values Union[Tuple[float, float], range]

The valid range for metric outputs. Can be a tuple of (min, max) floats or a range object. Default is (0.0, 1.0).

Examples:

>>> from ragas.metrics import NumericMetric
>>> from ragas.llms import LangchainLLMWrapper
>>> from langchain_openai import ChatOpenAI
>>>
>>> # Create a custom numeric metric with 0-10 range
>>> llm = LangchainLLMWrapper(ChatOpenAI())
>>> metric = NumericMetric(
...     name="quality_score",
...     llm=llm,
...     allowed_values=(0.0, 10.0)
... )

get_correlation

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

Calculate the correlation between gold labels and predictions. This is a placeholder method and should be implemented based on the specific metric.

Source code in src/ragas/metrics/numeric.py
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold labels and predictions.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    try:
        from scipy.stats import pearsonr
    except ImportError:
        raise ImportError(
            "scipy is required for correlation calculation. "
            "Please install it with `pip install scipy`."
        )
    # Convert strings to floats for correlation calculation
    gold_floats = [float(x) for x in gold_labels]
    pred_floats = [float(x) for x in predictions]
    result = pearsonr(gold_floats, pred_floats)
    # pearsonr returns (correlation, p-value) tuple
    correlation = t.cast(float, result[0])
    return correlation

load classmethod

load(path: str, embedding_model: Optional[EmbeddingModelType] = None) -> NumericMetric

Load a NumericMetric from a JSON file.

Parameters:

path : str
    File path to load from. Supports .gz compressed files.
embedding_model : Optional[Any]
    Embedding model for DynamicFewShotPrompt. Required if the original used one.

Returns:

NumericMetric
    Loaded metric instance

Raises:

ValueError
    If file cannot be loaded or is not a NumericMetric

Source code in src/ragas/metrics/numeric.py
@classmethod
def load(
    cls, path: str, embedding_model: t.Optional["EmbeddingModelType"] = None
) -> "NumericMetric":
    """
    Load a NumericMetric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    NumericMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded or is not a NumericMetric
    """
    # Validate metric type before loading
    cls._validate_metric_type(path)

    # Load using parent class method
    metric = super().load(path, embedding_model=embedding_model)

    # Additional type check for safety
    if not isinstance(metric, cls):
        raise ValueError(f"Loaded metric is not a {cls.__name__}")

    # Convert allowed_values back to tuple if it's a list (due to JSON serialization)
    if hasattr(metric, "allowed_values") and isinstance(
        metric.allowed_values, list
    ):
        # Ensure it's a 2-element tuple for NumericMetric
        if len(metric.allowed_values) == 2:
            metric.allowed_values = (
                metric.allowed_values[0],
                metric.allowed_values[1],
            )
        else:
            metric.allowed_values = tuple(metric.allowed_values)  # type: ignore

    return metric

RankingMetric dataclass

RankingMetric(name: str, allowed_values: int = 2, prompt: Optional[Union[str, 'Prompt']] = None)

Bases: SimpleLLMMetric, RankingValidator

Metric for evaluations that produce ranked lists of items.

This class is used for metrics that output ordered lists, such as ranking search results, prioritizing features, or ordering responses by relevance.

Attributes:

Name Type Description
allowed_values int

Expected number of items in the ranking list. Default is 2.

Examples:

>>> from ragas.metrics import RankingMetric
>>> from ragas.llms import LangchainLLMWrapper
>>> from langchain_openai import ChatOpenAI
>>>
>>> # Create a ranking metric that returns top 3 items
>>> llm = LangchainLLMWrapper(ChatOpenAI())
>>> metric = RankingMetric(
...     name="relevance_ranking",
...     llm=llm,
...     allowed_values=3
... )

get_correlation

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

Calculate the correlation between gold labels and predictions. This is a placeholder method and should be implemented based on the specific metric.

Source code in src/ragas/metrics/ranking.py
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold labels and predictions.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    try:
        from sklearn.metrics import cohen_kappa_score
    except ImportError:
        raise ImportError(
            "scikit-learn is required for correlation calculation. "
            "Please install it with `pip install scikit-learn`."
        )

    kappa_scores = []
    for gold_item, prediction in zip(gold_labels, predictions):
        kappa = cohen_kappa_score(gold_item, prediction, weights="quadratic")
        kappa_scores.append(kappa)

    return sum(kappa_scores) / len(kappa_scores) if kappa_scores else 0.0

load classmethod

load(path: str, embedding_model: Optional[EmbeddingModelType] = None) -> RankingMetric

Load a RankingMetric from a JSON file.

Parameters:

path : str
    File path to load from. Supports .gz compressed files.
embedding_model : Optional[Any]
    Embedding model for DynamicFewShotPrompt. Required if the original used one.

Returns:

RankingMetric
    Loaded metric instance

Raises:

ValueError
    If file cannot be loaded or is not a RankingMetric

Source code in src/ragas/metrics/ranking.py
@classmethod
def load(
    cls, path: str, embedding_model: t.Optional["EmbeddingModelType"] = None
) -> "RankingMetric":
    """
    Load a RankingMetric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    RankingMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded or is not a RankingMetric
    """
    # Validate metric type before loading
    cls._validate_metric_type(path)

    # Load using parent class method
    metric = super().load(path, embedding_model=embedding_model)

    # Additional type check for safety
    if not isinstance(metric, cls):
        raise ValueError(f"Loaded metric is not a {cls.__name__}")

    return metric

MetricResult

MetricResult(value: Any, reason: Optional[str] = None, traces: Optional[Dict[str, Any]] = None)

Class to hold the result of a metric evaluation.

This class behaves like its underlying result value but still provides access to additional metadata like reasoning.

Works with:

- DiscreteMetrics (string results)
- NumericMetrics (float/int results)
- RankingMetrics (list results)

Source code in src/ragas/metrics/result.py
def __init__(
    self,
    value: t.Any,
    reason: t.Optional[str] = None,
    traces: t.Optional[t.Dict[str, t.Any]] = None,
):
    if traces is not None:
        invalid_keys = [
            key for key in traces.keys() if key not in {"input", "output"}
        ]
        if invalid_keys:
            raise ValueError(
                f"Invalid keys in traces: {invalid_keys}. Allowed keys are 'input' and 'output'."
            )
    self._value = value
    self.reason = reason
    self.traces = traces

value property

value

Get the raw result value.

to_dict

to_dict()

Convert the result to a dictionary.

Source code in src/ragas/metrics/result.py
def to_dict(self):
    """Convert the result to a dictionary."""
    return {"result": self._value, "reason": self.reason}
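
A short sketch of how MetricResult behaves; the values are illustrative:

from ragas.metrics.result import MetricResult

result = MetricResult(value="pass", reason="The response is fully grounded.")
print(result.value)      # "pass"
print(result.reason)     # "The response is fully grounded."
print(result.to_dict())  # {'result': 'pass', 'reason': 'The response is fully grounded.'}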

validate classmethod

validate(value: Any, info: ValidationInfo)

Provide compatibility with older Pydantic versions.

Source code in src/ragas/metrics/result.py
@classmethod
def validate(cls, value: t.Any, info: ValidationInfo):
    """Provide compatibility with older Pydantic versions."""
    if isinstance(value, MetricResult):
        return value
    return cls(value=value)

discrete_metric

discrete_metric(*, name: Optional[str] = None, allowed_values: Optional[List[str]] = None, **metric_params: Any) -> Callable[[Callable[..., Any]], DiscreteMetricProtocol]

Decorator for creating discrete/categorical metrics.

This decorator transforms a regular function into a DiscreteMetric instance that can be used for evaluation with predefined categorical outputs.

Parameters:

Name Type Description Default
name str

Name for the metric. If not provided, uses the function name.

None
allowed_values List[str]

List of allowed categorical values for the metric output. Default is ["pass", "fail"].

None
**metric_params Any

Additional parameters to pass to the metric initialization.

{}

Returns:

Type Description
Callable[[Callable[..., Any]], DiscreteMetricProtocol]

A decorator that transforms a function into a DiscreteMetric instance.

Examples:

>>> from ragas.metrics import discrete_metric
>>>
>>> @discrete_metric(name="sentiment", allowed_values=["positive", "neutral", "negative"])
>>> def sentiment_analysis(user_input: str, response: str) -> str:
...     '''Analyze sentiment of the response.'''
...     if "great" in response.lower() or "good" in response.lower():
...         return "positive"
...     elif "bad" in response.lower() or "poor" in response.lower():
...         return "negative"
...     return "neutral"
>>>
>>> result = sentiment_analysis(
...     user_input="How was your day?",
...     response="It was great!"
... )
>>> print(result.value)  # "positive"
Source code in src/ragas/metrics/discrete.py
def discrete_metric(
    *,
    name: t.Optional[str] = None,
    allowed_values: t.Optional[t.List[str]] = None,
    **metric_params: t.Any,
) -> t.Callable[[t.Callable[..., t.Any]], DiscreteMetricProtocol]:
    """
    Decorator for creating discrete/categorical metrics.

    This decorator transforms a regular function into a DiscreteMetric instance
    that can be used for evaluation with predefined categorical outputs.

    Parameters
    ----------
    name : str, optional
        Name for the metric. If not provided, uses the function name.
    allowed_values : List[str], optional
        List of allowed categorical values for the metric output.
        Default is ["pass", "fail"].
    **metric_params : Any
        Additional parameters to pass to the metric initialization.

    Returns
    -------
    Callable[[Callable[..., Any]], DiscreteMetricProtocol]
        A decorator that transforms a function into a DiscreteMetric instance.

    Examples
    --------
    >>> from ragas.metrics import discrete_metric
    >>>
    >>> @discrete_metric(name="sentiment", allowed_values=["positive", "neutral", "negative"])
    >>> def sentiment_analysis(user_input: str, response: str) -> str:
    ...     '''Analyze sentiment of the response.'''
    ...     if "great" in response.lower() or "good" in response.lower():
    ...         return "positive"
    ...     elif "bad" in response.lower() or "poor" in response.lower():
    ...         return "negative"
    ...     return "neutral"
    >>>
    >>> result = sentiment_analysis(
    ...     user_input="How was your day?",
    ...     response="It was great!"
    ... )
    >>> print(result.value)  # "positive"
    """
    if allowed_values is None:
        allowed_values = ["pass", "fail"]

    decorator_factory = create_metric_decorator()
    return decorator_factory(name=name, allowed_values=allowed_values, **metric_params)  # type: ignore[return-value]

numeric_metric

numeric_metric(*, name: Optional[str] = None, allowed_values: Optional[Union[Tuple[float, float], range]] = None, **metric_params: Any) -> Callable[[Callable[..., Any]], NumericMetricProtocol]

Decorator for creating numeric/continuous metrics.

This decorator transforms a regular function into a NumericMetric instance that outputs continuous values within a specified range.

Parameters:

Name Type Description Default
name str

Name for the metric. If not provided, uses the function name.

None
allowed_values Union[Tuple[float, float], range]

The valid range for metric outputs as (min, max) tuple or range object. Default is (0.0, 1.0).

None
**metric_params Any

Additional parameters to pass to the metric initialization.

{}

Returns:

Type Description
Callable[[Callable[..., Any]], NumericMetricProtocol]

A decorator that transforms a function into a NumericMetric instance.

Examples:

>>> from ragas.metrics import numeric_metric
>>>
>>> @numeric_metric(name="relevance_score", allowed_values=(0.0, 1.0))
>>> def calculate_relevance(user_input: str, response: str) -> float:
...     '''Calculate relevance score between 0 and 1.'''
...     # Simple word overlap example
...     user_words = set(user_input.lower().split())
...     response_words = set(response.lower().split())
...     if not user_words:
...         return 0.0
...     overlap = len(user_words & response_words)
...     return overlap / len(user_words)
>>>
>>> result = calculate_relevance(
...     user_input="What is Python?",
...     response="Python is a programming language"
... )
>>> print(result.value)  # Numeric score between 0.0 and 1.0
Source code in src/ragas/metrics/numeric.py
def numeric_metric(
    *,
    name: t.Optional[str] = None,
    allowed_values: t.Optional[t.Union[t.Tuple[float, float], range]] = None,
    **metric_params: t.Any,
) -> t.Callable[[t.Callable[..., t.Any]], NumericMetricProtocol]:
    """
    Decorator for creating numeric/continuous metrics.

    This decorator transforms a regular function into a NumericMetric instance
    that outputs continuous values within a specified range.

    Parameters
    ----------
    name : str, optional
        Name for the metric. If not provided, uses the function name.
    allowed_values : Union[Tuple[float, float], range], optional
        The valid range for metric outputs as (min, max) tuple or range object.
        Default is (0.0, 1.0).
    **metric_params : Any
        Additional parameters to pass to the metric initialization.

    Returns
    -------
    Callable[[Callable[..., Any]], NumericMetricProtocol]
        A decorator that transforms a function into a NumericMetric instance.

    Examples
    --------
    >>> from ragas.metrics import numeric_metric
    >>>
    >>> @numeric_metric(name="relevance_score", allowed_values=(0.0, 1.0))
    >>> def calculate_relevance(user_input: str, response: str) -> float:
    ...     '''Calculate relevance score between 0 and 1.'''
    ...     # Simple word overlap example
    ...     user_words = set(user_input.lower().split())
    ...     response_words = set(response.lower().split())
    ...     if not user_words:
    ...         return 0.0
    ...     overlap = len(user_words & response_words)
    ...     return overlap / len(user_words)
    >>>
    >>> result = calculate_relevance(
    ...     user_input="What is Python?",
    ...     response="Python is a programming language"
    ... )
    >>> print(result.value)  # Numeric score between 0.0 and 1.0
    """
    if allowed_values is None:
        allowed_values = (0.0, 1.0)

    decorator_factory = create_metric_decorator()
    return decorator_factory(name=name, allowed_values=allowed_values, **metric_params)  # type: ignore[return-value]

ranking_metric

ranking_metric(*, name: Optional[str] = None, allowed_values: Optional[int] = None, **metric_params: Any) -> Callable[[Callable[..., Any]], RankingMetricProtocol]

Decorator for creating ranking/ordering metrics.

This decorator transforms a regular function into a RankingMetric instance that outputs ordered lists of items.

Parameters:

Name Type Description Default
name str

Name for the metric. If not provided, uses the function name.

None
allowed_values int

Expected number of items in the ranking list. Default is 2.

None
**metric_params Any

Additional parameters to pass to the metric initialization.

{}

Returns:

Type Description
Callable[[Callable[..., Any]], RankingMetricProtocol]

A decorator that transforms a function into a RankingMetric instance.

Examples:

>>> from ragas.metrics import ranking_metric
>>>
>>> @ranking_metric(name="priority_ranker", allowed_values=3)
>>> def rank_by_urgency(user_input: str, responses: list) -> list:
...     '''Rank responses by urgency keywords.'''
...     urgency_keywords = ["urgent", "asap", "critical"]
...     scored = []
...     for resp in responses:
...         score = sum(kw in resp.lower() for kw in urgency_keywords)
...         scored.append((score, resp))
...     # Sort by score descending and return top items
...     ranked = sorted(scored, key=lambda x: x[0], reverse=True)
...     return [item[1] for item in ranked[:3]]
>>>
>>> result = rank_by_urgency(
...     user_input="What should I do first?",
...     responses=["This is urgent", "Take your time", "Critical issue!"]
... )
>>> print(result.value)  # Ranked list of responses
Source code in src/ragas/metrics/ranking.py
def ranking_metric(
    *,
    name: t.Optional[str] = None,
    allowed_values: t.Optional[int] = None,
    **metric_params: t.Any,
) -> t.Callable[[t.Callable[..., t.Any]], RankingMetricProtocol]:
    """
    Decorator for creating ranking/ordering metrics.

    This decorator transforms a regular function into a RankingMetric instance
    that outputs ordered lists of items.

    Parameters
    ----------
    name : str, optional
        Name for the metric. If not provided, uses the function name.
    allowed_values : int, optional
        Expected number of items in the ranking list. Default is 2.
    **metric_params : Any
        Additional parameters to pass to the metric initialization.

    Returns
    -------
    Callable[[Callable[..., Any]], RankingMetricProtocol]
        A decorator that transforms a function into a RankingMetric instance.

    Examples
    --------
    >>> from ragas.metrics import ranking_metric
    >>>
    >>> @ranking_metric(name="priority_ranker", allowed_values=3)
    >>> def rank_by_urgency(user_input: str, responses: list) -> list:
    ...     '''Rank responses by urgency keywords.'''
    ...     urgency_keywords = ["urgent", "asap", "critical"]
    ...     scored = []
    ...     for resp in responses:
    ...         score = sum(kw in resp.lower() for kw in urgency_keywords)
    ...         scored.append((score, resp))
    ...     # Sort by score descending and return top items
    ...     ranked = sorted(scored, key=lambda x: x[0], reverse=True)
    ...     return [item[1] for item in ranked[:3]]
    >>>
    >>> result = rank_by_urgency(
    ...     user_input="What should I do first?",
    ...     responses=["This is urgent", "Take your time", "Critical issue!"]
    ... )
    >>> print(result.value)  # Ranked list of responses
    """
    if allowed_values is None:
        allowed_values = 2

    decorator_factory = create_metric_decorator()
    return decorator_factory(name=name, allowed_values=allowed_values, **metric_params)  # type: ignore[return-value]