Async Evaluation

aevaluate()

Async version of evaluate that performs evaluation without applying nest_asyncio.

This function is the async-first implementation that doesn't patch the event loop, making it safe to use in production async applications.

Parameters are identical to those of the evaluate() function.

Returns:

EvaluationResult or Executor
    If return_executor is False, returns EvaluationResult object containing the scores of each metric. If return_executor is True, returns the Executor instance for cancellable execution.

Examples:

import asyncio
from ragas import aevaluate

async def main():
    result = await aevaluate(dataset, metrics)
    print(result)

asyncio.run(main())
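
When return_executor=True, the awaited call hands back the Executor instead of running to completion. A minimal sketch of the cancellable path, using only the Executor.aresults() coroutine visible in the source below plus standard asyncio task cancellation; dataset and metrics are placeholders:

import asyncio
from ragas import aevaluate

async def main():
    executor = await aevaluate(dataset, metrics, return_executor=True)
    # Wrap the pending scores in a task so the caller stays in control
    task = asyncio.create_task(executor.aresults())
    try:
        results = await asyncio.wait_for(task, timeout=300)
        print(results)
    except asyncio.TimeoutError:
        # wait_for cancels the task on timeout, abandoning the remaining metric jobs
        print("evaluation cancelled after timeout")

asyncio.run(main())
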
Source code in src/ragas/evaluation.py
async def aevaluate(
    dataset: t.Union[Dataset, EvaluationDataset],
    metrics: t.Optional[t.Sequence[Metric]] = None,
    llm: t.Optional[BaseRagasLLM | InstructorBaseRagasLLM | LangchainLLM] = None,
    embeddings: t.Optional[
        BaseRagasEmbeddings | BaseRagasEmbedding | LangchainEmbeddings
    ] = None,
    experiment_name: t.Optional[str] = None,
    callbacks: Callbacks = None,
    run_config: t.Optional[RunConfig] = None,
    token_usage_parser: t.Optional[TokenUsageParser] = None,
    raise_exceptions: bool = False,
    column_map: t.Optional[t.Dict[str, str]] = None,
    show_progress: bool = True,
    batch_size: t.Optional[int] = None,
    _run_id: t.Optional[UUID] = None,
    _pbar: t.Optional[tqdm] = None,
    return_executor: bool = False,
) -> t.Union[EvaluationResult, Executor]:
    """
    Async version of evaluate that performs evaluation without applying nest_asyncio.

    This function is the async-first implementation that doesn't patch the event loop,
    making it safe to use in production async applications.

    Parameters are identical to those of the evaluate() function.

    Returns
    -------
    EvaluationResult or Executor
        If return_executor is False, returns EvaluationResult object containing the scores of each metric.
        If return_executor is True, returns the Executor instance for cancellable execution.

    Examples
    --------
    ```python
    import asyncio
    from ragas import aevaluate

    async def main():
        result = await aevaluate(dataset, metrics)
        print(result)

    asyncio.run(main())
    ```
    """
    column_map = column_map or {}
    callbacks = callbacks or []
    run_config = run_config or RunConfig()

    if helicone_config.is_enabled:
        import uuid

        helicone_config.session_name = "ragas-evaluation"
        helicone_config.session_id = str(uuid.uuid4())

    if dataset is None:
        raise ValueError("Provide dataset!")

    # Check metrics are correct type
    if not isinstance(metrics, (type(None), list)):
        raise TypeError(
            "Metrics should be provided in a list, e.g: metrics=[BleuScore()]"
        )

    if isinstance(metrics, list) and any(not isinstance(m, Metric) for m in metrics):
        raise TypeError(
            "All metrics must be initialised metric objects, e.g: metrics=[BleuScore(), AspectCritic()]"
        )

    # default metrics
    if metrics is None:
        from ragas.metrics import (
            answer_relevancy,
            context_precision,
            context_recall,
            faithfulness,
        )

        metrics = [answer_relevancy, context_precision, faithfulness, context_recall]

    if isinstance(dataset, Dataset):
        # remap column names from the dataset
        dataset = remap_column_names(dataset, column_map)
        dataset = convert_v1_to_v2_dataset(dataset)
        # validation
        dataset = EvaluationDataset.from_list(dataset.to_list())

    if isinstance(dataset, EvaluationDataset):
        validate_required_columns(dataset, metrics)
        validate_supported_metrics(dataset, metrics)

    # set the llm and embeddings
    if isinstance(llm, LangchainLLM):
        llm = LangchainLLMWrapper(llm, run_config=run_config)
    if isinstance(embeddings, LangchainEmbeddings):
        embeddings = LangchainEmbeddingsWrapper(embeddings)

    # init llms and embeddings
    binary_metrics = []
    llm_changed: t.List[int] = []
    embeddings_changed: t.List[int] = []
    answer_correctness_is_set = -1

    # loop through the metrics and perform initializations
    for i, metric in enumerate(metrics):
        # set llm and embeddings if not set
        if isinstance(metric, AspectCritic):
            binary_metrics.append(metric.name)
        if isinstance(metric, MetricWithLLM) and metric.llm is None:
            if llm is None:
                from openai import OpenAI

                client = OpenAI()
                llm = llm_factory("gpt-4o-mini", client=client)
            metric.llm = t.cast(t.Optional[BaseRagasLLM], llm)
            llm_changed.append(i)
        if isinstance(metric, MetricWithEmbeddings) and metric.embeddings is None:
            if embeddings is None:
                embeddings = embedding_factory()
            metric.embeddings = embeddings
            embeddings_changed.append(i)
        if isinstance(metric, AnswerCorrectness):
            if metric.answer_similarity is None:
                answer_correctness_is_set = i

        # init all the models
        metric.init(run_config)

    executor = Executor(
        desc="Evaluating",
        keep_progress_bar=True,
        raise_exceptions=raise_exceptions,
        run_config=run_config,
        show_progress=show_progress,
        batch_size=batch_size,
        pbar=_pbar,
    )

    # Ragas Callbacks
    # init the callbacks we need for various tasks
    ragas_callbacks: t.Dict[str, BaseCallbackHandler] = {}

    # Ragas Tracer which traces the run
    tracer = RagasTracer()
    ragas_callbacks["tracer"] = tracer

    # check if cost needs to be calculated
    if token_usage_parser is not None:
        from ragas.cost import CostCallbackHandler

        cost_cb = CostCallbackHandler(token_usage_parser=token_usage_parser)
        ragas_callbacks["cost_cb"] = cost_cb

    # append all the ragas_callbacks to the callbacks
    for cb in ragas_callbacks.values():
        if isinstance(callbacks, BaseCallbackManager):
            callbacks.add_handler(cb)
        else:
            callbacks.append(cb)

    # new evaluation chain
    row_run_managers = []
    evaluation_rm, evaluation_group_cm = new_group(
        name=experiment_name or RAGAS_EVALUATION_CHAIN_NAME,
        inputs={},
        callbacks=callbacks,
        metadata={"type": ChainType.EVALUATION},
    )

    sample_type = dataset.get_sample_type()
    for i, sample in enumerate(dataset):
        row = t.cast(t.Dict[str, t.Any], sample.model_dump())
        row_rm, row_group_cm = new_group(
            name=f"row {i}",
            inputs=row,
            callbacks=evaluation_group_cm,
            metadata={"type": ChainType.ROW, "row_index": i},
        )
        row_run_managers.append((row_rm, row_group_cm))
        if sample_type == SingleTurnSample:
            _ = [
                executor.submit(
                    metric.single_turn_ascore,
                    sample,
                    row_group_cm,
                    name=f"{metric.name}-{i}",
                    timeout=run_config.timeout,
                )
                for metric in metrics
                if isinstance(metric, SingleTurnMetric)
            ]
        elif sample_type == MultiTurnSample:
            _ = [
                executor.submit(
                    metric.multi_turn_ascore,
                    sample,
                    row_group_cm,
                    name=f"{metric.name}-{i}",
                    timeout=run_config.timeout,
                )
                for metric in metrics
                if isinstance(metric, MultiTurnMetric)
            ]
        else:
            raise ValueError(f"Unsupported sample type {sample_type}")

    # Return executor for cancellable execution if requested
    if return_executor:
        return executor

    scores: t.List[t.Dict[str, t.Any]] = []
    try:
        # get the results using async method
        results = await executor.aresults()
        if results == []:
            raise ExceptionInRunner()

        # convert results to dataset_like
        for i, _ in enumerate(dataset):
            s = {}
            for j, m in enumerate(metrics):
                if isinstance(m, ModeMetric):  # type: ignore
                    key = f"{m.name}(mode={m.mode})"
                else:
                    key = m.name
                s[key] = results[len(metrics) * i + j]
            scores.append(s)
            # close the row chain
            row_rm, row_group_cm = row_run_managers[i]
            if not row_group_cm.ended:
                row_rm.on_chain_end(s)

    # run evaluation task
    except Exception as e:
        if not evaluation_group_cm.ended:
            evaluation_rm.on_chain_error(e)

        raise e
    else:
        # evaluation run was successful
        # now let's process the results
        cost_cb = ragas_callbacks["cost_cb"] if "cost_cb" in ragas_callbacks else None
        result = EvaluationResult(
            scores=scores,
            dataset=dataset,
            binary_columns=binary_metrics,
            cost_cb=t.cast(
                t.Union["CostCallbackHandler", None],
                cost_cb,
            ),
            ragas_traces=tracer.traces,
            run_id=_run_id,
        )
        if not evaluation_group_cm.ended:
            evaluation_rm.on_chain_end({"scores": result.scores})
    finally:
        # reset llms and embeddings if changed
        for i in llm_changed:
            t.cast(MetricWithLLM, metrics[i]).llm = None
        for i in embeddings_changed:
            t.cast(MetricWithEmbeddings, metrics[i]).embeddings = None
        if answer_correctness_is_set != -1:
            t.cast(
                AnswerCorrectness, metrics[answer_correctness_is_set]
            ).answer_similarity = None

        # flush the analytics batcher
        from ragas._analytics import _analytics_batcher

        _analytics_batcher.flush()

    return result
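
Note that each metric call above is submitted with timeout=run_config.timeout, and the same RunConfig is also passed to metric.init() and the LLM wrapper. A minimal sketch of supplying a custom configuration, assuming RunConfig is importable from ragas.run_config and accepts a timeout argument (as the run_config.timeout usage above suggests); dataset and metrics are placeholders:

import asyncio
from ragas import aevaluate
from ragas.run_config import RunConfig

async def main():
    # Tighten the per-metric timeout consulted by executor.submit(...) above
    result = await aevaluate(dataset, metrics, run_config=RunConfig(timeout=60))
    print(result)

asyncio.run(main())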

Async Usage

Ragas provides both synchronous and asynchronous evaluation APIs to accommodate different use cases:

For production async applications, use aevaluate() to avoid event loop conflicts:

import asyncio
from ragas import aevaluate

async def evaluate_app():
    result = await aevaluate(dataset, metrics)
    return result

# In your async application
result = await evaluate_app()
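
Because aevaluate() never patches the running event loop, evaluations can also be scheduled concurrently with ordinary asyncio tools. A hedged sketch; dataset_a, dataset_b, and metrics are placeholders:

import asyncio
from ragas import aevaluate

async def evaluate_all():
    # Both evaluations share the application's existing event loop
    return await asyncio.gather(
        aevaluate(dataset_a, metrics),
        aevaluate(dataset_b, metrics),
    )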

Using evaluate() with Async Control

For backward compatibility and Jupyter notebook usage, evaluate() offers optional control over nest_asyncio through its allow_nest_asyncio flag:

# Default behavior (Jupyter-compatible)
result = evaluate(dataset, metrics)  # allow_nest_asyncio=True

# Production-safe (avoids event loop patching)
result = evaluate(dataset, metrics, allow_nest_asyncio=False)
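
If a synchronous call site must remain inside an async service, one option (a sketch, not an official recipe) is to push the blocking call onto a worker thread with asyncio.to_thread, where evaluate() can create its own event loop without touching yours:

import asyncio
from ragas import evaluate

async def evaluate_off_loop():
    # Runs the sync API in a worker thread; allow_nest_asyncio=False avoids
    # patching the event loop in the main thread
    return await asyncio.to_thread(
        evaluate, dataset, metrics, allow_nest_asyncio=False
    )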

Migration from nest_asyncio Issues

If you're experiencing issues with nest_asyncio in production:

Before (problematic):

# This may cause event loop conflicts
result = evaluate(dataset, metrics)

After (fixed):

# Option 1: Use async API
result = await aevaluate(dataset, metrics)

# Option 2: Disable nest_asyncio
result = evaluate(dataset, metrics, allow_nest_asyncio=False)