Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Langfuse Langfuse CreateEvalQueue

From Leeroopedia
Knowledge Sources
Domains Event-Driven Architecture, Job Queue Management
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete tool for enqueuing evaluation job creation events via a BullMQ singleton queue provided by Langfuse.

Description

CreateEvalQueue is a singleton class that wraps a BullMQ Queue instance specifically typed for evaluation job creation events. It provides a single static method, getInstance(), which lazily initializes a Redis-backed BullMQ queue on first access and returns the same instance for all subsequent calls.

The queue is configured with:

  • 5 retry attempts with exponential backoff starting at 5 seconds
  • removeOnComplete: 100 to retain the last 100 completed jobs for debugging
  • removeOnFail: 100,000 to retain a large number of failed jobs for investigation
  • Offline queue disabled on the Redis connection to fail fast when Redis is unavailable

The queue receives events from three upstream processors:

  • evalJobTraceCreatorQueueProcessor handles TraceUpsert events and calls createEvalJobs with enforcedJobTimeScope "NEW"
  • evalJobDatasetCreatorQueueProcessor handles DatasetRunItemUpsert events and calls createEvalJobs with enforcedJobTimeScope "NEW"
  • evalJobCreatorQueueProcessor handles UI-triggered CreateEvalQueue events and calls createEvalJobs without an enforced time scope

Each processor wraps the createEvalJobs function with error handling and logging. The dataset processor also includes special handling for ObservationNotFoundError, which triggers a manual retry mechanism separate from BullMQ's built-in retry to handle timing issues when observation data has not yet been fully replicated.

Usage

Import and use this queue when you need to enqueue evaluation job creation events from any part of the system, such as trace ingestion handlers, dataset run item processors, or the batch action system.

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/redis/createEvalQueue.ts
  • Lines: 10-50

Signature

export class CreateEvalQueue {
  private static instance: Queue<
    TQueueJobTypes[QueueName.CreateEvalQueue]
  > | null = null;

  public static getInstance(): Queue<
    TQueueJobTypes[QueueName.CreateEvalQueue]
  > | null;
}

Import

import { CreateEvalQueue } from "@langfuse/shared/src/server/redis/createEvalQueue";

const queue = CreateEvalQueue.getInstance();

I/O Contract

Inputs

Name Type Required Description
(none) (none) N/A getInstance() takes no parameters. The queue receives typed BullMQ job data conforming to TQueueJobTypes[QueueName.CreateEvalQueue].

Queue Job Payload (added via queue.add()):

Name Type Required Description
name QueueJobs.EvaluationExecution Yes The job name constant identifying this as an eval creation job.
id string Yes Unique job identifier (UUID).
timestamp Date Yes When the job was enqueued.
payload.projectId string Yes The project to create eval jobs for.
payload.traceId string Yes The trace ID that triggered this evaluation.
payload.configId string No Optional specific job configuration ID to target (used by batch actions).

Outputs

Name Type Description
Queue or null Queue<TQueueJobTypes[QueueName.CreateEvalQueue]> or null A BullMQ Queue instance if Redis is available, or null if Redis connection failed. Callers must null-check before adding jobs.

Usage Examples

Enqueuing an Eval Job Creation Event

import { CreateEvalQueue } from "@langfuse/shared/src/server/redis/createEvalQueue";
import { QueueName, QueueJobs } from "@langfuse/shared/src/server";
import { v4 as uuidv4 } from "uuid";

const queue = CreateEvalQueue.getInstance();
if (queue) {
  await queue.add(
    QueueName.CreateEvalQueue,
    {
      name: QueueJobs.EvaluationExecution,
      id: uuidv4(),
      timestamp: new Date(),
      payload: {
        projectId: "proj-123",
        traceId: "trace-456",
        configId: "config-789", // optional: target specific evaluator
      },
    },
    {
      delay: 10000, // optional delay in milliseconds
    },
  );
}

Processor Wiring (worker side)

import { Job } from "bullmq";
import { QueueName, TQueueJobTypes } from "@langfuse/shared/src/server";
import { createEvalJobs } from "../features/evaluation/evalService";

// TraceUpsert processor - enforces NEW time scope
export const evalJobTraceCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.TraceUpsert]>,
) => {
  await createEvalJobs({
    sourceEventType: "trace-upsert",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
    enforcedJobTimeScope: "NEW",
  });
  return true;
};

// UI-triggered batch processor - no enforced time scope
export const evalJobCreatorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.CreateEvalQueue]>,
) => {
  await createEvalJobs({
    sourceEventType: "ui-create-eval",
    event: job.data.payload,
    jobTimestamp: job.data.timestamp,
  });
  return true;
};

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment