
Implementation:Langfuse Langfuse ScheduleObservationEvals

From Leeroopedia
Revision as of 13:14, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Langfuse_Langfuse_ScheduleObservationEvals.md)
Knowledge Sources
Domains: LLM Evaluation, Job Orchestration
Last Updated: 2026-02-14 00:00 GMT

Overview

A concrete tool, provided by Langfuse, for scheduling observation-level evaluations against completed observations such as experiment generations.

Description

scheduleObservationEvals is a worker-side function that takes a completed observation, a list of pre-fetched evaluation configs, and a set of scheduler dependencies, then determines which configs match the observation and dispatches eval jobs accordingly.

The function is invoked from two pathways:

  1. Experiment pathway: Called by scheduleExperimentObservationEvals after an experiment LLM call completes. The observation is constructed from GenerationDetails with experiment-specific fields (experiment ID, dataset item ID, expected output, etc.).
  2. OTEL ingestion pathway: Called during the observation ingestion pipeline for externally-ingested observations that match event-targeted eval configs.

The implementation follows a three-stage process:

Stage 1 -- Config filtering: Each config is tested against the observation using evaluateFilter (which delegates to InMemoryFilterService.evaluateFilter with a field mapper for observation columns) and shouldSampleObservation (which performs random sampling at the configured rate). For experiment-targeted configs, the filter additionally requires that span_id === experiment_item_root_span_id.
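Stage 1 can be pictured as a predicate applied to each config. The following is a minimal sketch, not the Langfuse implementation: shouldSample, selectMatchingConfigs, SketchObservation, SketchConfig, and the targetsExperiment and matchesFilter fields are hypothetical stand-ins. The real code uses evaluateFilter with a FilterState and a Decimal sampling rate, and the order of the checks shown here is an assumption.

```typescript
// Illustrative only: every name in this block is hypothetical.
type Rng = () => number; // returns a value in [0, 1)

// True when an observation should be evaluated at the given sampling rate.
function shouldSample(samplingRate: number, rng: Rng = Math.random): boolean {
  if (samplingRate >= 1) return true; // rate 1: always evaluate
  if (samplingRate <= 0) return false; // rate 0: never evaluate
  return rng() < samplingRate;
}

interface SketchObservation {
  span_id: string;
  experiment_item_root_span_id: string | null;
}

interface SketchConfig {
  id: string;
  sampling: number; // assumption: plain number instead of Decimal
  targetsExperiment: boolean; // assumption: stands in for the config's target type
  matchesFilter: (o: SketchObservation) => boolean; // stands in for evaluateFilter
}

// Keeps only the configs whose target rule, filter, and sampling all pass.
function selectMatchingConfigs(
  observation: SketchObservation,
  configs: SketchConfig[],
  rng: Rng = Math.random,
): SketchConfig[] {
  return configs.filter((config) => {
    // Experiment-targeted configs only apply to the experiment item's root span.
    if (
      config.targetsExperiment &&
      observation.span_id !== observation.experiment_item_root_span_id
    ) {
      return false;
    }
    if (!config.matchesFilter(observation)) return false;
    return shouldSample(config.sampling, rng);
  });
}
```

Passing the random source as a parameter is a design choice of this sketch; it makes the sampling decision reproducible in tests.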

Stage 2 -- S3 upload: If at least one config matches, the observation data is uploaded to S3 via schedulerDeps.uploadObservationToS3. This happens exactly once per observation regardless of how many configs match.

Stage 3 -- Job creation: For each matching config, processMatchingConfig is called in parallel via Promise.all. Each call generates a deterministic job execution ID from the config ID and observation span ID, upserts a job execution record with PENDING status, and enqueues an eval job to the LLMAsJudgeExecution queue with zero delay. Individual config processing errors are caught and logged without affecting other configs.
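Stages 2 and 3 together can be sketched as follows. This is an illustration under stated assumptions, not the function's source: DispatchDeps, DispatchObservation, DispatchConfig, jobExecutionIdFor, and dispatchMatchingConfigs are hypothetical names, the dependency shapes are trimmed, and the string concatenation used for the ID only stands in for whatever deterministic derivation the real code uses.

```typescript
// Illustrative only: trimmed, hypothetical versions of the scheduler dependencies.
interface DispatchDeps {
  uploadObservationToS3: (p: {
    projectId: string;
    observationId: string;
    data: Record<string, unknown>;
  }) => Promise<string>;
  upsertJobExecution: (p: {
    id: string;
    projectId: string;
    jobConfigurationId: string;
    status: "PENDING";
  }) => Promise<{ id: string }>;
  enqueueEvalJob: (p: {
    jobExecutionId: string;
    projectId: string;
    observationS3Path: string;
    delay: number;
  }) => Promise<void>;
}

interface DispatchObservation {
  span_id: string;
  project_id: string;
  [key: string]: unknown;
}

interface DispatchConfig {
  id: string;
}

// Assumption: any stable function of (config ID, span ID) illustrates the idea.
function jobExecutionIdFor(configId: string, spanId: string): string {
  return `${configId}:${spanId}`;
}

async function dispatchMatchingConfigs(
  observation: DispatchObservation,
  matching: DispatchConfig[],
  deps: DispatchDeps,
): Promise<void> {
  if (matching.length === 0) return; // nothing matched: skip the upload entirely

  // Stage 2: a single upload shared by every job created below.
  const observationS3Path = await deps.uploadObservationToS3({
    projectId: observation.project_id,
    observationId: observation.span_id,
    data: observation,
  });

  // Stage 3: configs run in parallel; each failure is caught and logged on its own.
  await Promise.all(
    matching.map(async (config) => {
      try {
        const id = jobExecutionIdFor(config.id, observation.span_id);
        await deps.upsertJobExecution({
          id,
          projectId: observation.project_id,
          jobConfigurationId: config.id,
          status: "PENDING",
        });
        await deps.enqueueEvalJob({
          jobExecutionId: id,
          projectId: observation.project_id,
          observationS3Path,
          delay: 0,
        });
      } catch (error) {
        console.error(`Failed to schedule eval for config ${config.id}`, error);
      }
    }),
  );
}
```

Because each callback catches its own error, Promise.all never rejects on a single bad config, which matches the isolation behavior described above.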

The ObservationEvalSchedulerDeps interface decouples the scheduler from its infrastructure dependencies (S3, database, queue), making it testable and allowing different implementations for different deployment contexts.
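To illustrate the testability claim, here is a hedged sketch of an in-memory test double. SchedulerDepsSketch is a trimmed, hypothetical version of the dependency interface, and createInMemoryDeps and the memory:// path scheme are invented for this example; none of them come from the Langfuse codebase.

```typescript
// Illustrative only: a trimmed stand-in for the scheduler dependency interface.
interface SchedulerDepsSketch {
  upsertJobExecution: (p: {
    id: string;
    projectId: string;
    status: string;
  }) => Promise<{ id: string }>;
  uploadObservationToS3: (p: {
    projectId: string;
    observationId: string;
    data: Record<string, unknown>;
  }) => Promise<string>;
  enqueueEvalJob: (p: {
    jobExecutionId: string;
    projectId: string;
    observationS3Path: string;
    delay: number;
  }) => Promise<void>;
}

// Builds dependencies backed by plain Maps and an array, so a test can
// inspect what the scheduler wrote without S3, a database, or a queue.
function createInMemoryDeps() {
  const jobExecutions = new Map<string, { projectId: string; status: string }>();
  const uploads = new Map<string, Record<string, unknown>>();
  const queue: Array<{ jobExecutionId: string; observationS3Path: string; delay: number }> = [];

  const deps: SchedulerDepsSketch = {
    upsertJobExecution: async ({ id, projectId, status }) => {
      jobExecutions.set(id, { projectId, status }); // same id overwrites: upsert semantics
      return { id };
    },
    uploadObservationToS3: async ({ projectId, observationId, data }) => {
      const path = `memory://${projectId}/${observationId}.json`; // made-up path scheme
      uploads.set(path, data);
      return path;
    },
    enqueueEvalJob: async ({ jobExecutionId, observationS3Path, delay }) => {
      queue.push({ jobExecutionId, observationS3Path, delay });
    },
  };

  return { deps, jobExecutions, uploads, queue };
}
```

A test could pass deps to the scheduler and then assert on jobExecutions, uploads, and queue. Upserting the same ID twice leaves a single record, which is why a deterministic job execution ID pairs well with an upsert.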

Usage

This function is called by the experiment processing pipeline (via scheduleExperimentObservationEvals) and by the OTEL observation ingestion pipeline. It is not called directly from the web application.

Code Reference

Source Location

  • Repository: langfuse
  • File: worker/src/features/evaluation/observationEval/scheduleObservationEvals.ts
  • Lines: 35-102

Signature

interface ScheduleObservationEvalsParams {
  observation: ObservationForEval;
  configs: ObservationEvalConfig[];
  schedulerDeps: ObservationEvalSchedulerDeps;
}

export async function scheduleObservationEvals(
  params: ScheduleObservationEvalsParams,
): Promise<void>

Import

import { scheduleObservationEvals } from "@/src/features/evaluation/observationEval/scheduleObservationEvals";

I/O Contract

Inputs

Name | Type | Required | Description
observation | ObservationForEval | Yes | The observation to evaluate. Contains span_id, trace_id, project_id, type, name, environment, prompt info, experiment fields, input, output, metadata, tags, usage/cost details, tool info.
configs | ObservationEvalConfig[] | Yes | Pre-fetched evaluation configurations for the project. Each contains id, projectId, filter (FilterState), sampling (Decimal), evalTemplateId, scoreName, targetObject, variableMapping, delay.
schedulerDeps.upsertJobExecution | function | Yes | Creates or updates a job execution record with id, projectId, jobConfigurationId, jobInputTraceId, jobInputObservationId, jobTemplateId, status.
schedulerDeps.uploadObservationToS3 | function | Yes | Uploads observation data to S3 and returns the S3 path string.
schedulerDeps.enqueueEvalJob | function | Yes | Enqueues an eval job with jobExecutionId, projectId, observationS3Path, delay.

Outputs

Name | Type | Description
(void) | void | The function returns nothing. All effects are side effects.

Side Effects

Effect | Description
S3 upload | Observation data is uploaded to S3 once if at least one config matches.
Job execution records | A PENDING job execution record is created for each matching config.
Eval queue jobs | An eval job is enqueued to LLMAsJudgeExecution for each matching config with zero delay.

Usage Examples

Experiment Eval Scheduling

// From scheduleExperimentObservationEvals:
const observation: ObservationForEval = {
  span_id: generationDetails.observationId,
  trace_id: traceId,
  project_id: projectId,
  type: "GENERATION",
  name: generationDetails.name || "generation",
  environment: LangfuseInternalTraceEnvironment.PromptExperiments,
  level: "DEFAULT",
  prompt_name: config.prompt?.name,
  prompt_version: config.prompt?.version,
  experiment_id: config.runId,
  experiment_name: config.experimentName,
  experiment_dataset_id: datasetItem.datasetId,
  experiment_item_id: datasetItem.id,
  experiment_item_expected_output: datasetItem.expectedOutput
    ? JSON.stringify(datasetItem.expectedOutput)
    : null,
  experiment_item_root_span_id: generationDetails.observationId,
  input: generationDetails.input,
  output: generationDetails.output,
  metadata: generationDetails.metadata,
  tags: [],
  provided_usage_details: {},
  provided_cost_details: {},
  usage_details: {},
  cost_details: {},
  tool_definitions: {},
  tool_calls: [],
  tool_call_names: [],
};

const schedulerDeps = createObservationEvalSchedulerDeps();
await scheduleObservationEvals({ observation, configs, schedulerDeps });

ObservationEvalSchedulerDeps Interface

interface ObservationEvalSchedulerDeps {
  upsertJobExecution: (params: {
    id: string;
    projectId: string;
    jobConfigurationId: string;
    jobInputTraceId: string;
    jobInputObservationId: string;
    jobTemplateId: string | null;
    status: JobExecutionStatus;
  }) => Promise<{ id: string }>;

  uploadObservationToS3: (params: {
    projectId: string;
    observationId: string;
    data: Record<string, unknown>;
  }) => Promise<string>;

  enqueueEvalJob: (params: {
    jobExecutionId: string;
    projectId: string;
    observationS3Path: string;
    delay: number;
  }) => Promise<void>;
}

Related Pages

Implements Principle
