Implementation:Langfuse Langfuse CreateEvalJobs

From Leeroopedia
Knowledge Sources
Domains: LLM Evaluation, Workflow Orchestration
Last Updated: 2026-02-14 00:00 GMT

Overview

Concrete tool provided by Langfuse for creating evaluation job executions by matching incoming events against a project's active evaluation configurations.

Description

The createEvalJobs function is the central orchestrator for the evaluation job creation pipeline. It is invoked by three upstream queue processors (trace upsert, dataset run item upsert, and UI-triggered batch) and performs the following operations:

  1. Fetches all active EVAL-type job configurations for the event's project from PostgreSQL using Kysely
  2. Applies optional time scope filtering to restrict which configurations match
  3. Caches trace data fetched from ClickHouse when multiple configurations need to evaluate the same trace, so the trace is not re-queried for every configuration (reducing ClickHouse query pressure)
  4. Batch-queries existing job executions across all matching configurations to enable deduplication in a single database round-trip
  5. Iterates through each configuration, applying trace existence checks, dataset item resolution, observation validation, deduplication, and sampling
  6. Creates jobExecution records with PENDING status and enqueues them to the EvalExecutionQueue with an optional delay
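The six steps above can be condensed into a synchronous, in-memory sketch. This is not Langfuse's implementation: the names createEvalJobsSketch, PipelineDeps, JobConfigSketch, ExecutionSketch, and QueuedJobSketch are hypothetical; PostgreSQL, ClickHouse, and the EvalExecutionQueue are replaced by injected callbacks; configurations are assumed to be time-scope filtered already; and dataset item resolution, observation validation, and stale-execution cancellation are left out. It illustrates the memoized trace lookup (step 3) and the single batch query for existing executions (step 4).

```typescript
// Hypothetical, simplified shapes; the real rows come from Langfuse's database schema.
type JobConfigSketch = {
  id: string;
  samplingRate: number; // fraction of events to evaluate, 0..1
  delayMs: number; // delay before the queued eval job becomes runnable
};

type ExecutionSketch = {
  id: string;
  configId: string;
  traceId: string;
  status: "PENDING" | "COMPLETED" | "ERROR" | "CANCELLED";
};

type QueuedJobSketch = { executionId: string; delayMs: number };

// Injected stand-ins for PostgreSQL, ClickHouse, and the eval execution queue.
interface PipelineDeps {
  fetchActiveConfigs(projectId: string): JobConfigSketch[];
  fetchTrace(traceId: string): { id: string } | undefined;
  findExistingExecutions(configIds: string[], traceId: string): ExecutionSketch[];
  random(): number;
  newId(): string;
  enqueue(job: QueuedJobSketch): void;
}

function createEvalJobsSketch(
  event: { projectId: string; traceId: string },
  deps: PipelineDeps,
): ExecutionSketch[] {
  // Step 1: load active configs (assumed already filtered by time scope).
  const configs = deps.fetchActiveConfigs(event.projectId);
  if (configs.length === 0) return [];

  // Step 3: memoize the trace lookup so N configs trigger at most one fetch.
  let traceLoaded = false;
  let cachedTrace: { id: string } | undefined = undefined;
  const getTrace = (): { id: string } | undefined => {
    if (!traceLoaded) {
      cachedTrace = deps.fetchTrace(event.traceId);
      traceLoaded = true;
    }
    return cachedTrace;
  };

  // Step 4: one batch query covering all configs, then O(1) membership checks.
  const alreadyScheduled = new Set(
    deps
      .findExistingExecutions(configs.map((c) => c.id), event.traceId)
      .map((e) => e.configId),
  );

  const created: ExecutionSketch[] = [];
  // Step 5: per-config checks.
  for (const config of configs) {
    if (getTrace() === undefined) continue; // trace existence check
    if (alreadyScheduled.has(config.id)) continue; // deduplication
    if (deps.random() >= config.samplingRate) continue; // sampling
    // Step 6: record a PENDING execution and enqueue it with the config's delay.
    const execution: ExecutionSketch = {
      id: deps.newId(),
      configId: config.id,
      traceId: event.traceId,
      status: "PENDING",
    };
    created.push(execution);
    deps.enqueue({ executionId: execution.id, delayMs: config.delayMs });
  }
  return created;
}
```

Injecting random() and newId() keeps sampling and ID generation deterministic in tests, and building a Set from one batch query turns the per-config duplicate check into a constant-time lookup instead of one query per configuration.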

The function supports three event source types through a discriminated union type, each carrying different payload shapes and enforcement rules. It also includes a "no eval configs" cache optimization: when no active configurations exist for a project, this fact is cached to allow upstream queues to skip enqueuing entirely.
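A minimal sketch of how a "no eval configs" cache could let upstream producers skip enqueuing. It assumes an in-memory map with a TTL; the class NoEvalConfigsCacheSketch, the helper enqueueIfConfigsMayExist, the TTL value, and the invalidate-on-config-creation hook are all hypothetical. The real cache's storage, key format, and expiry are not described on this page.

```typescript
// Hypothetical stand-in for the "no eval configs" cache (the real one may use shared storage).
class NoEvalConfigsCacheSketch {
  private readonly expiries = new Map<string, number>(); // projectId -> expiry time (ms)
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Called by the job-creation step when a project has no active configs.
  markEmpty(projectId: string): void {
    this.expiries.set(projectId, this.now() + this.ttlMs);
  }

  // Assumed hook: clear the entry when a config is created so it takes effect at once.
  invalidate(projectId: string): void {
    this.expiries.delete(projectId);
  }

  hasNoConfigs(projectId: string): boolean {
    const expiry = this.expiries.get(projectId);
    if (expiry === undefined) return false;
    if (this.now() >= expiry) {
      this.expiries.delete(projectId); // expired: fall back to querying configs again
      return false;
    }
    return true;
  }
}

// Upstream producer: skip enqueuing entirely when the project is known to have no configs.
function enqueueIfConfigsMayExist(
  projectId: string,
  cache: NoEvalConfigsCacheSketch,
  enqueue: (projectId: string) => void,
): boolean {
  if (cache.hasNoConfigs(projectId)) return false;
  enqueue(projectId);
  return true;
}
```

The TTL bounds how long a stale "empty" entry can hide a newly created configuration; an explicit invalidation on config creation, if the real system has one, closes that window immediately.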

Usage

This function is called exclusively by the three eval job queue processors in the worker application. It should not be called directly from application code.

Code Reference

Source Location

  • Repository: langfuse
  • File: worker/src/features/evaluation/evalService.ts
  • Lines: 166-684

Signature

type CreateEvalJobsParams = {
  jobTimestamp: Date;
  enforcedJobTimeScope?: JobTimeScope;
} & (
  | {
      sourceEventType: "trace-upsert";
      event: TraceQueueEventType;
    }
  | {
      sourceEventType: "dataset-run-item-upsert";
      event: DatasetRunItemUpsertEventType;
    }
  | {
      sourceEventType: "ui-create-eval";
      event: CreateEvalQueueEventType;
    }
);

export const createEvalJobs = async ({
  event,
  sourceEventType,
  jobTimestamp,
  enforcedJobTimeScope,
}: CreateEvalJobsParams): Promise<void> => {
  // ... implementation (lines 166-684) elided ...
};
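The discriminated union in CreateEvalJobsParams lets a single switch on sourceEventType narrow event to the matching payload type. The payload shapes, ParamsSketch, LookupPlan, and planLookup below are hypothetical and trimmed to a few fields; they only illustrate the narrowing pattern and the jobTimestamp fallback described under Inputs, not Langfuse's actual routing logic.

```typescript
type JobTimeScope = "NEW" | "EXISTING";

// Hypothetical, trimmed payload shapes; the real event types carry more fields.
type TraceEventSketch = { projectId: string; traceId: string; timestamp?: Date };
type DatasetRunItemEventSketch = {
  projectId: string;
  traceId: string;
  datasetItemId: string;
  observationId?: string;
};
type UiCreateEvalEventSketch = {
  projectId: string;
  traceId: string;
  configId: string;
  timestamp?: Date;
};

type ParamsSketch = { jobTimestamp: Date; enforcedJobTimeScope?: JobTimeScope } & (
  | { sourceEventType: "trace-upsert"; event: TraceEventSketch }
  | { sourceEventType: "dataset-run-item-upsert"; event: DatasetRunItemEventSketch }
  | { sourceEventType: "ui-create-eval"; event: UiCreateEvalEventSketch }
);

type LookupPlan = { traceTimestamp: Date; onlyConfigId?: string; datasetItemId?: string };

function planLookup(params: ParamsSketch): LookupPlan {
  switch (params.sourceEventType) {
    case "trace-upsert":
      // Narrowed: params.event is TraceEventSketch; fall back to the job timestamp.
      return { traceTimestamp: params.event.timestamp ?? params.jobTimestamp };
    case "dataset-run-item-upsert":
      // Narrowed: datasetItemId is guaranteed to exist on this variant.
      return { traceTimestamp: params.jobTimestamp, datasetItemId: params.event.datasetItemId };
    case "ui-create-eval":
      // Narrowed: in this sketch only the UI variant names a specific config.
      return {
        traceTimestamp: params.event.timestamp ?? params.jobTimestamp,
        onlyConfigId: params.event.configId,
      };
    default: {
      // Compile-time exhaustiveness check: a new, unhandled variant fails to type-check here.
      const unreachable: never = params;
      throw new Error(`Unhandled source event: ${JSON.stringify(unreachable)}`);
    }
  }
}
```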

Import

import { createEvalJobs } from "../features/evaluation/evalService";

I/O Contract

Inputs

  • event (TraceQueueEventType | DatasetRunItemUpsertEventType | CreateEvalQueueEventType; required): The upstream event, containing projectId, traceId, and source-specific fields (timestamp, configId, datasetItemId, observationId, etc.).
  • sourceEventType ("trace-upsert" | "dataset-run-item-upsert" | "ui-create-eval"; required): Discriminator indicating which upstream queue produced the event. Controls time scope enforcement and loop prevention.
  • jobTimestamp (Date; required): The time at which the queue job was created. Used as a fallback for trace lookups when the event payload does not include an explicit timestamp.
  • enforcedJobTimeScope (JobTimeScope, i.e. "NEW" | "EXISTING"; optional): When set, only job configurations whose time_scope column includes this value are considered. The trace-upsert and dataset-run-item-upsert processors set this to "NEW".
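The enforcedJobTimeScope rule can be expressed as a small predicate. This is an in-memory sketch that assumes time_scope is read as an array of JobTimeScope values; EvalConfigRow and filterByTimeScope are hypothetical names.

```typescript
type JobTimeScope = "NEW" | "EXISTING";

// Hypothetical config row: the time_scope column is modelled as a list of scopes.
type EvalConfigRow = { id: string; timeScope: JobTimeScope[] };

// Keep only configs whose time scope includes the enforced value.
// With no enforced value (the UI-triggered path), every config passes.
function filterByTimeScope(
  configs: EvalConfigRow[],
  enforcedJobTimeScope?: JobTimeScope,
): EvalConfigRow[] {
  const scope = enforcedJobTimeScope; // const so the narrowing below holds inside the callback
  if (scope === undefined) return configs;
  return configs.filter((c) => c.timeScope.includes(scope));
}
```

With "NEW" enforced, a configuration scoped only to EXISTING data is skipped, so the trace-upsert and dataset-run-item-upsert processors only trigger configurations meant for incoming data, while the UI path, which passes no scope, can reach both.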

Outputs

  • Return value (Promise<void>): None. The function works through side effects: it creates jobExecution records with PENDING status in PostgreSQL, enqueues jobs to EvalExecutionQueue, cancels stale executions, and sets the no-eval-configs cache when no configurations are found.

Usage Examples

Called from Trace Upsert Processor

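import type { Job } from "bullmq";
import { QueueName, type TQueueJobTypes } from "@langfuse/shared/src/server";
import { createEvalJobs } from "../features/evaluation/evalService";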

export const evalJobTraceCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.TraceUpsert]>,
) => {
  await createEvalJobs({
    sourceEventType: "trace-upsert",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    enforcedJobTimeScope: "NEW",
  });
  return true;
};

Called from Dataset Run Item Upsert Processor

export const evalJobDatasetCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.DatasetRunItemUpsert]>,
) => {
  await createEvalJobs({
    sourceEventType: "dataset-run-item-upsert",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    enforcedJobTimeScope: "NEW",
  });
  return true;
};

Called from UI Batch Processor (No Time Scope Enforcement)

export const evalJobCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.CreateEvalQueue]>,
) => {
  await createEvalJobs({
    sourceEventType: "ui-create-eval",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    // No enforcedJobTimeScope -- allows both NEW and EXISTING configs
  });
  return true;
};

Related Pages

Implements Principle
