
Implementation:Langfuse DatasetRunItemUpsertQueue

From Leeroopedia
Domains: Data Pipeline, Eventual Consistency
Last Updated: 2026-02-14 00:00 GMT

Overview

A concrete tool, provided by Langfuse, for enqueueing delayed dataset run item upsert operations through a BullMQ queue.

Description

DatasetRunItemUpsertQueue is a singleton BullMQ queue class that manages the deferred upsert of dataset run item records. It is part of Langfuse's shared server package and is used by both the experiment processing pipeline and the general dataset run item ingestion flow.

The class follows the singleton pattern: getInstance() returns a cached queue instance, creating it on first call. If Redis is not available, it returns null, and callers must handle this gracefully. The queue is created with a dedicated Redis connection (using createNewRedisInstance with offline queue disabled) and a namespace prefix derived from the queue name.
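The lazy, nullable singleton described above can be sketched in isolation. Note that `redisAvailable`, `createQueue`, and `FakeQueue` below are hypothetical stand-ins for Langfuse's real Redis helpers and the BullMQ `Queue` type; only the pattern itself is taken from the source.

```typescript
// Minimal sketch of the singleton-with-nullable-instance pattern.
// FakeQueue, redisAvailable, and createQueue are hypothetical stand-ins
// for the BullMQ Queue type and Langfuse's Redis helpers.
type FakeQueue = { name: string };

const redisAvailable = true;

function createQueue(name: string): FakeQueue {
  return { name };
}

class SingletonQueue {
  private static instance: FakeQueue | null = null;

  public static getInstance(): FakeQueue | null {
    if (!redisAvailable) return null; // callers must handle null
    if (!SingletonQueue.instance) {
      // Created once on first call, then cached.
      SingletonQueue.instance = createQueue("dataset-run-item-upsert");
    }
    return SingletonQueue.instance;
  }
}
```

Repeated calls to `getInstance()` return the same cached object, which is why every call site must still be prepared for `null` when Redis is down.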

The key design decision is the 30-second initial delay configured in defaultJobOptions.delay. Every job added to this queue waits 30 seconds before becoming eligible for processing. This delay exists specifically to give LLM-as-judge evaluation jobs time to complete and write their scores before the run item is finalized and becomes visible in aggregation queries.

The queue's retry configuration provides resilience:

  • 5 attempts: Enough to survive brief database outages or connection resets.
  • Exponential backoff starting at 5 seconds: With 5 attempts there are at most 4 retries, waiting approximately 5s, 10s, 20s, and 40s.
  • removeOnComplete: true: Successfully processed jobs are immediately removed to prevent queue bloat.
  • removeOnFail: 10,000: Up to 10,000 failed jobs are retained for debugging; older failures are discarded.
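BullMQ's built-in exponential strategy delays the nth retry (1-based) by `delay * 2^(n - 1)`. A quick arithmetic check of the schedule implied by a 5-second base and 5 attempts (this is illustrative code, not from the Langfuse source):

```typescript
// Reproduces BullMQ's exponential backoff arithmetic: the nth retry
// (1-based) waits baseDelayMs * 2 ** (n - 1) before re-running the job.
function exponentialBackoffSchedule(
  baseDelayMs: number,
  attempts: number,
): number[] {
  const retries = attempts - 1; // the first attempt is not a retry
  return Array.from({ length: retries }, (_, i) => baseDelayMs * 2 ** i);
}

// With the queue's settings (5000 ms base, 5 attempts), the retries
// wait 5000, 10000, 20000, and 40000 ms respectively.
const schedule = exponentialBackoffSchedule(5_000, 5);
```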

An error handler is attached to the queue instance to log any BullMQ-level errors.

Usage

The queue is used in two places in the experiment pipeline:

  1. In processItem (experimentServiceClickhouse.ts), after the LLM call and eval scheduling, a job is added to this queue with the dataset item ID, trace ID, and project ID.
  2. The worker that consumes this queue performs the actual upsert and may trigger additional downstream processing.
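The job payload flowing between these two places can be sketched as plain TypeScript. The `DatasetRunItemUpsertJob` interface mirrors the I/O contract below; `extractUpsertKeys` is a hypothetical stand-in for the first step of the worker's real upsert logic, which also writes to the database and may trigger downstream processing.

```typescript
// Shape of the job payload, mirroring the queue's I/O contract.
interface DatasetRunItemUpsertJob {
  payload: {
    projectId: string;
    datasetItemId: string;
    datasetItemValidFrom: Date;
    traceId: string;
  };
  id: string;      // random UUID used for deduplication
  timestamp: Date; // job creation time
  name: string;    // QueueJobs.DatasetRunItemUpsert
}

// Hypothetical worker-side helper: pulls out the identifiers the upsert
// needs. The real worker performs the database upsert itself.
function extractUpsertKeys(job: DatasetRunItemUpsertJob) {
  const { projectId, datasetItemId, traceId } = job.payload;
  return { projectId, datasetItemId, traceId };
}
```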

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/redis/datasetRunItemUpsert.ts
  • Lines: 10-52

Signature

export class DatasetRunItemUpsertQueue {
  private static instance: Queue<
    TQueueJobTypes[QueueName.DatasetRunItemUpsert]
  > | null = null;

  public static getInstance(): Queue<
    TQueueJobTypes[QueueName.DatasetRunItemUpsert]
  > | null;
}

Import

import { DatasetRunItemUpsertQueue } from "@langfuse/shared/src/server/redis/datasetRunItemUpsert";
// Or via the shared server barrel export:
import { DatasetRunItemUpsertQueue } from "@langfuse/shared/src/server";

I/O Contract

Inputs (Queue Job Payload)

Name Type Required Description
payload.projectId string Yes The project ID for the dataset run item.
payload.datasetItemId string Yes The ID of the dataset item being processed.
payload.datasetItemValidFrom Date Yes The version timestamp of the dataset item.
payload.traceId string Yes The trace ID generated during experiment LLM execution.
id string Yes A unique job ID (random UUID) for deduplication.
timestamp Date Yes The timestamp when the job was created.
name string (QueueJobs.DatasetRunItemUpsert) Yes The job name constant.

Outputs

Name Type Description
Queue instance Queue or null Returns the BullMQ Queue instance, or null if Redis is unavailable.

Queue Configuration

Option Value Description
delay 30,000 ms Initial delay before the job becomes eligible for processing.
attempts 5 Maximum number of processing attempts.
backoff.type "exponential" Backoff strategy for retries.
backoff.delay 5,000 ms Base delay for exponential backoff (retries wait ~5s, 10s, 20s, 40s).
removeOnComplete true Remove job from queue after successful processing.
removeOnFail 10,000 Retain up to 10,000 failed jobs for debugging.
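Taken together, the table above corresponds to a `defaultJobOptions` object along these lines. The values are from the table; the surrounding `Queue` construction and Redis connection are omitted, so this is a configuration sketch rather than the actual source.

```typescript
// defaultJobOptions as implied by the configuration table above.
const defaultJobOptions = {
  delay: 30_000, // 30 s before a job becomes eligible for processing
  attempts: 5,   // up to 5 total attempts
  backoff: {
    type: "exponential", // retries wait 5s, 10s, 20s, 40s
    delay: 5_000,
  },
  removeOnComplete: true, // drop successful jobs immediately
  removeOnFail: 10_000,   // keep the last 10,000 failures for debugging
};
```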

Usage Examples

Enqueueing from Experiment Processing

// From processItem in experimentServiceClickhouse.ts:
if (redis) {
  const queue = DatasetRunItemUpsertQueue.getInstance();
  if (queue) {
    await queue.add(QueueJobs.DatasetRunItemUpsert, {
      payload: {
        projectId,
        datasetItemId: datasetItem.id,
        datasetItemValidFrom: datasetItem.validFrom,
        traceId: newTraceId,
      },
      id: randomUUID(),
      timestamp: new Date(),
      name: QueueJobs.DatasetRunItemUpsert as const,
    });
  }
}

Checking Queue Availability

const queue = DatasetRunItemUpsertQueue.getInstance();
if (!queue) {
  // Redis not available; skip delayed upsert.
  // The run item was already created via processEventBatch
  // and will be visible without the delay optimization.
  logger.warn("DatasetRunItemUpsertQueue not available, skipping delayed upsert");
}
