Implementation:Langfuse Langfuse ScheduleObservationEvals
| Knowledge Sources | |
|---|---|
| Domains | LLM Evaluation, Job Orchestration |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for scheduling observation-level evaluations against completed experiment generations, provided by Langfuse.
Description
scheduleObservationEvals is a worker-side function that takes a completed observation, a list of pre-fetched evaluation configs, and a set of scheduler dependencies, then determines which configs match the observation and dispatches eval jobs accordingly.
The function is invoked from two pathways:
- Experiment pathway: Called by scheduleExperimentObservationEvals after an experiment LLM call completes. The observation is constructed from GenerationDetails with experiment-specific fields (experiment ID, dataset item ID, expected output, etc.).
- OTEL ingestion pathway: Called during the observation ingestion pipeline for externally ingested observations that match event-targeted eval configs.
The implementation follows a three-stage process:
Stage 1 -- Config filtering: Each config is tested against the observation using evaluateFilter (which delegates to InMemoryFilterService.evaluateFilter with a field mapper for observation columns) and shouldSampleObservation (which performs random sampling at the configured rate). For experiment-targeted configs, the filter additionally requires that span_id === experiment_item_root_span_id.
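The filtering stage can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the worker's actual code: `ObservationLike`, `ConfigLike`, `matchesFilter`, `shouldSample`, and `selectMatchingConfigs` are hypothetical names, the filter is reduced to simple equality checks instead of a real `FilterState`, `sampling` is a plain number rather than a Decimal, and the `"event"` / `"experiment"` values of `targetObject` are assumed.

```typescript
// Hypothetical, simplified shapes; the real types carry many more fields.
interface ObservationLike {
  span_id: string;
  type: string;
  environment: string;
  experiment_item_root_span_id: string | null;
}

interface ConfigLike {
  id: string;
  // Stand-in for FilterState: every listed column must equal the given value.
  filter: Array<{ column: "type" | "environment"; value: string }>;
  sampling: number; // fraction in [0, 1]; a plain number here, not a Decimal
  targetObject: "event" | "experiment"; // assumed literal values
}

// Stand-in for evaluateFilter: all filter conditions must hold.
function matchesFilter(obs: ObservationLike, config: ConfigLike): boolean {
  const conditionsHold = config.filter.every((c) => obs[c.column] === c.value);
  // Experiment-targeted configs additionally require the experiment item's root span.
  const isRootSpan =
    config.targetObject !== "experiment" ||
    obs.span_id === obs.experiment_item_root_span_id;
  return conditionsHold && isRootSpan;
}

// Stand-in for shouldSampleObservation; the RNG is injectable for deterministic tests.
function shouldSample(rate: number, random: () => number = Math.random): boolean {
  if (rate >= 1) return true;
  if (rate <= 0) return false;
  return random() < rate;
}

// A config is selected only if it passes both the filter and the sampling check.
function selectMatchingConfigs(
  obs: ObservationLike,
  configs: ConfigLike[],
  random: () => number = Math.random,
): ConfigLike[] {
  return configs.filter(
    (c) => matchesFilter(obs, c) && shouldSample(c.sampling, random),
  );
}
```

Passing the random source as a parameter is a design choice of this sketch only; it lets a test pin the sampling outcome without stubbing `Math.random`.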
Stage 2 -- S3 upload: If at least one config matches, the observation data is uploaded to S3 via schedulerDeps.uploadObservationToS3. This happens exactly once per observation regardless of how many configs match.
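The "upload once, reuse the path" behavior might look like the sketch below. The names `UploadFn` and `planJobs` are hypothetical and the real function's internal structure may differ; only the parameter shape of the upload callback is taken from the `uploadObservationToS3` dependency described on this page.

```typescript
// Same parameter shape as schedulerDeps.uploadObservationToS3.
type UploadFn = (params: {
  projectId: string;
  observationId: string;
  data: Record<string, unknown>;
}) => Promise<string>;

// Hypothetical helper: uploads at most once and shares the resulting path
// across all matching configs.
async function planJobs(
  matchingConfigIds: string[],
  observation: { project_id: string; span_id: string } & Record<string, unknown>,
  upload: UploadFn,
): Promise<Array<{ configId: string; observationS3Path: string }>> {
  // No config matched: skip the upload entirely.
  if (matchingConfigIds.length === 0) return [];

  // Single upload, independent of the number of matching configs.
  const observationS3Path = await upload({
    projectId: observation.project_id,
    observationId: observation.span_id,
    data: observation,
  });

  return matchingConfigIds.map((configId) => ({ configId, observationS3Path }));
}
```

Uploading before fanning out keeps the S3 cost constant per observation and guarantees that every job created for it reads the same payload.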
Stage 3 -- Job creation: For each matching config, processMatchingConfig is called in parallel via Promise.all. Each call generates a deterministic job execution ID from the config ID and observation span ID, upserts a job execution record with PENDING status, and enqueues an eval job to the LLMAsJudgeExecution queue with zero delay. Individual config processing errors are caught and logged without affecting other configs.
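A rough sketch of the job creation stage is shown below. It is illustrative only: how the real code derives the job execution ID is not shown on this page, so the FNV-1a hash used here is an assumption chosen to demonstrate determinism, and `JobDeps`, `deterministicJobExecutionId`, and `processMatchingConfigs` are hypothetical names with trimmed-down parameter shapes.

```typescript
// Assumption: a 32-bit FNV-1a hash stands in for the real ID derivation.
function deterministicJobExecutionId(configId: string, spanId: string): string {
  const input = configId + "|" + spanId;
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  // Convert to unsigned and left-pad to 8 hex characters.
  return ("00000000" + (hash >>> 0).toString(16)).slice(-8);
}

// Trimmed-down versions of the scheduler dependencies used in this stage.
interface JobDeps {
  upsertJobExecution: (p: { id: string; status: "PENDING" }) => Promise<void>;
  enqueueEvalJob: (p: {
    jobExecutionId: string;
    observationS3Path: string;
    delay: number;
  }) => Promise<void>;
}

// Processes all matching configs in parallel; a failure in one config is
// reported through onError and does not reject the combined promise.
async function processMatchingConfigs(
  configIds: string[],
  spanId: string,
  observationS3Path: string,
  deps: JobDeps,
  onError: (configId: string, error: unknown) => void,
): Promise<void> {
  await Promise.all(
    configIds.map(async (configId) => {
      try {
        const id = deterministicJobExecutionId(configId, spanId);
        await deps.upsertJobExecution({ id, status: "PENDING" });
        await deps.enqueueEvalJob({ jobExecutionId: id, observationS3Path, delay: 0 });
      } catch (error) {
        onError(configId, error);
      }
    }),
  );
}
```

Because the ID depends only on the config ID and span ID, scheduling the same observation twice targets the same job execution record through the upsert rather than creating a duplicate. Catching inside each mapped callback is what keeps `Promise.all` from rejecting on the first failure.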
The ObservationEvalSchedulerDeps interface decouples the scheduler from its infrastructure dependencies (S3, database, queue), making it testable and allowing different implementations for different deployment contexts.
Usage
This function is called by the experiment processing pipeline (via scheduleExperimentObservationEvals) and by the OTEL observation ingestion pipeline. It is not called directly from the web application.
Code Reference
Source Location
- Repository: langfuse
- File: worker/src/features/evaluation/observationEval/scheduleObservationEvals.ts
- Lines: 35-102
Signature
interface ScheduleObservationEvalsParams {
  observation: ObservationForEval;
  configs: ObservationEvalConfig[];
  schedulerDeps: ObservationEvalSchedulerDeps;
}
export async function scheduleObservationEvals(
  params: ScheduleObservationEvalsParams,
): Promise<void>
Import
import { scheduleObservationEvals } from "@/src/features/evaluation/observationEval/scheduleObservationEvals";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| observation | ObservationForEval | Yes | The observation to evaluate. Contains span_id, trace_id, project_id, type, name, environment, prompt info, experiment fields, input, output, metadata, tags, usage/cost details, tool info. |
| configs | ObservationEvalConfig[] | Yes | Pre-fetched evaluation configurations for the project. Each contains id, projectId, filter (FilterState), sampling (Decimal), evalTemplateId, scoreName, targetObject, variableMapping, delay. |
| schedulerDeps.upsertJobExecution | function | Yes | Creates or updates a job execution record with id, projectId, jobConfigurationId, jobInputTraceId, jobInputObservationId, jobTemplateId, status. |
| schedulerDeps.uploadObservationToS3 | function | Yes | Uploads observation data to S3 and returns the S3 path string. |
| schedulerDeps.enqueueEvalJob | function | Yes | Enqueues an eval job with jobExecutionId, projectId, observationS3Path, delay. |
Outputs
| Name | Type | Description |
|---|---|---|
| (void) | void | The function returns nothing. All effects are side effects. |
Side Effects
| Effect | Description |
|---|---|
| S3 upload | Observation data is uploaded to S3 once if at least one config matches. |
| Job execution records | A PENDING job execution record is created for each matching config. |
| Eval queue jobs | An eval job is enqueued to LLMAsJudgeExecution for each matching config with zero delay. |
Usage Examples
Experiment Eval Scheduling
// From scheduleExperimentObservationEvals:
const observation: ObservationForEval = {
  span_id: generationDetails.observationId,
  trace_id: traceId,
  project_id: projectId,
  type: "GENERATION",
  name: generationDetails.name || "generation",
  environment: LangfuseInternalTraceEnvironment.PromptExperiments,
  level: "DEFAULT",
  prompt_name: config.prompt?.name,
  prompt_version: config.prompt?.version,
  experiment_id: config.runId,
  experiment_name: config.experimentName,
  experiment_dataset_id: datasetItem.datasetId,
  experiment_item_id: datasetItem.id,
  experiment_item_expected_output: datasetItem.expectedOutput
    ? JSON.stringify(datasetItem.expectedOutput)
    : null,
  experiment_item_root_span_id: generationDetails.observationId,
  input: generationDetails.input,
  output: generationDetails.output,
  metadata: generationDetails.metadata,
  tags: [],
  provided_usage_details: {},
  provided_cost_details: {},
  usage_details: {},
  cost_details: {},
  tool_definitions: {},
  tool_calls: [],
  tool_call_names: [],
};
const schedulerDeps = createObservationEvalSchedulerDeps();
await scheduleObservationEvals({ observation, configs, schedulerDeps });
ObservationEvalSchedulerDeps Interface
interface ObservationEvalSchedulerDeps {
  upsertJobExecution: (params: {
    id: string;
    projectId: string;
    jobConfigurationId: string;
    jobInputTraceId: string;
    jobInputObservationId: string;
    jobTemplateId: string | null;
    status: JobExecutionStatus;
  }) => Promise<{ id: string }>;
  uploadObservationToS3: (params: {
    projectId: string;
    observationId: string;
    data: Record<string, unknown>;
  }) => Promise<string>;
  enqueueEvalJob: (params: {
    jobExecutionId: string;
    projectId: string;
    observationS3Path: string;
    delay: number;
  }) => Promise<void>;
}
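In-Memory Scheduler Deps (Illustrative)

Because the scheduler only talks to this interface, a test can plausibly substitute in-memory fakes for S3, the database, and the queue. The sketch below is an assumption about how such a fake might be written, not code from the repository: `createInMemorySchedulerDeps` and the `memory://` path scheme are hypothetical, and `JobExecutionStatus` is replaced by a plain string stand-in so the block is self-contained.

```typescript
// Stand-in for the real JobExecutionStatus enum.
type JobExecutionStatus = string;

// Re-declared from the interface on this page so the sketch compiles on its own.
interface ObservationEvalSchedulerDeps {
  upsertJobExecution: (params: {
    id: string;
    projectId: string;
    jobConfigurationId: string;
    jobInputTraceId: string;
    jobInputObservationId: string;
    jobTemplateId: string | null;
    status: JobExecutionStatus;
  }) => Promise<{ id: string }>;
  uploadObservationToS3: (params: {
    projectId: string;
    observationId: string;
    data: Record<string, unknown>;
  }) => Promise<string>;
  enqueueEvalJob: (params: {
    jobExecutionId: string;
    projectId: string;
    observationS3Path: string;
    delay: number;
  }) => Promise<void>;
}

type UpsertParams = Parameters<ObservationEvalSchedulerDeps["upsertJobExecution"]>[0];
type EnqueueParams = Parameters<ObservationEvalSchedulerDeps["enqueueEvalJob"]>[0];

// Hypothetical factory: records every call in memory so tests can inspect it.
function createInMemorySchedulerDeps() {
  const jobExecutions = new Map<string, UpsertParams>();
  const uploads: Array<{ path: string; data: Record<string, unknown> }> = [];
  const queue: EnqueueParams[] = [];

  const deps: ObservationEvalSchedulerDeps = {
    // Keyed by id, so repeating a call overwrites instead of duplicating.
    upsertJobExecution: async (params) => {
      jobExecutions.set(params.id, params);
      return { id: params.id };
    },
    uploadObservationToS3: async ({ projectId, observationId, data }) => {
      const path = "memory://" + projectId + "/" + observationId;
      uploads.push({ path, data });
      return path;
    },
    enqueueEvalJob: async (params) => {
      queue.push(params);
    },
  };

  return { deps, jobExecutions, uploads, queue };
}
```

A test would pass `deps` as `schedulerDeps` and then assert on `jobExecutions`, `uploads`, and `queue`, for example that `uploads.length` is 1 after scheduling an observation that matches several configs.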