Core Functions
Primary functions for running experiments and managing execution.
evaluate()
- evaluate(function, *, dataset=None, dataset_id=None, evaluators=None, instrumentors=None, api_key=None, server_url=None, project=None, name=None, max_workers=10, aggregate_function='average', verbose=False, print_results=True)
Run an experiment by executing a function against a dataset and evaluating outputs.
This is the main entry point for the experiments framework. It handles:
Function execution with tracer integration
Evaluator orchestration (sync and async)
Session/event linking
Results aggregation via backend
- Parameters:
function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to test. Should accept a Dict[str, Any] (datapoint) and return a Dict[str, Any] (outputs).
dataset (Optional[List[Dict[str, Any]]]) – List of test cases with inputs and optional ground_truth. Mutually exclusive with dataset_id.
dataset_id (Optional[str]) – ID of a HoneyHive-managed dataset. Mutually exclusive with dataset.
evaluators (Optional[List[Callable]]) – List of evaluator functions decorated with @evaluator or @aevaluator.
api_key (Optional[str]) – HoneyHive API key. Falls back to the HH_API_KEY environment variable.
project (Optional[str]) – HoneyHive project name. Falls back to the HH_PROJECT environment variable.
name (Optional[str]) – Human-readable name for this experiment run.
instrumentors (Optional[List[Callable[[], Any]]]) – List of instrumentor factory functions; each factory returns a new instrumentor instance. Example: [lambda: OpenAIInstrumentor()]
server_url (Optional[str]) – HoneyHive server URL. Falls back to the HH_API_URL environment variable.
max_workers (int) – Maximum number of concurrent workers for parallel execution. Default: 10.
print_results (bool) – Print a formatted results table after evaluation. Default: True.
aggregate_function (str) – Aggregation method for metrics ("average", "sum", "min", "max"). Default: "average".
verbose (bool) – Enable detailed logging. Default: False.
- Returns:
Experiment result summary with aggregated metrics.
- Raises:
ValueError – If neither dataset nor dataset_id is provided, or if both are provided.
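To make the aggregate_function options concrete, here is a minimal standalone sketch of what each method computes over a list of per-datapoint evaluator scores. This is plain Python for illustration, not the SDK's internal aggregation code:

```python
from statistics import mean

# Hypothetical per-datapoint scores produced by one evaluator
scores = [0.5, 1.0, 0.0, 1.0]

# Each aggregate_function value maps to a simple reduction
aggregators = {
    "average": mean,
    "sum": sum,
    "min": min,
    "max": max,
}

for name, fn in aggregators.items():
    print(f"{name}: {fn(scores)}")
# average: 0.625, sum: 2.5, min: 0.0, max: 1.0
```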
Basic Usage
from honeyhive.experiments import evaluate, evaluator

@evaluator
def accuracy_evaluator(outputs, inputs, ground_truth):
    return {"score": 1.0 if outputs == ground_truth else 0.0}

def my_llm_function(datapoint):
    inputs = datapoint["inputs"]
    # Your LLM logic here
    return {"answer": process(inputs["query"])}

result = evaluate(
    function=my_llm_function,
    dataset=[
        {"inputs": {"query": "Q1"}, "ground_truth": {"answer": "A1"}},
        {"inputs": {"query": "Q2"}, "ground_truth": {"answer": "A2"}},
    ],
    evaluators=[accuracy_evaluator],
    api_key="your-api-key",
    project="your-project",
    name="accuracy-test-v1",
)

print(f"Success: {result.success}")
print(f"Passed: {result.passed} / {result.passed + result.failed}")
print(f"Avg accuracy: {result.metrics.get_metric('accuracy_evaluator')}")
External Dataset (Client-Side Data)
# SDK auto-generates EXT- prefixed IDs
result = evaluate(
    function=my_function,
    dataset=[
        {"inputs": {"x": 1}, "ground_truth": {"y": 2}},
        {"inputs": {"x": 2}, "ground_truth": {"y": 4}},
    ],
    evaluators=[my_evaluator],
    api_key="key",
    project="project",
)
Managed Dataset (HoneyHive-Stored)
# Use an existing dataset by ID
result = evaluate(
    function=my_function,
    dataset_id="dataset-abc-123",  # Pre-created in HoneyHive
    evaluators=[my_evaluator],
    api_key="key",
    project="project",
)
Multiple Evaluators
@evaluator
def accuracy(outputs, inputs, ground_truth):
    return {"score": calculate_accuracy(outputs, ground_truth)}

@evaluator
def relevance(outputs, inputs, ground_truth):
    return {"score": calculate_relevance(outputs, inputs)}

@aevaluator
async def external_check(outputs, inputs, ground_truth):
    result = await external_api.validate(outputs)
    return {"score": result.score}

result = evaluate(
    function=my_function,
    dataset=test_data,
    evaluators=[accuracy, relevance, external_check],
    api_key="key",
    project="project",
    max_workers=4,  # Parallel execution
)
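To illustrate how sync and async evaluators can both be dispatched against the same outputs, here is a standalone sketch (not the SDK's actual orchestration; the evaluator signatures follow the examples above):

```python
import asyncio
import inspect

def run_evaluator(ev, outputs, inputs, ground_truth):
    # Await coroutine-function evaluators; call sync evaluators directly
    if inspect.iscoroutinefunction(ev):
        return asyncio.run(ev(outputs, inputs, ground_truth))
    return ev(outputs, inputs, ground_truth)

def exact_match(outputs, inputs, ground_truth):
    return {"score": 1.0 if outputs == ground_truth else 0.0}

async def async_check(outputs, inputs, ground_truth):
    await asyncio.sleep(0)  # stand-in for an external API call
    return {"score": 0.5}

print(run_evaluator(exact_match, {"a": 1}, {}, {"a": 1}))  # {'score': 1.0}
print(run_evaluator(async_check, {"a": 1}, {}, {"a": 1}))  # {'score': 0.5}
```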
Accessing Results
result = evaluate(...)

# Overall status
print(f"Run ID: {result.run_id}")
print(f"Status: {result.status}")
print(f"Success: {result.success}")

# Aggregated metrics
accuracy_score = result.metrics.get_metric("accuracy")
all_metrics = result.metrics.get_all_metrics()

# Individual datapoints
for datapoint in result.datapoints:
    print(f"Datapoint: {datapoint}")
run_experiment()
- run_experiment(function, dataset, datapoint_ids, *, server_url=None, experiment_context, api_key=None, max_workers=10, verbose=False, instrumentors=None)
Low-level function to execute a function against a dataset with tracer integration.
Warning
This is a low-level API. Most users should use evaluate() instead, which provides a higher-level interface with evaluator support.
- Parameters:
function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to execute for each datapoint.
dataset (List[Dict[str, Any]]) – List of datapoints to process.
datapoint_ids (List[str]) – List of datapoint IDs (must match dataset length).
experiment_context (ExperimentContext) – Context with run_id, dataset_id, project, source.
server_url (Optional[str]) – HoneyHive server URL. Falls back to the HH_API_URL environment variable.
api_key (Optional[str]) – HoneyHive API key. Falls back to the HH_API_KEY environment variable.
max_workers (int) – Maximum concurrent workers. Default: 10.
verbose (bool) – Enable detailed logging.
instrumentors (Optional[List[Callable[[], Any]]]) – List of instrumentor factory functions per datapoint.
- Returns:
List of execution results with outputs, errors, and session IDs.
- Return type:
List[Dict[str, Any]]
Usage Example
from honeyhive.experiments import run_experiment, ExperimentContext

context = ExperimentContext(
    run_id="run-123",
    dataset_id="dataset-456",
    project="my-project",
    source="test",
)

results = run_experiment(
    function=my_function,
    dataset=test_data,
    datapoint_ids=["dp-1", "dp-2", "dp-3"],
    experiment_context=context,
    api_key="key",
    max_workers=2,
)

for result in results:
    print(f"Datapoint: {result['datapoint_id']}")
    print(f"Status: {result['status']}")
    print(f"Outputs: {result['outputs']}")
    if result['error']:
        print(f"Error: {result['error']}")
ExperimentContext
- class ExperimentContext
Context object storing experiment metadata for tracer integration.
- Parameters:
run_id (str) – Unique experiment run identifier.
dataset_id (str) – Dataset identifier (may be EXT- prefixed for external datasets).
project (str) – HoneyHive project name.
run_name (Optional[str]) – Optional human-readable name for the run (used for session naming).
source (str) – Source identifier. Default: "evaluation".
metadata (Optional[Dict[str, Any]]) – Additional metadata dictionary.
Methods
- to_tracer_config(datapoint_id)
Convert context to tracer configuration dictionary for a specific datapoint.
Usage Example
from honeyhive.experiments import ExperimentContext

context = ExperimentContext(
    run_id="run-abc-123",
    dataset_id="EXT-dataset-xyz",
    project="my-project",
    source="ci-pipeline",
)

# Convert to tracer config (requires a datapoint_id)
tracer_config = context.to_tracer_config("dp-1")

# Use with HoneyHiveTracer
from honeyhive import HoneyHiveTracer
tracer = HoneyHiveTracer(**tracer_config, api_key="key")
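If it helps to see the shape of this object, the following is a hypothetical standalone approximation of ExperimentContext as a dataclass. The real SDK class, and the exact keys that to_tracer_config() returns, may differ; this sketch only mirrors the fields documented above:

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class ExperimentContextSketch:
    """Rough stand-in for the SDK's ExperimentContext (illustrative only)."""
    run_id: str
    dataset_id: str
    project: str
    run_name: Optional[str] = None
    source: str = "evaluation"  # documented default
    metadata: Optional[Dict[str, Any]] = None

    def to_tracer_config(self, datapoint_id: str) -> Dict[str, Any]:
        # Hypothetical key layout -- the real config dict may differ
        return {
            "project": self.project,
            "source": self.source,
            "metadata": {
                "run_id": self.run_id,
                "dataset_id": self.dataset_id,
                "datapoint_id": datapoint_id,
                **(self.metadata or {}),
            },
        }

ctx = ExperimentContextSketch(run_id="run-1", dataset_id="EXT-ds", project="p")
print(ctx.to_tracer_config("dp-1")["metadata"]["datapoint_id"])  # dp-1
```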
Best Practices
1. Function Signatures
Your function should accept a datapoint dict and return an outputs dict:
def my_function(datapoint: Dict[str, Any]) -> Dict[str, Any]:
    """
    Args:
        datapoint: Contains 'inputs' and optionally 'ground_truth'

    Returns:
        Dict with your outputs (e.g., {"answer": "...", "confidence": 0.9})
    """
    inputs = datapoint["inputs"]
    # Process inputs
    return {"answer": process(inputs)}
2. Error Handling
Let exceptions bubble up - evaluate() catches and logs them:
def my_function(datapoint):
try:
result = risky_operation(datapoint["inputs"])
return {"result": result}
except SpecificError as e:
# Log but don't suppress - let evaluate() handle it
logger.warning(f"Operation failed: {e}")
raise
3. Parallel Execution
Use max_workers for I/O-bound workloads:
# Good for API calls
result = evaluate(
function=api_heavy_function,
dataset=large_dataset,
evaluators=[...],
max_workers=10, # High concurrency for I/O
api_key="key",
project="project"
)
# For CPU-bound work, keep lower
result = evaluate(
function=cpu_intensive_function,
dataset=dataset,
max_workers=2, # Lower for CPU work
api_key="key",
project="project"
)
4. Dataset Size Management
For large datasets, use batching:
def run_large_experiment(full_dataset, batch_size=100):
"""Process large dataset in batches."""
results = []
for i in range(0, len(full_dataset), batch_size):
batch = full_dataset[i:i+batch_size]
result = evaluate(
function=my_function,
dataset=batch,
evaluators=[my_evaluator],
name=f"experiment-batch-{i//batch_size}",
api_key="key",
project="project"
)
results.append(result)
return results
See Also
Evaluators - Define custom evaluators
Results Retrieval - Retrieve and compare results
Data Models - Result data models