Implementation:Langfuse Langfuse CreateEvalJobs
| Knowledge Sources | |
|---|---|
| Domains | LLM Evaluation, Workflow Orchestration |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool provided by Langfuse for creating evaluation job executions by matching incoming events against a project's active evaluation configurations.
Description
The createEvalJobs function is the central orchestrator for the evaluation job creation pipeline. It is invoked by three upstream queue processors (trace upsert, dataset run item upsert, and UI-triggered batch) and performs the following operations:
- Fetches all active EVAL-type job configurations for the event's project from PostgreSQL using Kysely
- Applies optional time scope filtering to restrict which configurations match
- Caches trace data fetched from ClickHouse when multiple configurations evaluate the same trace, so the trace is not re-queried for every configuration (reducing ClickHouse query load)
- Batch-queries existing job executions across all matching configurations to enable deduplication in a single database round-trip
- Iterates through each configuration, applying trace existence checks, dataset item resolution, observation validation, deduplication, and sampling
- Creates jobExecution records with PENDING status and enqueues them to the EvalExecutionQueue with an optional delay
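The batched deduplication and sampling steps can be modelled in a simplified, in-memory form. This is a minimal sketch, not the Langfuse implementation: the `EvalConfig` and `ExistingExecution` shapes, the `planEvalJobs` helper, and the injectable `random` parameter are all hypothetical, and database access, ClickHouse lookups, and queueing are omitted. It shows why the single batched query helps. Existing executions are loaded once into a `Set`, so each per-configuration duplicate check is an in-memory lookup rather than another database round-trip.

```typescript
// Hypothetical shapes; the real database row types differ.
type EvalConfig = {
  id: string;
  sampling: number; // fraction of events to evaluate, in [0, 1]
};

type ExistingExecution = {
  jobConfigurationId: string;
  jobInputTraceId: string;
};

// One lookup key per (configuration, trace) pair.
const dedupKey = (configId: string, traceId: string): string =>
  `${configId}:${traceId}`;

/**
 * Returns the ids of configurations that should get a new PENDING execution
 * for `traceId`. `existing` stands in for the result of the single batched
 * query; `random` is injectable so sampling can be tested deterministically.
 */
export function planEvalJobs(
  traceId: string,
  configs: EvalConfig[],
  existing: ExistingExecution[],
  random: () => number = Math.random,
): string[] {
  // Build the lookup once from the batched result.
  const seen = new Set(
    existing.map((e) => dedupKey(e.jobConfigurationId, e.jobInputTraceId)),
  );

  const toCreate: string[] = [];
  for (const config of configs) {
    // Deduplication: skip configs that already have an execution for this trace.
    if (seen.has(dedupKey(config.id, traceId))) continue;
    // Sampling: keep the job with probability `config.sampling`.
    if (random() >= config.sampling) continue;
    toCreate.push(config.id);
  }
  return toCreate;
}
```

Injecting `random` keeps the sampling decision testable; a configuration with `sampling: 1` is always kept and one with `sampling: 0` is always skipped, because `Math.random()` returns values in `[0, 1)`.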
The function supports three event source types through a discriminated union type, each carrying different payload shapes and enforcement rules. It also includes a "no eval configs" cache optimization: when no active configurations exist for a project, this fact is cached to allow upstream queues to skip enqueuing entirely.
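The "no eval configs" optimization is a negative cache keyed by project: the job creator records that a project has no active configurations, and upstream producers consult that record before enqueuing. A minimal sketch follows, assuming a process-local `Map` with a time-to-live; the class name, method names, and TTL handling are hypothetical, and a fleet of worker processes would need a shared store rather than process memory. The explicit `invalidate` hook is an assumption about what such a cache needs, since a newly created configuration would otherwise be ignored until the entry expires.

```typescript
// Hypothetical negative cache: remembers projects that had no active
// EVAL configurations so upstream producers can skip enqueuing.
export class NoEvalConfigsCache {
  private readonly expiresAt = new Map<string, number>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Called by the job creator after it finds zero active configurations.
  markEmpty(projectId: string): void {
    this.expiresAt.set(projectId, this.now() + this.ttlMs);
  }

  // Called by upstream producers before enqueuing an eval-creation job.
  hasNoConfigs(projectId: string): boolean {
    const expiry = this.expiresAt.get(projectId);
    if (expiry === undefined) return false;
    if (this.now() >= expiry) {
      // Stale entry: drop it so the next event re-checks the database.
      this.expiresAt.delete(projectId);
      return false;
    }
    return true;
  }

  // Should run whenever a configuration is created or activated.
  invalidate(projectId: string): void {
    this.expiresAt.delete(projectId);
  }
}
```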
Usage
This function is called exclusively by the three eval job queue processors in the worker application. It should not be called directly from application code.
Code Reference
Source Location
- Repository: langfuse
- File: worker/src/features/evaluation/evalService.ts
- Lines: 166-684
Signature
```typescript
type CreateEvalJobsParams = {
  jobTimestamp: Date;
  enforcedJobTimeScope?: JobTimeScope;
} & (
  | {
      sourceEventType: "trace-upsert";
      event: TraceQueueEventType;
    }
  | {
      sourceEventType: "dataset-run-item-upsert";
      event: DatasetRunItemUpsertEventType;
    }
  | {
      sourceEventType: "ui-create-eval";
      event: CreateEvalQueueEventType;
    }
);

export const createEvalJobs = async ({
  event,
  sourceEventType,
  jobTimestamp,
  enforcedJobTimeScope,
}: CreateEvalJobsParams): Promise<void> => {
  // ... body omitted (see evalService.ts, lines 166-684)
};
```
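Because `sourceEventType` is the discriminant of the union, TypeScript narrows `event` to the matching payload type inside each branch. A minimal sketch, assuming simplified stand-in payload types (the real `TraceQueueEventType`, `DatasetRunItemUpsertEventType`, and `CreateEvalQueueEventType` carry more fields, and their exact field sets are assumed here); the `describeSource` helper is hypothetical and only illustrates narrowing plus a compile-time exhaustiveness check.

```typescript
// Simplified stand-ins for the real Langfuse payload types (assumption).
type TraceEvent = { projectId: string; traceId: string; timestamp?: Date };
type DatasetRunItemEvent = {
  projectId: string;
  traceId: string;
  datasetItemId: string;
  observationId?: string;
};
type UiCreateEvalEvent = {
  projectId: string;
  traceId: string;
  configId: string;
  timestamp?: Date;
};

type JobTimeScope = "NEW" | "EXISTING";

type Params = { jobTimestamp: Date; enforcedJobTimeScope?: JobTimeScope } & (
  | { sourceEventType: "trace-upsert"; event: TraceEvent }
  | { sourceEventType: "dataset-run-item-upsert"; event: DatasetRunItemEvent }
  | { sourceEventType: "ui-create-eval"; event: UiCreateEvalEvent }
);

// Hypothetical helper: each branch sees only its own payload shape.
export function describeSource(params: Params): string {
  switch (params.sourceEventType) {
    case "trace-upsert":
      return `trace ${params.event.traceId}`;
    case "dataset-run-item-upsert":
      // `datasetItemId` type-checks only in this branch.
      return `dataset item ${params.event.datasetItemId}`;
    case "ui-create-eval":
      // `configId` type-checks only in this branch.
      return `config ${params.event.configId}`;
    default: {
      // Fails to compile if a new source type is added without a case.
      const unreachable: never = params;
      return unreachable;
    }
  }
}
```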
Import
```typescript
import { createEvalJobs } from "../features/evaluation/evalService";
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| event | TraceQueueEventType or DatasetRunItemUpsertEventType or CreateEvalQueueEventType | Yes | The upstream event containing projectId, traceId, and source-specific fields (timestamp, configId, datasetItemId, observationId, etc.). |
| sourceEventType | "trace-upsert" or "dataset-run-item-upsert" or "ui-create-eval" | Yes | Discriminator indicating which upstream queue produced this event. Controls time scope enforcement and loop prevention. |
| jobTimestamp | Date | Yes | The timestamp when the queue job was created. Used as a fallback for trace lookups when the event payload does not include an explicit timestamp. |
| enforcedJobTimeScope | JobTimeScope ("NEW" or "EXISTING") | No | When set, only job configurations whose time_scope column includes this value are considered. Trace-upsert and dataset-run-item-upsert processors set this to "NEW". |
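The `enforcedJobTimeScope` filter and the `jobTimestamp` fallback described in the table can each be expressed as a small pure function. This is a hedged sketch: it assumes the `time_scope` column is read as an array of `JobTimeScope` values, and the names `JobConfigRow`, `matchesTimeScope`, and `traceLookupTimestamp` are hypothetical. Where the real function applies the filter (in the query or in application code) is not specified here.

```typescript
type JobTimeScope = "NEW" | "EXISTING";

// Hypothetical row shape: `time_scope` holds one or both scope values.
type JobConfigRow = { id: string; time_scope: JobTimeScope[] };

// With no enforced scope (UI-triggered batches) every config matches;
// otherwise the config's time_scope must include the enforced value.
export function matchesTimeScope(
  config: JobConfigRow,
  enforced?: JobTimeScope,
): boolean {
  return enforced === undefined || config.time_scope.includes(enforced);
}

// Prefer an explicit timestamp from the event payload; otherwise fall back
// to the time the queue job was created.
export function traceLookupTimestamp(
  event: { timestamp?: Date },
  jobTimestamp: Date,
): Date {
  return event.timestamp ?? jobTimestamp;
}
```

With `enforced = "NEW"`, a configuration whose `time_scope` is `["EXISTING"]` is skipped, while one with `["NEW", "EXISTING"]` still matches.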
Outputs
| Name | Type | Description |
|---|---|---|
| void | Promise<void> | No return value. Side effects include: jobExecution records created in PostgreSQL with PENDING status; jobs enqueued to EvalExecutionQueue; stale executions cancelled; no-eval-configs cache set when no configurations found. |
Usage Examples
Called from Trace Upsert Processor
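```typescript
import { Job } from "bullmq";
import { QueueName, TQueueJobTypes } from "@langfuse/shared/src/server";
import { createEvalJobs } from "../features/evaluation/evalService";

// Runs for every trace upsert; only configurations scoped to NEW data apply.
export const evalJobTraceCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.TraceUpsert]>,
) => {
  await createEvalJobs({
    sourceEventType: "trace-upsert",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    enforcedJobTimeScope: "NEW",
  });
  return true;
};
```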
Called from Dataset Run Item Upsert Processor
```typescript
export const evalJobDatasetCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.DatasetRunItemUpsert]>,
) => {
  await createEvalJobs({
    sourceEventType: "dataset-run-item-upsert",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    enforcedJobTimeScope: "NEW",
  });
  return true;
};
```
Called from UI Batch Processor (No Time Scope Enforcement)
```typescript
export const evalJobCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.CreateEvalQueue]>,
) => {
  await createEvalJobs({
    sourceEventType: "ui-create-eval",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    // No enforcedJobTimeScope: both NEW and EXISTING configs are allowed
  });
  return true;
};
```
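The three processors differ only in `sourceEventType` and in whether `enforcedJobTimeScope` is set. The sketch below illustrates that relationship with a hypothetical factory; `QueueJob`, `CreateArgs`, and `makeEvalCreatorProcessor` are illustrative stand-ins (not Langfuse or BullMQ APIs), and `createEvalJobs` is replaced by an injected `create` function so the code runs without a queue or database. The real processors are written out individually.

```typescript
type JobTimeScope = "NEW" | "EXISTING";
type SourceEventType =
  | "trace-upsert"
  | "dataset-run-item-upsert"
  | "ui-create-eval";

// Minimal stand-in for a queue job: only the fields the processors read.
type QueueJob<P> = { data: { timestamp: Date; payload: P } };

type CreateArgs<P> = {
  sourceEventType: SourceEventType;
  event: P;
  jobTimestamp: Date;
  enforcedJobTimeScope?: JobTimeScope;
};

// Hypothetical factory: binds the source type and scope once per processor.
export function makeEvalCreatorProcessor<P>(
  sourceEventType: SourceEventType,
  enforcedJobTimeScope: JobTimeScope | undefined,
  create: (args: CreateArgs<P>) => Promise<void>,
) {
  return async (job: QueueJob<P>): Promise<boolean> => {
    await create({
      sourceEventType,
      event: job.data.payload,
      jobTimestamp: job.data.timestamp,
      // Omit the key entirely when no scope is enforced (UI-triggered batches).
      ...(enforcedJobTimeScope ? { enforcedJobTimeScope } : {}),
    });
    return true;
  };
}
```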