Implementation:Langfuse Langfuse TraceUpsertQueue
| Knowledge Sources | |
|---|---|
| Domains | Event-Driven Architecture, Trace Ingestion |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool provided by Langfuse for managing the sharded BullMQ queue that triggers post-ingestion side effects (evaluations, webhooks, entity change notifications) after a configurable delay.
Description
The TraceUpsertQueue class manages a set of sharded BullMQ queues dedicated to post-ingestion side effects. When the ingestion worker successfully writes a trace or observation to ClickHouse, it enqueues a trace upsert job to this queue. After a configurable delay (default 30 seconds), the job becomes eligible for processing by workers that fan out to downstream queues.
Key implementation details:
- Sharding: Supports a configurable shard count via LANGFUSE_TRACE_UPSERT_QUEUE_SHARD_COUNT. Uses the same getShardIndex hashing function as IngestionQueue. The sharding key is typically {projectId}-{traceId}, which routes all upsert events for the same trace to the same shard.
- 30-second default delay: Jobs are created with a default delay of 30 seconds (delay: 30_000, in milliseconds). After a trace is created, the system waits 30 seconds before triggering side effects, giving related observations and scores time to be written first.
- Configurable retry attempts: The number of retry attempts is controlled by the LANGFUSE_TRACE_UPSERT_QUEUE_ATTEMPTS environment variable. Retries use exponential backoff starting at 5 seconds.
- Job retention: The last 100 completed jobs are kept for debugging (removeOnComplete: 100). Up to 100,000 failed jobs are kept for investigation (removeOnFail: 100_000).
- Singleton pattern: Like IngestionQueue, instances are cached in a static Map keyed by shard index. Each shard creates its own Redis connection.
- Queue naming: Shard 0 uses the base QueueName.TraceUpsert value (typically "trace-upsert-queue"), and shard N (N > 0) is named "trace-upsert-queue-{N}".
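The delay, retry, and retention settings listed above can be pictured as one options object. The following is a minimal sketch, not the code in traceUpsert.ts: the field names follow BullMQ's job options, while TraceUpsertJobOptions, buildDefaultJobOptions, and backoffDelayMs are hypothetical names introduced for illustration. The backoff arithmetic assumes BullMQ's built-in exponential strategy without jitter.

```typescript
// Hypothetical shape mirroring the BullMQ job options named in the list above.
interface TraceUpsertJobOptions {
  delay: number;
  attempts: number;
  backoff: { type: "exponential"; delay: number };
  removeOnComplete: number;
  removeOnFail: number;
}

// `attempts` would be read from LANGFUSE_TRACE_UPSERT_QUEUE_ATTEMPTS;
// it is a parameter here because its default is not stated on this page.
function buildDefaultJobOptions(attempts: number): TraceUpsertJobOptions {
  return {
    delay: 30_000, // wait 30 s before the job becomes eligible
    attempts,
    backoff: { type: "exponential", delay: 5_000 }, // first retry after 5 s
    removeOnComplete: 100, // keep the last 100 completed jobs
    removeOnFail: 100_000, // keep up to 100,000 failed jobs
  };
}

// Assumed exponential backoff: baseDelay * 2^(attemptsMade - 1).
function backoffDelayMs(attemptsMade: number, baseDelayMs = 5_000): number {
  return Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs);
}

const options = buildDefaultJobOptions(3);
console.log(options);
// Under the assumption above, the first three retries wait 5 s, 10 s, and 20 s.
console.log([1, 2, 3].map((n) => backoffDelayMs(n)));
```

Keeping the delay in the queue's default job options means producers do not need to pass it on every add call, which matches the enqueue example later on this page.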
Static methods mirror those of IngestionQueue:
- getShardNames(): Returns all shard queue names.
- getShardIndexFromShardName(shardName): Parses a shard name into its index.
- getInstance({ shardingKey?, shardName? }): Returns the queue for the resolved shard.
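The naming scheme behind these methods can be sketched in a few lines. This is an illustration under stated assumptions rather than the Langfuse source: BASE_QUEUE_NAME stands in for QueueName.TraceUpsert, the function names are hypothetical, and the exact handling of malformed names in the real getShardIndexFromShardName may differ.

```typescript
// Assumed base name; this page says QueueName.TraceUpsert is typically this value.
const BASE_QUEUE_NAME = "trace-upsert-queue";

// Shard 0 uses the base name; shard N (N > 0) appends "-N".
function shardNameForIndex(index: number): string {
  return index === 0 ? BASE_QUEUE_NAME : `${BASE_QUEUE_NAME}-${index}`;
}

// Build the full list of shard names for a given shard count.
function listShardNames(shardCount: number): string[] {
  return Array.from({ length: shardCount }, (_, i) => shardNameForIndex(i));
}

// Inverse mapping: returns null when the name does not follow the scheme.
function parseShardIndex(shardName: string | undefined): number | null {
  if (!shardName) return null;
  if (shardName === BASE_QUEUE_NAME) return 0;
  const prefix = `${BASE_QUEUE_NAME}-`;
  if (!shardName.startsWith(prefix)) return null;
  const suffix = shardName.slice(prefix.length);
  // Accept only positive integers without leading zeros.
  return /^[1-9]\d*$/.test(suffix) ? Number(suffix) : null;
}

console.log(listShardNames(3));
console.log(parseShardIndex("trace-upsert-queue-2"));
```

Because shard 0 keeps the unsuffixed base name, a deployment that moves from one shard to several keeps its existing queue name for the first shard.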
Usage
This queue is used by the ingestion worker after writing trace data to ClickHouse. The worker creates a job containing the trace ID, project ID, and timestamp. Downstream consumers of this queue then trigger evaluation job creation (CreateEvalQueue), entity change notifications (EntityChangeQueue), and webhook dispatch (WebhookQueue).
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/redis/traceUpsert.ts
- Lines: L12-97
Signature
export class TraceUpsertQueue {
  private static instances: Map<
    number,
    Queue<TQueueJobTypes[QueueName.TraceUpsert]> | null
  >;

  public static getShardNames(): string[];

  static getShardIndexFromShardName(
    shardName: string | undefined,
  ): number | null;

  public static getInstance(params?: {
    shardingKey?: string;
    shardName?: string;
  }): Queue<TQueueJobTypes[QueueName.TraceUpsert]> | null;
}
Import
import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| shardingKey | string | No | A key used to determine the shard via hashing. Typically "{projectId}-{traceId}". Used when enqueuing a new trace upsert job. |
| shardName | string | No | The explicit shard queue name (e.g., "trace-upsert-queue" or "trace-upsert-queue-1"). Used when registering a worker for a specific shard. |
Neither parameter is strictly required. If both are omitted and Redis clustering is disabled, shard 0 is used by default.
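To illustrate why hashing the sharding key keeps all jobs for one trace on one shard, here is a small self-contained sketch. It is not Langfuse's getShardIndex: fnv1a32 is a stand-in hash (FNV-1a style, applied to UTF-16 code units) chosen only because it needs no imports, and shardIndexForKey is a hypothetical name.

```typescript
// Stand-in hash for illustration; the real getShardIndex may use a different algorithm.
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // multiply by the FNV prime in 32-bit space
  }
  return hash >>> 0; // convert to an unsigned 32-bit integer
}

// Map a sharding key to a shard index in [0, shardCount).
function shardIndexForKey(shardingKey: string, shardCount: number): number {
  if (!Number.isInteger(shardCount) || shardCount < 1) {
    throw new RangeError("shardCount must be a positive integer");
  }
  return fnv1a32(shardingKey) % shardCount;
}

const projectId = "proj_abc123";
const traceId = "trace_xyz789";
const shardingKey = `${projectId}-${traceId}`;

const first = shardIndexForKey(shardingKey, 4);
const second = shardIndexForKey(shardingKey, 4);
console.log(first === second); // true: the same key always maps to the same shard
```

Any deterministic hash gives this property. Changing the shard count changes the modulus, so keys may move to different shards after a reconfiguration.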
Outputs
| Name | Type | Description |
|---|---|---|
| getInstance() | Queue<TQueueJobTypes[QueueName.TraceUpsert]> or null | The BullMQ Queue instance for the resolved shard, or null if Redis is not available. |
| getShardNames() | string[] | Array of all shard queue names, e.g., ["trace-upsert-queue", "trace-upsert-queue-1"]. |
| getShardIndexFromShardName() | number or null | The numeric shard index, or null if the shard name cannot be parsed. |
Usage Examples
Enqueuing a Trace Upsert from the Ingestion Worker
import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";
import { QueueJobs } from "@langfuse/shared/src/server/queues";
import { randomUUID } from "crypto";

const projectId = "proj_abc123";
const traceId = "trace_xyz789";
const shardingKey = `${projectId}-${traceId}`;

const queue = TraceUpsertQueue.getInstance({ shardingKey });
if (queue) {
  await queue.add(
    QueueJobs.TraceUpsert,
    {
      id: randomUUID(),
      timestamp: new Date(),
      name: QueueJobs.TraceUpsert,
      payload: {
        projectId,
        traceId,
        // Additional metadata for downstream processing
      },
    },
    // delay is inherited from defaultJobOptions (30,000 ms)
  );
}
Registering Workers for All Shards
import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";
import { Worker } from "bullmq";

// Placeholder connection options; substitute your own Redis settings.
const connection = { host: "localhost", port: 6379 };

const shardNames = TraceUpsertQueue.getShardNames();
for (const shardName of shardNames) {
  const queue = TraceUpsertQueue.getInstance({ shardName });
  if (!queue) continue; // Redis is not available, so skip this shard
  const worker = new Worker(
    shardName,
    async (job) => {
      const { projectId, traceId } = job.data.payload;
      // Fan out to downstream queues. The three helpers below are
      // illustrative placeholders that the caller must supply.
      await createEvalJobsForTrace(projectId, traceId);
      await notifyEntityChange(projectId, traceId);
      await dispatchWebhooks(projectId, traceId);
    },
    {
      connection,
      concurrency: 10,
    },
  );
}
Querying Shard Configuration
import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";
// Get all shard names for monitoring/health checks
const shards = TraceUpsertQueue.getShardNames();
console.log(`TraceUpsertQueue running with ${shards.length} shards:`, shards);
// Example output when three shards are configured:
// TraceUpsertQueue running with 3 shards:
// ["trace-upsert-queue", "trace-upsert-queue-1", "trace-upsert-queue-2"]
// Parse a shard name from a worker event
const shardIndex = TraceUpsertQueue.getShardIndexFromShardName("trace-upsert-queue-2");
console.log(shardIndex); // 2