Create evaluation tasks that score spans in a project continuously or on demand, or that evaluate examples in a dataset using your LLM-as-judge evaluators.
Key Capabilities
- Create project-based tasks that run continuously against live spans
- Create dataset-based tasks that evaluate experiment results
- Trigger on-demand task runs with custom data windows
- Poll task runs until completion with configurable timeout
- Cancel in-progress runs
- List and filter task runs by status
The tasks operations are currently in ALPHA. A one-time warning is emitted on first use.
List Tasks
List tasks you have access to, with optional filtering by space, project, dataset, or type.
resp = client.tasks.list(
    space="your-space-name-or-id",  # optional
    limit=50,
)
for task in resp.tasks:
    print(task.id, task.name)
Filter by task type:
resp = client.tasks.list(
    space="your-space-name-or-id",
    task_type="template_evaluation",
)
Valid values for task_type are "template_evaluation" and "code_evaluation".
For details on pagination, field introspection, and data conversion (to dict/JSON/DataFrame), see Response Objects.
Get a Task
Retrieve a task by name or ID. When using a name, provide space to disambiguate.
task = client.tasks.get(
    task="your-task-name-or-id",
    space="your-space-name-or-id",  # required when using a name
)
print(task.id, task.name)
Create a Task
Create a new evaluation task. Tasks can target either a project (live spans) or a dataset (experiment results).
Project-Based Task
A project-based task continuously evaluates incoming spans. Set is_continuous=True to run the task on every new span, or False to run it only on demand.
from arize._generated.api_client.models import TasksCreateRequestEvaluatorsInner

task = client.tasks.create(
    name="Relevance Monitor",
    task_type="template_evaluation",
    project="your-project-name-or-id",
    evaluators=[
        TasksCreateRequestEvaluatorsInner(
            evaluator_id="your-evaluator-id",
        ),
    ],
    is_continuous=True,
    sampling_rate=0.1,  # Evaluate 10% of spans
)
print(task.id)
Dataset-Based Task
A dataset-based task evaluates examples from one or more experiments. At least one entry in experiment_ids is required.
task = client.tasks.create(
    name="Experiment Evaluation",
    task_type="template_evaluation",
    dataset="your-dataset-name-or-id",
    experiment_ids=["experiment-id-1", "experiment-id-2"],
    evaluators=[
        TasksCreateRequestEvaluatorsInner(
            evaluator_id="your-evaluator-id",
        ),
    ],
    is_continuous=False,
)
print(task.id)
Column Mappings and Filters
Each evaluator in the task can have its own column mappings (to map template variables to span attribute names) and a per-evaluator query filter.
task = client.tasks.create(
    name="Custom Relevance",
    task_type="template_evaluation",
    project="your-project-name-or-id",
    evaluators=[
        TasksCreateRequestEvaluatorsInner(
            evaluator_id="your-evaluator-id",
            column_mappings={"user_query": "input.value"},
            query_filter="status_code = 'OK'",
        ),
    ],
    query_filter="latency_ms < 5000",  # Task-level filter (AND-ed with evaluator filter)
    is_continuous=True,
)
Parameter reference:
| Parameter | Type | Description |
|---|---|---|
| name | str | Task name. Must be unique within the space. |
| task_type | str | "template_evaluation" or "code_evaluation". |
| evaluators | list | List of evaluators to attach. At least one is required. |
| project | str | Target project name or ID. Required when dataset is not provided. |
| dataset | str | Target dataset name or ID. Required when project is not provided. |
| space | str | Space name or ID used to disambiguate name-based resolution for project and dataset. |
| experiment_ids | list[str] | Required (at least one) when dataset is provided. |
| sampling_rate | float | Fraction of spans to evaluate (0–1). Project-based tasks only. |
| is_continuous | bool | True to run on every new span; False for on-demand only. |
| query_filter | str | Task-level SQL-style filter applied to all evaluators. |
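The target-selection rules in the table (exactly one of project or dataset; experiment_ids only with dataset; sampling_rate only for project-based tasks, between 0 and 1) can be sketched as a client-side check. This helper is hypothetical, not part of the SDK:

```python
def validate_create_args(project=None, dataset=None,
                         experiment_ids=None, sampling_rate=None):
    """Mirror the create() target rules from the parameter reference."""
    if bool(project) == bool(dataset):
        raise ValueError("Provide exactly one of project or dataset.")
    if dataset and not experiment_ids:
        raise ValueError("experiment_ids (at least one) is required "
                         "for dataset-based tasks.")
    if sampling_rate is not None:
        if project is None:
            raise ValueError("sampling_rate applies to project-based "
                             "tasks only.")
        if not 0 <= sampling_rate <= 1:
            raise ValueError("sampling_rate must be between 0 and 1.")


validate_create_args(project="my-project", sampling_rate=0.1)  # passes
```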
Task Runs
Trigger a Run
Trigger an on-demand run for a task. The run starts in "pending" status.
from datetime import datetime

run = client.tasks.trigger_run(
    task="your-task-name-or-id",
    data_start_time=datetime(2024, 1, 1),
    data_end_time=datetime(2024, 2, 1),
)
print(run.id, run.status) # e.g. "run-abc123", "pending"
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| task | str | required | Task name or ID to trigger. |
| space | str | None | Space name or ID used to disambiguate the task lookup. Recommended when resolving by name. |
| data_start_time | datetime | None | Start of the data window to evaluate. |
| data_end_time | datetime | now | End of the data window. Defaults to the current time. |
| max_spans | int | 10000 | Maximum number of spans to process. |
| override_evaluations | bool | False | Re-evaluate data that already has labels. |
| experiment_ids | list[str] | None | Experiment IDs to run against (dataset-based tasks only). |
List Runs
List runs for a task with optional status filtering.
resp = client.tasks.list_runs(
    task="your-task-name-or-id",
    limit=20,
)
for run in resp.task_runs:
    print(run.id, run.status)
Filter to only completed runs:
resp = client.tasks.list_runs(
    task="your-task-name-or-id",
    status="completed",
)
Valid status values: "pending", "running", "completed", "failed", "cancelled".
Get a Run
Retrieve a specific run by its ID.
run = client.tasks.get_run(run_id="your-run-id")
print(run.id, run.status)
Cancel a Run
Cancel a run that is currently "pending" or "running".
run = client.tasks.cancel_run(run_id="your-run-id")
print(run.status) # "cancelled"
Wait for a Run
Poll a run until it reaches a terminal state ("completed", "failed", or "cancelled").
run = client.tasks.wait_for_run(
    run_id="your-run-id",
    poll_interval=5,  # Check every 5 seconds (default)
    timeout=600,      # Give up after 10 minutes (default)
)
print(run.status)  # "completed", "failed", or "cancelled"
Raises TimeoutError if the run does not complete within timeout seconds.
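The polling behavior described above can be sketched as a generic loop. This is an illustration of the documented semantics, not the SDK's actual implementation; it assumes only that get_run is a zero-argument callable returning an object with a status attribute:

```python
import time

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_terminal(get_run, poll_interval=5, timeout=600):
    """Poll get_run() until the run reaches a terminal status.

    Raises TimeoutError if no terminal status is seen within timeout
    seconds, matching the documented wait_for_run behavior.
    """
    deadline = time.monotonic() + timeout
    while True:
        run = get_run()
        if run.status in TERMINAL_STATUSES:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Run still {run.status!r} after {timeout} seconds"
            )
        time.sleep(poll_interval)
```

For example, wait_for_terminal(lambda: client.tasks.get_run(run_id=run.id)) would reproduce the wait_for_run loop against a live run.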
End-to-End: Trigger and Wait
# Trigger an on-demand run
run = client.tasks.trigger_run(task="your-task-name-or-id")

# Block until the run finishes
run = client.tasks.wait_for_run(run_id=run.id)

if run.status == "completed":
    print("Task run completed successfully")
elif run.status == "failed":
    print("Task run failed")
Learn more: Online Evaluations Documentation