Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Langfuse Langfuse IngestionQueue

From Leeroopedia
Knowledge Sources
Domains Message Queues, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete tool for sharded BullMQ queue management for ingestion event dispatch provided by Langfuse.

Description

The IngestionQueue class manages a set of sharded BullMQ queues that transport ingestion event references from the API server to background workers. It is implemented as a static singleton factory: each shard index maps to at most one Queue instance, which is lazily created on first access and cached for subsequent calls.

Key implementation details:

  • Shard count configuration: The number of shards is controlled by LANGFUSE_INGESTION_QUEUE_SHARD_COUNT. The default is 1 (single queue).
  • Shard routing: When REDIS_CLUSTER_ENABLED is "true", the sharding key ({projectId}-{eventBodyId}) is hashed to determine the shard index. Otherwise, shard 0 is always used.
  • Queue naming: Shard 0 is named "ingestion-queue", shard N (N > 0) is named "ingestion-queue-{N}".
  • Job defaults: removeOnComplete: true, removeOnFail: 100,000, attempts: 6, exponential backoff starting at 5 seconds.
  • Redis connection: Each shard creates its own Redis connection with enableOfflineQueue: false (fail fast) and queue-specific retry options. The connection uses a queue prefix derived from the queue name.

The module also exports SecondaryIngestionQueue, a non-sharded overflow queue used when the primary queue encounters S3 SlowDown errors. It has similar configuration but with 5 retry attempts instead of 6.

Static methods:

  • getShardNames(): Returns an array of all shard queue names based on the configured shard count.
  • getShardIndexFromShardName(shardName): Parses a shard name back to its numeric index.
  • getInstance({ shardingKey?, shardName? }): Returns the queue for the given shard, creating it if necessary.

Usage

Call IngestionQueue.getInstance({ shardingKey }) from the API server when dispatching a new ingestion job. Call IngestionQueue.getInstance({ shardName }) from the worker when registering consumers for specific shards. Use IngestionQueue.getShardNames() to iterate over all shards when setting up workers.

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/redis/ingestionQueue.ts
  • Lines: L12-136

Signature

export class IngestionQueue {
  private static instances: Map<
    number,
    Queue<TQueueJobTypes[QueueName.IngestionQueue]> | null
  >;

  public static getShardNames(): string[];

  static getShardIndexFromShardName(
    shardName: string | undefined,
  ): number | null;

  public static getInstance(params: {
    shardingKey?: string;
    shardName?: string;
  }): Queue<TQueueJobTypes[QueueName.IngestionQueue]> | null;
}

export class SecondaryIngestionQueue {
  private static instance: Queue<
    TQueueJobTypes[QueueName.IngestionSecondaryQueue]
  > | null;

  public static getInstance(): Queue<
    TQueueJobTypes[QueueName.IngestionSecondaryQueue]
  > | null;
}

Import

import {
  IngestionQueue,
  SecondaryIngestionQueue,
} from "@langfuse/shared/src/server/redis/ingestionQueue";

I/O Contract

Inputs

Name Type Required Description
shardingKey string No A key used to determine the shard via hashing. Typically "{projectId}-{eventBodyId}". Used by the API server.
shardName string No The explicit shard queue name (e.g., "ingestion-queue" or "ingestion-queue-2"). Used by the worker to register a consumer for a specific shard.

At least one of shardingKey or shardName should be provided. If neither is provided and Redis clustering is disabled, shard 0 is used.

Outputs

Name Type Description
(return value) null The BullMQ Queue instance for the resolved shard, or null if Redis is not available.
getShardNames() string[] Array of all shard queue names, e.g., ["ingestion-queue", "ingestion-queue-1", "ingestion-queue-2"].
getShardIndexFromShardName() null The numeric shard index, or null if the shard name is invalid.

Usage Examples

Dispatching a Job from the API Server

import { IngestionQueue } from "@langfuse/shared/src/server/redis/ingestionQueue";
import { QueueJobs } from "@langfuse/shared/src/server/queues";
import { randomUUID } from "crypto";

const projectId = "proj_abc123";
const eventBodyId = "obs_xyz789";
const shardingKey = `${projectId}-${eventBodyId}`;

const queue = IngestionQueue.getInstance({ shardingKey });

if (queue) {
  await queue.add(
    QueueJobs.IngestionJob,
    {
      id: randomUUID(),
      timestamp: new Date(),
      name: QueueJobs.IngestionJob,
      payload: {
        data: {
          type: "generation-create",
          eventBodyId,
          fileKey: "evt_001",
          skipS3List: false,
        },
        authCheck: {
          validKey: true,
          scope: { projectId, accessLevel: "project" },
        },
      },
    },
    { delay: 5000 },  // 5-second delay
  );
}

Registering Workers for All Shards

import { IngestionQueue } from "@langfuse/shared/src/server/redis/ingestionQueue";
import { Worker } from "bullmq";

const shardNames = IngestionQueue.getShardNames();

for (const shardName of shardNames) {
  const queue = IngestionQueue.getInstance({ shardName });
  if (!queue) continue;

  const worker = new Worker(
    shardName,
    async (job) => {
      // Process the ingestion job
      const { data, authCheck } = job.data.payload;
      await processIngestionJob(data, authCheck);
    },
    { connection: /* redis connection */ },
  );
}

Using the Secondary Queue for Overflow

import {
  IngestionQueue,
  SecondaryIngestionQueue,
} from "@langfuse/shared/src/server/redis/ingestionQueue";

// If the primary queue dispatch fails, fall back to the secondary queue
const primaryQueue = IngestionQueue.getInstance({ shardingKey });
const secondaryQueue = SecondaryIngestionQueue.getInstance();

try {
  await primaryQueue?.add(/* ... */);
} catch (err) {
  // Dispatch to secondary queue as fallback
  await secondaryQueue?.add(/* ... */);
}

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment