Implementation:Langfuse Langfuse CreateEvalQueue
| Knowledge Sources | |
|---|---|
| Domains | Event-Driven Architecture, Job Queue Management |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for enqueuing evaluation job creation events via a BullMQ singleton queue provided by Langfuse.
Description
CreateEvalQueue is a singleton class that wraps a BullMQ Queue instance specifically typed for evaluation job creation events. It provides a single static method, getInstance(), which lazily initializes a Redis-backed BullMQ queue on first access and returns the same instance for all subsequent calls.
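The lazy initialization described above can be sketched as follows. This is a minimal illustration, not the Langfuse source: `FakeQueue`, `createQueue`, and `LazyQueueSingleton` are hypothetical stand-ins (the real class builds a Redis-backed BullMQ `Queue`), chosen so the example runs without Redis.

```typescript
// Hypothetical stand-in for a BullMQ Queue; the real class needs a Redis connection.
class FakeQueue {
  readonly name: string;
  constructor(name: string) {
    this.name = name;
  }
}

// Counts how often the factory runs, to show that initialization happens once.
let factoryCalls = 0;

// Hypothetical factory; returns null to model "no Redis connection available".
function createQueue(redisAvailable: boolean): FakeQueue | null {
  factoryCalls += 1;
  return redisAvailable ? new FakeQueue("illustrative-queue-name") : null;
}

class LazyQueueSingleton {
  private static instance: FakeQueue | null = null;

  public static getInstance(redisAvailable = true): FakeQueue | null {
    // Reuse the cached instance if an earlier call already created one.
    if (LazyQueueSingleton.instance) return LazyQueueSingleton.instance;
    // First access (or an earlier attempt yielded null): try to build the queue.
    LazyQueueSingleton.instance = createQueue(redisAvailable);
    return LazyQueueSingleton.instance;
  }
}

const first = LazyQueueSingleton.getInstance();
const second = LazyQueueSingleton.getInstance();
```

Both calls return the same object, and the factory runs only on the first one. Because a null result is not cached as "done" in this sketch, a later call would attempt creation again; whether the real class behaves that way is an assumption.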
The queue is configured with:
- 5 retry attempts with exponential backoff starting at 5 seconds
- removeOnComplete: 100 to retain the last 100 completed jobs for debugging
- removeOnFail: 100,000 to retain a large number of failed jobs for investigation
- Offline queue disabled on the Redis connection to fail fast when Redis is unavailable
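As a rough sketch, the settings above map onto BullMQ's `defaultJobOptions` and an ioredis connection option as shown below. The objects are plain mirrors of those options rather than the Langfuse source, and `retryDelaysMs` is a hypothetical helper that assumes BullMQ's built-in exponential strategy (base delay multiplied by 2^(attemptsMade - 1)).

```typescript
// Plain-object mirror of the job settings listed above. The shape follows
// BullMQ's `defaultJobOptions`; BullMQ is not imported so the sketch runs standalone.
const defaultJobOptions = {
  attempts: 5,
  backoff: { type: "exponential", delay: 5000 },
  removeOnComplete: 100,
  removeOnFail: 100_000,
};

// ioredis option that makes commands fail immediately while Redis is unreachable
// instead of being buffered until the connection comes back.
const redisOptions = { enableOfflineQueue: false };

// Delay before each retry, assuming delay * 2^(attemptsMade - 1), where
// attemptsMade counts the attempts that have already failed.
function retryDelaysMs(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // `attempts` includes the first try, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1)));
  }
  return delays;
}

const schedule = retryDelaysMs(
  defaultJobOptions.attempts,
  defaultJobOptions.backoff.delay,
);
```

Under that assumption the four retries are scheduled roughly 5 s, 10 s, 20 s, and 40 s after each failure.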
Evaluation job creation is driven by three worker-side processors, each of which calls createEvalJobs. Only the last one consumes jobs from this queue; the other two are fed by TraceUpsert and DatasetRunItemUpsert events:
- evalJobTraceCreatorQueueProcessor handles TraceUpsert events and calls createEvalJobs with enforcedJobTimeScope "NEW"
- evalJobDatasetCreatorQueueProcessor handles DatasetRunItemUpsert events and calls createEvalJobs with enforcedJobTimeScope "NEW"
- evalJobCreatorQueueProcessor handles UI-triggered CreateEvalQueue events and calls createEvalJobs without an enforced time scope
Each processor wraps the createEvalJobs function with error handling and logging. The dataset processor also includes special handling for ObservationNotFoundError, which triggers a manual retry mechanism separate from BullMQ's built-in retry to handle timing issues when observation data has not yet been fully replicated.
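One way to picture the manual retry decision is the sketch below. It is an illustration under stated assumptions, not the Langfuse implementation: only the name `ObservationNotFoundError` comes from the text above, while `decideOnFailure`, `MAX_MANUAL_RETRIES`, and `MANUAL_RETRY_DELAY_MS` are hypothetical, and the limit and delay values are invented.

```typescript
// Stand-in for the error class named in the text; the real one lives in Langfuse.
class ObservationNotFoundError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ObservationNotFoundError";
  }
}

type FailureDecision =
  | { action: "requeue"; delayMs: number }
  | { action: "fail" };

// Hypothetical limits; the values Langfuse actually uses are not given here.
const MAX_MANUAL_RETRIES = 3;
const MANUAL_RETRY_DELAY_MS = 30_000;

// Decide what to do with a failed job. A missing observation is treated as a
// timing issue and re-enqueued with a delay; every other error is left to the
// queue's normal failure handling.
function decideOnFailure(error: unknown, manualRetries: number): FailureDecision {
  const isMissingObservation =
    error instanceof Error && error.name === "ObservationNotFoundError";
  if (isMissingObservation && manualRetries < MAX_MANUAL_RETRIES) {
    return { action: "requeue", delayMs: MANUAL_RETRY_DELAY_MS };
  }
  return { action: "fail" };
}

const firstMiss = decideOnFailure(new ObservationNotFoundError("not replicated yet"), 0);
const exhausted = decideOnFailure(new ObservationNotFoundError("still missing"), 3);
const otherError = decideOnFailure(new Error("boom"), 0);
```

Keeping the decision in a pure function separates the retry policy from the queue calls, so the policy can be checked without Redis. A processor would then act on a "requeue" result by adding the job again with the returned delay.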
Usage
Import and use this queue when you need to enqueue evaluation job creation events from any part of the system, such as trace ingestion handlers, dataset run item processors, or the batch action system.
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/redis/createEvalQueue.ts
- Lines: 10-50
Signature
export class CreateEvalQueue {
  private static instance: Queue<
    TQueueJobTypes[QueueName.CreateEvalQueue]
  > | null = null;

  public static getInstance(): Queue<
    TQueueJobTypes[QueueName.CreateEvalQueue]
  > | null;
}
Import
import { CreateEvalQueue } from "@langfuse/shared/src/server/redis/createEvalQueue";
const queue = CreateEvalQueue.getInstance();
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| (none) | (none) | N/A | getInstance() takes no parameters. The queue receives typed BullMQ job data conforming to TQueueJobTypes[QueueName.CreateEvalQueue]. |
Queue Job Payload (added via queue.add()):
| Name | Type | Required | Description |
|---|---|---|---|
| name | QueueJobs.CreateEvalJob | Yes | The job name constant identifying this as an eval creation job. |
| id | string | Yes | Unique job identifier (UUID). |
| timestamp | Date | Yes | When the job was enqueued. |
| payload.projectId | string | Yes | The project to create eval jobs for. |
| payload.traceId | string | Yes | The trace ID that triggered this evaluation. |
| payload.configId | string | No | Optional specific job configuration ID to target (used by batch actions). |
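The fields in the table can be written out as a TypeScript type with a small runtime check, as sketched below. The names `CreateEvalJobData` and `isCreateEvalJobData` are hypothetical, and the job name is typed as a plain string here, whereas Langfuse uses a QueueJobs constant; the shared package may validate payloads differently.

```typescript
// Shape of a job as described in the table above.
type CreateEvalJobData = {
  name: string; // a QueueJobs constant in Langfuse; plain string in this sketch
  id: string;
  timestamp: Date;
  payload: {
    projectId: string;
    traceId: string;
    configId?: string; // optional: target one job configuration
  };
};

// Hypothetical runtime check for untyped input.
function isCreateEvalJobData(value: unknown): value is CreateEvalJobData {
  if (typeof value !== "object" || value === null) return false;
  const job = value as Record<string, unknown>;
  const payload = job.payload as Record<string, unknown> | null | undefined;
  return (
    typeof job.name === "string" &&
    typeof job.id === "string" &&
    job.timestamp instanceof Date &&
    typeof payload === "object" &&
    payload !== null &&
    typeof payload.projectId === "string" &&
    typeof payload.traceId === "string" &&
    (payload.configId === undefined || typeof payload.configId === "string")
  );
}

const validJob: unknown = {
  name: "illustrative-job-name",
  id: "00000000-0000-0000-0000-000000000000",
  timestamp: new Date(),
  payload: { projectId: "proj-123", traceId: "trace-456" },
};
const missingTrace: unknown = {
  name: "illustrative-job-name",
  id: "00000000-0000-0000-0000-000000000000",
  timestamp: new Date(),
  payload: { projectId: "proj-123" },
};
```

Here `isCreateEvalJobData(validJob)` accepts the first object even though configId is absent, and rejects the second because the required traceId is missing.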
Outputs
| Name | Type | Description |
|---|---|---|
| Queue or null | Queue<TQueueJobTypes[QueueName.CreateEvalQueue]> or null | A BullMQ Queue instance if Redis is available, or null if Redis connection failed. Callers must null-check before adding jobs. |
Usage Examples
Enqueuing an Eval Job Creation Event
import { CreateEvalQueue } from "@langfuse/shared/src/server/redis/createEvalQueue";
import { QueueJobs } from "@langfuse/shared/src/server";
import { v4 as uuidv4 } from "uuid";

const queue = CreateEvalQueue.getInstance();

// getInstance() returns null when no Redis connection is available.
if (queue) {
  await queue.add(
    QueueJobs.CreateEvalJob, // BullMQ job name
    {
      name: QueueJobs.CreateEvalJob,
      id: uuidv4(),
      timestamp: new Date(),
      payload: {
        projectId: "proj-123",
        traceId: "trace-456",
        configId: "config-789", // optional: target specific evaluator
      },
    },
    {
      delay: 10000, // optional delay in milliseconds
    },
  );
}
Processor Wiring (worker side)
import { Job } from "bullmq";
import { QueueName, TQueueJobTypes } from "@langfuse/shared/src/server";
import { createEvalJobs } from "../features/evaluation/evalService";

// TraceUpsert processor - enforces NEW time scope
export const evalJobTraceCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.TraceUpsert]>,
) => {
  await createEvalJobs({
    sourceEventType: "trace-upsert",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    enforcedJobTimeScope: "NEW",
  });
  return true;
};

// UI-triggered batch processor - no enforced time scope
export const evalJobCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.CreateEvalQueue]>,
) => {
  await createEvalJobs({
    sourceEventType: "ui-create-eval",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
  });
  return true;
};