Implementation: Langfuse DatasetRunItemUpsertQueue
| Knowledge Sources | |
|---|---|
| Domains | Data Pipeline, Eventual Consistency |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for enqueueing delayed dataset run item upsert operations via a BullMQ queue, provided by Langfuse.
Description
DatasetRunItemUpsertQueue is a singleton BullMQ queue class that manages the deferred upsert of dataset run item records. It is part of Langfuse's shared server package and is used by both the experiment processing pipeline and the general dataset run item ingestion flow.
The class follows the singleton pattern: getInstance() returns a cached queue instance, creating it on first call. If Redis is not available, it returns null, and callers must handle this gracefully. The queue is created with a dedicated Redis connection (using createNewRedisInstance with offline queue disabled) and a namespace prefix derived from the queue name.
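The cached-singleton pattern described above can be sketched independently of BullMQ. Everything in this sketch is a hypothetical stand-in: `DelayedUpsertQueue` for the real class, `createRedisConnection` for `createNewRedisInstance`, and the `redisUrl` parameter for Langfuse's actual Redis configuration.

```typescript
// Minimal sketch of the cached-singleton pattern, assuming a connection
// factory that returns null when Redis is not configured. All names here
// are illustrative stand-ins, not Langfuse's actual API.
type RedisConnection = { url: string };

function createRedisConnection(url?: string): RedisConnection | null {
  // Stand-in for createNewRedisInstance: null when Redis is unavailable.
  return url ? { url } : null;
}

class DelayedUpsertQueue {
  private static instance: DelayedUpsertQueue | null = null;

  private constructor(readonly connection: RedisConnection) {}

  // Returns the cached queue, creating it on first successful call;
  // callers must handle the null (Redis unavailable) case.
  static getInstance(redisUrl?: string): DelayedUpsertQueue | null {
    if (DelayedUpsertQueue.instance) return DelayedUpsertQueue.instance;
    const connection = createRedisConnection(redisUrl);
    if (!connection) return null;
    DelayedUpsertQueue.instance = new DelayedUpsertQueue(connection);
    return DelayedUpsertQueue.instance;
  }
}
```

Note that a failed creation attempt caches nothing, so a later call can still succeed once Redis becomes available.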
The key design decision is the 30-second initial delay configured in defaultJobOptions.delay. Every job added to this queue waits 30 seconds before becoming eligible for processing. This delay exists specifically to give LLM-as-judge evaluation jobs time to complete and write their scores before the run item is finalized and becomes visible in aggregation queries.
The queue's retry configuration provides resilience:
- 5 attempts: Enough to survive brief database outages or connection resets.
- Exponential backoff starting at 5 seconds: With 5 attempts there are up to 4 retries, at intervals of approximately 5s, 10s, 20s, and 40s after the respective failures.
- removeOnComplete: true: Successfully processed jobs are immediately removed to prevent queue bloat.
- removeOnFail: 10,000: Up to 10,000 failed jobs are retained for debugging; older failures are discarded.
An error handler is attached to the queue instance to log any BullMQ-level errors.
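Putting these options together, the documented values can be sketched as a BullMQ-style `defaultJobOptions` object. This is an illustration of the settings listed above, not a reproduction of the actual Langfuse source:

```typescript
// Documented job options for DatasetRunItemUpsertQueue, expressed as a
// BullMQ-style defaultJobOptions object (illustrative, not the real source).
const defaultJobOptions = {
  delay: 30_000,          // 30 s head start for LLM-as-judge evals
  attempts: 5,            // 1 initial attempt + up to 4 retries
  backoff: {
    type: "exponential",  // retry intervals double each time
    delay: 5_000,         // base retry delay: 5 s
  },
  removeOnComplete: true, // drop successful jobs immediately
  removeOnFail: 10_000,   // keep up to 10,000 failed jobs for debugging
};
```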
Usage
The queue is used in two places in the experiment pipeline:
- In `processItem` (`experimentServiceClickhouse.ts`), after the LLM call and eval scheduling, a job is added to this queue with the dataset item ID, trace ID, and project ID.
- The worker that consumes this queue performs the actual upsert and may trigger additional downstream processing.
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/redis/datasetRunItemUpsert.ts
- Lines: 10-52
Signature
```typescript
export class DatasetRunItemUpsertQueue {
  private static instance: Queue<
    TQueueJobTypes[QueueName.DatasetRunItemUpsert]
  > | null = null;

  public static getInstance(): Queue<
    TQueueJobTypes[QueueName.DatasetRunItemUpsert]
  > | null;
}
```
Import
```typescript
import { DatasetRunItemUpsertQueue } from "@langfuse/shared/src/server/redis/datasetRunItemUpsert";

// Or via the shared server barrel export:
import { DatasetRunItemUpsertQueue } from "@langfuse/shared/src/server";
```
I/O Contract
Inputs (Queue Job Payload)
| Name | Type | Required | Description |
|---|---|---|---|
| payload.projectId | string | Yes | The project ID for the dataset run item. |
| payload.datasetItemId | string | Yes | The ID of the dataset item being processed. |
| payload.datasetItemValidFrom | Date | Yes | The version timestamp of the dataset item. |
| payload.traceId | string | Yes | The trace ID generated during experiment LLM execution. |
| id | string | Yes | A unique job ID (random UUID) for deduplication. |
| timestamp | Date | Yes | The timestamp when the job was created. |
| name | string (QueueJobs.DatasetRunItemUpsert) | Yes | The job name constant. |
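The payload table above can be expressed as a TypeScript interface. `DatasetRunItemUpsertJob` is a hypothetical name chosen for illustration, and the `name` field is typed loosely as `string` because the concrete value of the `QueueJobs.DatasetRunItemUpsert` constant is not reproduced here:

```typescript
// Hypothetical interface mirroring the job payload table above.
interface DatasetRunItemUpsertJob {
  payload: {
    projectId: string;          // project owning the dataset run item
    datasetItemId: string;      // dataset item being processed
    datasetItemValidFrom: Date; // version timestamp of the dataset item
    traceId: string;            // trace from the experiment LLM execution
  };
  id: string;                   // random UUID used for deduplication
  timestamp: Date;              // job creation time
  name: string;                 // the QueueJobs.DatasetRunItemUpsert constant
}
```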
Outputs
| Name | Type | Description |
|---|---|---|
| Queue instance | Queue or null | Returns the BullMQ Queue instance, or null if Redis is unavailable. |
Queue Configuration
| Option | Value | Description |
|---|---|---|
| delay | 30,000 ms | Initial delay before the job becomes eligible for processing. |
| attempts | 5 | Maximum number of processing attempts. |
| backoff.type | "exponential" | Backoff strategy for retries. |
| backoff.delay | 5,000 ms | Base delay for exponential backoff (retries at 5s, 10s, 20s, 40s). |
| removeOnComplete | true | Remove job from queue after successful processing. |
| removeOnFail | 10,000 | Retain up to 10,000 failed jobs for debugging. |
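BullMQ's built-in exponential strategy computes each retry delay as base × 2^(retry − 1), which is where the 5s/10s/20s/40s schedule comes from. A quick sketch (the function name is illustrative):

```typescript
// Retry delay under BullMQ's "exponential" backoff strategy:
// baseDelayMs * 2^(retryNumber - 1). With attempts: 5, the initial
// attempt is followed by up to 4 retries.
function retryDelayMs(baseDelayMs: number, retryNumber: number): number {
  return baseDelayMs * 2 ** (retryNumber - 1);
}

// Retry schedule for this queue's settings (base 5,000 ms, 4 retries):
const schedule = [1, 2, 3, 4].map((n) => retryDelayMs(5_000, n));
// schedule => [5000, 10000, 20000, 40000]
```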
Usage Examples
Enqueueing from Experiment Processing
```typescript
// From processItem in experimentServiceClickhouse.ts:
if (redis) {
  const queue = DatasetRunItemUpsertQueue.getInstance();
  if (queue) {
    await queue.add(QueueJobs.DatasetRunItemUpsert, {
      payload: {
        projectId,
        datasetItemId: datasetItem.id,
        datasetItemValidFrom: datasetItem.validFrom,
        traceId: newTraceId,
      },
      id: randomUUID(),
      timestamp: new Date(),
      name: QueueJobs.DatasetRunItemUpsert as const,
    });
  }
}
```
Checking Queue Availability
```typescript
const queue = DatasetRunItemUpsertQueue.getInstance();
if (!queue) {
  // Redis not available; skip delayed upsert.
  // The run item was already created via processEventBatch
  // and will be visible without the delay optimization.
  logger.warn("DatasetRunItemUpsertQueue not available, skipping delayed upsert");
}
```