# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import inspect
import time
from dataclasses import fields
from datetime import datetime
from typing import Any, Callable, Optional, Union
from oumi.builders.inference_engines import build_inference_engine
from oumi.core.configs import (
AlpacaEvalTaskParams,
EvaluationConfig,
EvaluationTaskParams,
LMHarnessTaskParams,
)
from oumi.core.configs.params.evaluation_params import EvaluationBackend
from oumi.core.distributed import is_world_process_zero
from oumi.core.evaluation.backends.alpaca_eval import evaluate as evaluate_alpaca_eval
from oumi.core.evaluation.backends.lm_harness import evaluate as evaluate_lm_harness
from oumi.core.evaluation.evaluation_result import EvaluationResult
from oumi.core.evaluation.utils.platform_prerequisites import check_prerequisites
from oumi.core.evaluation.utils.save_utils import save_evaluation_output
from oumi.core.inference import BaseInferenceEngine
from oumi.core.registry import REGISTRY
_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME = "inference_engine"
_EVALUATION_FN_TASK_PARAMS_INPUT_PARAM_NAME = "task_params"
_EVALUATION_FN_CONFIG_INPUT_PARAM_NAME = "config"
# Reserved keys that a custom evaluation function might define as inputs. The values of
# these keys, if defined as inputs, will be automatically populated by the Evaluator.
# The user is NOT allowed to pass these as keyword arguments when calling the
# `Evaluator.evaluate()` function.
RESERVED_KEYS = {
_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME,
_EVALUATION_FN_TASK_PARAMS_INPUT_PARAM_NAME,
_EVALUATION_FN_CONFIG_INPUT_PARAM_NAME,
}
class Evaluator:
"""A class for evaluating language models on various tasks.
Currently, the evaluator supports a wide range of tasks that are handled by three
separate backends: LM Harness, Alpaca Eval, and Custom.
- LM Harness: Framework by EleutherAI for evaluating language models (mostly) on
standardized benchmarks (multiple-choice, word match, etc.). The backend supports
a large number of popular benchmarks, which can be found at:
https://github.com/EleutherAI/lm-evaluation-harness/tree/main/lm_eval/tasks.
- Alpaca Eval: Framework for evaluating the instruction-following capabilities of
language models, as well as whether their responses are helpful, accurate, and
relevant. The instruction set consists of 805 open-ended questions. The
evaluation is based on "LLM-as-judge" and prioritizes human alignment, aiming
to assess whether the model responses meet the expectations of human evaluators.
- Custom: Users can register their own evaluation functions using the decorator
`@register_evaluation_function` and run custom evaluations based on their
functions. Note that `task_name` must be the registry key of the registered
evaluation function to run (see the example below).
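Example:
    A minimal sketch of the Custom backend flow. The function name
    `my_custom_eval` is a placeholder, and the exact import path and signature
    of the decorator are assumptions; adapt them to your setup. Other imports
    are omitted for brevity.

    from oumi.core.registry import register_evaluation_function

    @register_evaluation_function
    def my_custom_eval(task_params):
        # `task_params` is a reserved input that the Evaluator populates
        # automatically; a returned dict is wrapped into an EvaluationResult.
        return {"accuracy": 1.0}

    config = EvaluationConfig(
        tasks=[
            EvaluationTaskParams(
                evaluation_backend=EvaluationBackend.CUSTOM,
                task_name="my_custom_eval",  # must match the registered name
            )
        ],
    )
    results = Evaluator().evaluate(config)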
"""
_inference_engine: Optional[BaseInferenceEngine] = None
"""Inference engine used for evaluation, if needed by the tasks."""
def evaluate(self, config: EvaluationConfig, **kwargs) -> list[EvaluationResult]:
"""Evaluates a model using the provided evaluation configuration.
Args:
config: The desired configuration for evaluation.
kwargs: Additional keyword arguments required by evaluator backends.
Returns:
List of evaluation results (one per task, in the same order as `tasks`).
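Example:
    A minimal sketch, assuming a valid evaluation config; the YAML path and
    the `from_yaml` loader shown here are illustrative.

    config = EvaluationConfig.from_yaml("eval_config.yaml")
    results = Evaluator().evaluate(config)
    for result in results:
        print(result.task_name, result.task_result)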
"""
# Create a copy of the evaluation config, without tasks, so that there is no
# redundant information in the `config` input parameter of `self.evaluate_task`.
config_without_tasks = copy.deepcopy(config)
config_without_tasks.tasks = []
# Evaluate on each task included in the configuration, serially.
evaluation_results = []
for task in config.tasks:
evaluation_result = self.evaluate_task(
task_params=task, config=config_without_tasks, **kwargs
)
evaluation_results.append(evaluation_result)
return evaluation_results
def evaluate_task(
self,
task_params: EvaluationTaskParams,
config: EvaluationConfig,
**kwargs,
) -> EvaluationResult:
"""Evaluates a model using the provided configuration on a specific task.
Args:
task_params: The task parameters for evaluation.
config: The desired configuration for evaluation.
kwargs: Additional keyword arguments required by evaluator backends.
Returns:
The evaluation result for the given task.
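Example:
    A minimal sketch for a single LM Harness task; `config` is assumed to be
    an `EvaluationConfig` (without tasks) that you have already constructed.

    task = EvaluationTaskParams(
        evaluation_backend=EvaluationBackend.LM_HARNESS,
        task_name="mmlu",
        eval_kwargs={"num_fewshot": 5},
    )
    result = Evaluator().evaluate_task(task_params=task, config=config)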
"""
# Find the proper backend to execute the evaluation task.
evaluation_backend: EvaluationBackend = task_params.get_evaluation_backend()
# Ensure the task prerequisites are satisfied; fast-fail if not.
check_prerequisites(
evaluation_backend=evaluation_backend,
task_name=task_params.task_name,
)
# Get a timestamp at the beginning of the current run.
start_time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
start_time = time.time()
# Redirect the evaluation execution to the appropriate evaluation backend.
if evaluation_backend == EvaluationBackend.LM_HARNESS:
lm_harness_task_params = self._get_backend_task_params(task_params)
assert isinstance(lm_harness_task_params, LMHarnessTaskParams)
# Destroy the inference engine, if created by a previous task. LM Harness
# uses its own inference engine, which is created internally.
if self._inference_engine:
del self._inference_engine
self._inference_engine = None
evaluation_result = evaluate_lm_harness(
task_params=lm_harness_task_params,
config=config,
**kwargs, # random_seed, numpy_random_seed, torch_random_seed
)
elif evaluation_backend == EvaluationBackend.ALPACA_EVAL:
alpaca_eval_task_params = self._get_backend_task_params(task_params)
assert isinstance(alpaca_eval_task_params, AlpacaEvalTaskParams)
evaluation_result = evaluate_alpaca_eval(
task_params=alpaca_eval_task_params,
config=config,
inference_engine=self._get_inference_engine(config),
**kwargs,
)
elif evaluation_backend == EvaluationBackend.CUSTOM:
evaluation_fn_name = task_params.task_name or ""
evaluation_fn = self._get_custom_evaluation_fn(evaluation_fn_name)
custom_kwargs = self._merge_kwargs(kwargs, task_params.eval_kwargs)
self._validate_custom_kwargs(
custom_kwargs=custom_kwargs,
evaluation_fn=evaluation_fn,
evaluation_fn_name=evaluation_fn_name,
)
self._add_reserved_keys_into_custom_kwargs(
custom_kwargs=custom_kwargs,
evaluation_fn=evaluation_fn,
task_params=task_params,
config=config,
)
evaluation_output = evaluation_fn(**custom_kwargs)
if isinstance(evaluation_output, EvaluationResult):
evaluation_result = evaluation_output
elif isinstance(evaluation_output, dict):
evaluation_result = EvaluationResult(
task_name=task_params.task_name,
task_result={"results": {task_params.task_name: evaluation_output}},
)
else:
raise ValueError(
f"The custom evaluation function `{task_params.task_name}` must "
"return either a `dict` or an `EvaluationResult` object, but it is "
f"currently returning an object of type `{type(evaluation_output)}`"
". Please ensure that the function returns the correct object."
)
else:
raise ValueError(f"Unknown evaluation backend: {evaluation_backend}")
# Calculate the elapsed time for the evaluation run.
evaluation_result.elapsed_time_sec = int(time.time() - start_time)
evaluation_result.start_time = start_time_str
# Save the output, if an output directory has been provided.
if config.output_dir and is_world_process_zero():
self.save_output(
task_params=task_params,
evaluation_result=evaluation_result,
base_output_dir=config.output_dir,
config=config,
)
return evaluation_result
def save_output(
self,
task_params: EvaluationTaskParams,
evaluation_result: EvaluationResult,
base_output_dir: str,
config: Optional[EvaluationConfig],
) -> None:
"""Saves the evaluation's output to the specified output directory.
Args:
task_params: The task parameters used for this evaluation.
evaluation_result: The evaluation result.
base_output_dir: The directory where the evaluation results will be saved.
config: The evaluation configuration.
Returns:
None
"""
save_evaluation_output(
backend_name=task_params.evaluation_backend,
task_params=task_params,
evaluation_result=evaluation_result,
base_output_dir=base_output_dir,
config=config,
)
@staticmethod
def _get_custom_evaluation_fn(task_name: Optional[str]) -> Callable:
"""Retrieve the evaluation function of the custom task."""
if not task_name:
raise ValueError(
"Missing `task_name` for custom Oumi evaluation. Please specify the "
"task name, which must correspond to an evaluation function registered "
"with the decorator `@register_evaluation_function`."
)
# Import to ensure custom evaluation functions are added to REGISTRY.
import oumi.evaluation.registry as evaluation_registry # noqa: F401
if evaluation_fn := REGISTRY.get_evaluation_function(task_name):
return evaluation_fn
else:
raise ValueError(
f"Task name `{task_name}` not found in the registry. For custom Oumi "
"evaluations, the task name must match the name of a registered "
"evaluation function. You can register a new function with the "
"decorator `@register_evaluation_function`."
)
@staticmethod
def _get_backend_task_params(
task_params: EvaluationTaskParams,
) -> Union[LMHarnessTaskParams, AlpacaEvalTaskParams]:
"""Returns the evaluation backend-specific task parameters."""
if task_params.get_evaluation_backend() == EvaluationBackend.LM_HARNESS:
target_class = LMHarnessTaskParams
elif task_params.get_evaluation_backend() == EvaluationBackend.ALPACA_EVAL:
target_class = AlpacaEvalTaskParams
elif task_params.get_evaluation_backend() == EvaluationBackend.CUSTOM:
raise ValueError(
"The custom evaluation backend does not define a backend-specific "
"subclass of `EvaluationTaskParams`, so "
"`Evaluator._get_backend_task_params()` should not be called when "
"`evaluation_backend` is set to `EvaluationBackend.CUSTOM`."
)
else:
raise ValueError(f"Unknown backend: {task_params.evaluation_backend}")
init_kwargs = Evaluator._get_init_kwargs_for_task_params_class(
task_params=task_params, target_class=target_class
)
return target_class(**init_kwargs)
@staticmethod
def _get_init_kwargs_for_task_params_class(
task_params: EvaluationTaskParams,
target_class: type[EvaluationTaskParams],
) -> dict[str, Any]:
"""Returns the init keyword arguments for a `target_class` of name *TaskParams.
Given a target class of name <evaluation backend>TaskParams, which subclasses
`EvaluationTaskParams`, this method returns a 'flattened' dict with all
arguments needed to instantiate it. The dict includes all the parameters which
are already members of `EvaluationTaskParams`, as well as additional parameters
which are only known to the target class (stored under `eval_kwargs`).
By 'flattened', we mean that all known parameters that are stored under the
`eval_kwargs` dict are moved one level up, to the (flat) dict that is returned.
In contrast, all unknown (to the target class) parameters remain (unflattened)
inside the `eval_kwargs` dict.
Example:
Assuming these are the input parameters:
task_params: EvaluationTaskParams( # <- `num_fewshot` is NOT a member
evaluation_backend=EvaluationBackend.LM_HARNESS,
task_name="mmlu",
eval_kwargs={"num_fewshot": 10, "some_param": 20},
)
target_class: LMHarnessTaskParams # <- `num_fewshot` is a member
This function will return:
{
"evaluation_backend": EvaluationBackend.LM_HARNESS,
"task_name": "mmlu",
"num_fewshot": 10,
"eval_kwargs": {"some_param": 20}
}
"""
task_params = copy.deepcopy(task_params)
# Find all keys in `eval_kwargs` which are known to the target class.
known_keys = []
if task_params.eval_kwargs:
field_names = [field.name for field in fields(target_class)]
known_keys.extend(k for k in task_params.eval_kwargs if k in field_names)
# Collect all public (non-callable) attributes of `task_params` as init kwargs.
init_keys = [
key
for key in dir(task_params)
if not callable(getattr(task_params, key)) and not key.startswith("_")
]
init_kwargs = {key: getattr(task_params, key) for key in init_keys}
# Move known kwargs one level up: from `eval_kwargs` to the top-level dict.
for key in known_keys:
if key in init_kwargs:
raise ValueError(
f"Parameter `{key}` is present twice, in both task parameters and "
"`eval_kwargs` dictionary. Please remove it from one of them."
)
init_kwargs[key] = init_kwargs["eval_kwargs"].pop(key)
return init_kwargs
@staticmethod
def _merge_kwargs(
kwargs_1: dict[str, Any],
kwargs_2: dict[str, Any],
) -> dict[str, Any]:
"""Merges two keyword argument dictionaries."""
if overlapping_keys := kwargs_1.keys() & kwargs_2.keys():
raise ValueError(
"The two keyword argument dictionaries contain overlapping keys: "
f"{overlapping_keys}. Please ensure that the keys in the following "
f"dictionaries are unique: `{kwargs_1.keys()}` and `{kwargs_2.keys()}`"
)
return kwargs_1 | kwargs_2
@staticmethod
def _validate_custom_kwargs(
custom_kwargs: dict[str, Any],
evaluation_fn: Callable,
evaluation_fn_name: str,
) -> None:
"""Validates the keyword arguments of the custom evaluation function."""
# Ensure that user-provided keyword arguments, which are passed into method
# `Evaluator.evaluate`, do NOT contain any reserved keys.
if reserved_keys_used := RESERVED_KEYS & custom_kwargs.keys():
raise RuntimeError(
"Reserved keys are present when calling `Evaluator.evaluate()`. "
"You are not allowed to pass the following keyword arguments into "
f"the `{evaluation_fn_name}` function: {sorted(RESERVED_KEYS)}. "
"However, you have passed the following reserved keys: "
f"{sorted(reserved_keys_used)}. These keys can (optionally) be inputs "
f"of your registered evaluation function `{evaluation_fn_name}`. "
"If you choose to use them, they will be automatically populated "
"by the Evaluator. "
f"The `{_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME}` input "
"will provide you with an inference engine that is generated "
"according to the `EvaluationConfig.inference_engine` type that "
"you have specified in the evaluation config. "
f"Then, `{_EVALUATION_FN_TASK_PARAMS_INPUT_PARAM_NAME}` and "
f"`{_EVALUATION_FN_CONFIG_INPUT_PARAM_NAME}` will provide you "
"with the task parameters and the evaluation configuration, "
"respectively."
)
# Ensure that user-provided keyword arguments, which are passed into method
# `Evaluator.evaluate`, match the expected input parameters of the custom
# evaluation function `evaluation_fn`.
fn_signature = inspect.signature(evaluation_fn)
fn_input_params = [param.name for param in fn_signature.parameters.values()]
provided_keys: set[str] = custom_kwargs.keys() - RESERVED_KEYS
expected_keys: set[str] = set(fn_input_params) - RESERVED_KEYS
if unrecognized_keys := provided_keys - expected_keys:
raise RuntimeError(
"Unrecognized keyword arguments are present when calling "
"`Evaluator.evaluate()`. You have passed the following unrecognized "
f"keys: {sorted(unrecognized_keys)}. Please ensure that the provided "
"keys match the expected input parameters of the custom evaluation "
f"function `{evaluation_fn_name}`. The expected input parameters "
f"of the function are: {fn_input_params}."
)
elif missing_keys := expected_keys - provided_keys:
raise RuntimeError(
"Missing keyword arguments have been identified when calling "
"`Evaluator.evaluate()`. You have not passed the following expected "
f"keys: {sorted(missing_keys)}. Please ensure that the provided keys "
"match the expected input parameters of the custom evaluation function "
f"`{evaluation_fn_name}`. The expected input parameters of the "
f"function are: {fn_input_params}."
)
def _add_reserved_keys_into_custom_kwargs(
self,
custom_kwargs: dict[str, Any],
evaluation_fn: Callable,
task_params: EvaluationTaskParams,
config: EvaluationConfig,
) -> None:
"""Adds reserved keys into the keyword arguments, if needed.
Reserved keys are keys that, if defined in the custom evaluation function
(`evaluation_fn`), are automatically populated by the Evaluator. This method
is responsible for adding them to the keyword arguments.
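Example:
    A hypothetical registered function defined as

        def my_eval(task_params, config, my_threshold: float): ...

    would have `task_params` and `config` filled in by this method, while
    `my_threshold` must be supplied by the caller of `Evaluator.evaluate()`.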
"""
fn_signature = inspect.signature(evaluation_fn)
fn_input_params = [param.name for param in fn_signature.parameters.values()]
if _EVALUATION_FN_TASK_PARAMS_INPUT_PARAM_NAME in fn_input_params:
custom_kwargs[_EVALUATION_FN_TASK_PARAMS_INPUT_PARAM_NAME] = task_params
if _EVALUATION_FN_CONFIG_INPUT_PARAM_NAME in fn_input_params:
custom_kwargs[_EVALUATION_FN_CONFIG_INPUT_PARAM_NAME] = config
if _EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME in fn_input_params:
custom_kwargs[_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME] = (
self._get_inference_engine(config)
)
def _add_inference_engine_if_needed(
self,
evaluation_function: Callable,
kwargs: dict[str, Any],
config: EvaluationConfig,
) -> None:
"""Adds an inference engine to the keyword arguments (`kwargs`), if needed."""
# Check if the evaluation function requires an inference engine.
fn_signature = inspect.signature(evaluation_function)
fn_input_params = [param.name for param in fn_signature.parameters.values()]
if _EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME not in fn_input_params:
return
# Ensure an inference engine is not already provided in the keyword arguments.
if kwargs.get(_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME):
raise RuntimeError(
"The inference engine is already provided in the keyword arguments. "
f"The input param `{_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME}` "
"is reserved for an inference engine that is generated according to "
"the evaluation config's `EvaluationConfig.inference_engine` field and "
"should not be populated by users."
)
# Add the inference engine to the kwargs.
kwargs[_EVALUATION_FN_INFERENCE_ENGINE_INPUT_PARAM_NAME] = (
self._get_inference_engine(config)
)
def _get_inference_engine(self, config: EvaluationConfig) -> BaseInferenceEngine:
"""Returns the inference engine based on the evaluation configuration."""
if not self._inference_engine:
self._inference_engine = build_inference_engine(
engine_type=config.inference_engine,
model_params=config.model,
remote_params=config.inference_remote_params,
generation_params=config.generation,
)
return self._inference_engine