Implementation:Langfuse Langfuse TraceUpsertQueue

From Leeroopedia
Knowledge Sources
Domains Event-Driven Architecture, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete tool, provided by Langfuse, for managing the sharded BullMQ queue that triggers post-ingestion side effects (evaluations, webhooks, entity change notifications) after a configurable delay.

Description

The TraceUpsertQueue class manages a set of sharded BullMQ queues dedicated to post-ingestion side effects. When the ingestion worker successfully writes a trace or observation to ClickHouse, it enqueues a trace upsert job to this queue. After a configurable delay (default 30 seconds), the job becomes eligible for processing by workers that fan out to downstream queues.

Key implementation details:

  • Sharding: Supports configurable shard count via LANGFUSE_TRACE_UPSERT_QUEUE_SHARD_COUNT. Uses the same getShardIndex hashing function as IngestionQueue. The sharding key is typically {projectId}-{traceId}, ensuring all upsert events for the same trace are routed to the same shard.
  • 30-second default delay: Jobs are created with a default delay of 30,000 milliseconds (delay: 30_000). When a trace is written, the system therefore waits 30 seconds before triggering side effects, giving related observations and scores time to be written first.
  • Configurable retry attempts: The number of retry attempts is controlled by the LANGFUSE_TRACE_UPSERT_QUEUE_ATTEMPTS environment variable. Retries use exponential backoff starting at 5 seconds.
  • Job retention: The last 100 completed jobs are retained for debugging (removeOnComplete: 100). Up to 100,000 failed jobs are retained for investigation (removeOnFail: 100_000).
  • Singleton pattern: Like IngestionQueue, instances are cached in a static Map keyed by shard index. Each shard creates its own Redis connection.
  • Queue naming: Shard 0 is named using the base QueueName.TraceUpsert value (typically "trace-upsert-queue"), and shard N (N > 0) is named "trace-upsert-queue-{N}".
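
The delay, retry, and retention settings listed above correspond to standard BullMQ job options (delay, attempts, backoff, removeOnComplete, removeOnFail). The following is a minimal sketch, not the Langfuse source: the JobOptionsSketch type and the buildTraceUpsertJobOptions and retryDelayMs helpers are hypothetical names introduced for illustration, and the retry schedule assumes BullMQ's built-in exponential strategy, which doubles the wait on each retry.

```typescript
// Local stand-in for the subset of BullMQ job options discussed above.
interface JobOptionsSketch {
  delay: number;
  attempts: number;
  backoff: { type: "exponential"; delay: number };
  removeOnComplete: number;
  removeOnFail: number;
}

// Hypothetical helper. In a real deployment `attempts` would be read from
// LANGFUSE_TRACE_UPSERT_QUEUE_ATTEMPTS; it is a parameter here because this
// page does not state the default value.
function buildTraceUpsertJobOptions(attempts: number): JobOptionsSketch {
  return {
    delay: 30_000, // wait 30 s before the job becomes eligible
    attempts,
    backoff: { type: "exponential", delay: 5_000 }, // first retry after 5 s
    removeOnComplete: 100, // keep the last 100 completed jobs
    removeOnFail: 100_000, // keep up to 100,000 failed jobs
  };
}

// Wait before the n-th retry (n = 1 is the first retry), assuming the
// base delay doubles on every subsequent retry.
function retryDelayMs(opts: JobOptionsSketch, retry: number): number {
  return opts.backoff.delay * 2 ** (retry - 1);
}
```

Under these assumptions, a job configured with three retries would wait 5, 10, and 20 seconds between successive attempts, in addition to the initial 30-second delay.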

Static methods mirror those of IngestionQueue:

  • getShardNames(): Returns all shard queue names.
  • getShardIndexFromShardName(shardName): Parses shard name to index.
  • getInstance({ shardingKey?, shardName? }): Returns the queue for the resolved shard.
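
The naming scheme behind getShardNames() and getShardIndexFromShardName() can be sketched as follows. This is an illustration under stated assumptions rather than the actual implementation: BASE_NAME assumes that QueueName.TraceUpsert resolves to "trace-upsert-queue", and the shardNames and shardIndexFromName functions are hypothetical stand-ins for the static methods.

```typescript
// Assumed value of QueueName.TraceUpsert.
const BASE_NAME = "trace-upsert-queue";

// Shard 0 uses the base name; shard N (N > 0) appends "-N".
function shardNames(shardCount: number): string[] {
  return Array.from({ length: shardCount }, (_, i) =>
    i === 0 ? BASE_NAME : `${BASE_NAME}-${i}`,
  );
}

// Inverse mapping: returns null when the name does not follow the scheme.
function shardIndexFromName(shardName: string | undefined): number | null {
  if (!shardName) return null;
  if (shardName === BASE_NAME) return 0;

  const prefix = `${BASE_NAME}-`;
  if (!shardName.startsWith(prefix)) return null;

  // Accept only a positive integer suffix without leading zeros.
  const suffix = shardName.slice(prefix.length);
  if (!/^[1-9]\d*$/.test(suffix)) return null;
  return Number(suffix);
}
```

Keeping shard 0 on the unsuffixed base name means a deployment with a shard count of 1 uses the same queue name as an unsharded setup.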

Usage

This queue is used by the ingestion worker after writing trace data to ClickHouse. The worker creates a job containing the trace ID, project ID, and timestamp. Downstream consumers of this queue then trigger evaluation job creation (CreateEvalQueue), entity change notifications (EntityChangeQueue), and webhook dispatch (WebhookQueue).

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/redis/traceUpsert.ts
  • Lines: L12-97

Signature

export class TraceUpsertQueue {
  private static instances: Map<
    number,
    Queue<TQueueJobTypes[QueueName.TraceUpsert]> | null
  >;

  public static getShardNames(): string[];

  static getShardIndexFromShardName(
    shardName: string | undefined,
  ): number | null;

  public static getInstance(params?: {
    shardingKey?: string;
    shardName?: string;
  }): Queue<TQueueJobTypes[QueueName.TraceUpsert]> | null;
}

Import

import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";

I/O Contract

Inputs

Name Type Required Description
shardingKey string No A key used to determine the shard via hashing. Typically "{projectId}-{traceId}". Used when enqueuing a new trace upsert job.
shardName string No The explicit shard queue name (e.g., "trace-upsert-queue" or "trace-upsert-queue-1"). Used when registering a worker for a specific shard.

Neither parameter is strictly required. If both are omitted and Redis clustering is disabled, shard 0 is used by default.
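
How a sharding key might resolve to a shard index can be sketched as below. The real getShardIndex function is not shown on this page, so a 32-bit FNV-1a hash is used here purely as a stand-in; shardIndexForKey is a hypothetical name, and the sketch does not model the Redis clustering condition mentioned above.

```typescript
// Stand-in hash (32-bit FNV-1a). The real getShardIndex may hash differently.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    // Multiply by the FNV prime and keep the result as an unsigned 32-bit value.
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Hypothetical resolver: falls back to shard 0 when no key is given or
// when only one shard is configured.
function shardIndexForKey(
  shardingKey: string | undefined,
  shardCount: number,
): number {
  if (!shardingKey || shardCount <= 1) return 0;
  return fnv1a(shardingKey) % shardCount;
}

// The same "{projectId}-{traceId}" key always resolves to the same shard.
const key = "proj_abc123-trace_xyz789";
const first = shardIndexForKey(key, 4);
const second = shardIndexForKey(key, 4);
console.log(first === second); // true
```

Any deterministic hash gives the property that matters here: all upsert events for one trace land on one shard, regardless of which process enqueues them.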

Outputs

Name Type Description
getInstance() (return value) Queue<TQueueJobTypes[QueueName.TraceUpsert]> | null The BullMQ Queue instance for the resolved shard, or null if Redis is not available.
getShardNames() string[] Array of all shard queue names, e.g., ["trace-upsert-queue", "trace-upsert-queue-1"].
getShardIndexFromShardName() number | null The numeric shard index, or null if the shard name cannot be parsed.

Usage Examples

Enqueuing a Trace Upsert from the Ingestion Worker

import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";
import { QueueJobs } from "@langfuse/shared/src/server/queues";
import { randomUUID } from "crypto";

const projectId = "proj_abc123";
const traceId = "trace_xyz789";
const shardingKey = `${projectId}-${traceId}`;

const queue = TraceUpsertQueue.getInstance({ shardingKey });

if (queue) {
  await queue.add(
    QueueJobs.TraceUpsert,
    {
      id: randomUUID(),
      timestamp: new Date(),
      name: QueueJobs.TraceUpsert,
      payload: {
        projectId,
        traceId,
        // Additional metadata for downstream processing
      },
    },
    // delay is inherited from defaultJobOptions (30,000 ms)
  );
}

Registering Workers for All Shards

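import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";
import { Worker, type ConnectionOptions } from "bullmq";

// Placeholders: supply your own Redis connection and downstream helpers.
// These names are illustrative and are not part of the Langfuse API.
declare const redisConnection: ConnectionOptions;
declare function createEvalJobsForTrace(projectId: string, traceId: string): Promise<void>;
declare function notifyEntityChange(projectId: string, traceId: string): Promise<void>;
declare function dispatchWebhooks(projectId: string, traceId: string): Promise<void>;

const shardNames = TraceUpsertQueue.getShardNames();
const workers: Worker[] = [];

for (const shardName of shardNames) {
  // Skip shards whose queue is unavailable (getInstance returns null without Redis)
  const queue = TraceUpsertQueue.getInstance({ shardName });
  if (!queue) continue;

  const worker = new Worker(
    shardName,
    async (job) => {
      const { projectId, traceId } = job.data.payload;

      // Fan out to downstream queues
      await createEvalJobsForTrace(projectId, traceId);
      await notifyEntityChange(projectId, traceId);
      await dispatchWebhooks(projectId, traceId);
    },
    {
      connection: redisConnection,
      concurrency: 10,
    },
  );

  // Keep a reference so the workers can be closed on shutdown
  workers.push(worker);
}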

Querying Shard Configuration

import { TraceUpsertQueue } from "@langfuse/shared/src/server/redis/traceUpsert";

// Get all shard names for monitoring/health checks
const shards = TraceUpsertQueue.getShardNames();
console.log(`TraceUpsertQueue running with ${shards.length} shards:`, shards);
// Example output with a shard count of 3: TraceUpsertQueue running with 3 shards:
//   ["trace-upsert-queue", "trace-upsert-queue-1", "trace-upsert-queue-2"]

// Parse a shard name from a worker event
const shardIndex = TraceUpsertQueue.getShardIndexFromShardName("trace-upsert-queue-2");
console.log(shardIndex); // 2

Related Pages

Implements Principle
