# Implementation: Langfuse IngestionQueue
| Knowledge Sources | |
|---|---|
| Domains | Message Queues, Trace Ingestion |
| Last Updated | 2026-02-14 00:00 GMT |
## Overview

A concrete implementation of sharded BullMQ queue management for ingestion event dispatch, provided by Langfuse.
## Description
The IngestionQueue class manages a set of sharded BullMQ queues that transport ingestion event references from the API server to background workers. It is implemented as a static singleton factory: each shard index maps to at most one Queue instance, which is lazily created on first access and cached for subsequent calls.
Key implementation details:

- Shard count configuration: The number of shards is controlled by `LANGFUSE_INGESTION_QUEUE_SHARD_COUNT`. The default is 1 (a single queue).
- Shard routing: When `REDIS_CLUSTER_ENABLED` is `"true"`, the sharding key (`{projectId}-{eventBodyId}`) is hashed to determine the shard index. Otherwise, shard 0 is always used.
- Queue naming: Shard 0 is named `"ingestion-queue"`; shard N (N > 0) is named `"ingestion-queue-{N}"`.
- Job defaults: `removeOnComplete: true`, `removeOnFail: 100_000`, `attempts: 6`, exponential backoff starting at 5 seconds.
- Redis connection: Each shard creates its own Redis connection with `enableOfflineQueue: false` (fail fast) and queue-specific retry options. The connection uses a queue prefix derived from the queue name.
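The routing and naming rules above can be sketched in isolation. This is a minimal illustration, not the actual Langfuse code: the hash function (`hashString`, a djb2 variant) and the constant names standing in for the environment variables are assumptions.

```typescript
// Sketch of the shard-routing rules described above. Hypothetical values
// stand in for LANGFUSE_INGESTION_QUEUE_SHARD_COUNT and REDIS_CLUSTER_ENABLED.
const SHARD_COUNT = 3;
const REDIS_CLUSTER_ENABLED = true;

// Hypothetical stable string hash (djb2 variant); the real implementation
// may use a different hash, but any deterministic one preserves the idea.
function hashString(s: string): number {
  let h = 5381;
  for (let i = 0; i < s.length; i++) {
    h = (h * 33) ^ s.charCodeAt(i);
  }
  return h >>> 0; // coerce to unsigned 32-bit
}

function getShardIndex(shardingKey: string): number {
  // Without clustering (or with a single shard), everything goes to shard 0.
  if (!REDIS_CLUSTER_ENABLED || SHARD_COUNT <= 1) return 0;
  return hashString(shardingKey) % SHARD_COUNT;
}

function getShardName(index: number): string {
  // Shard 0 keeps the unsuffixed legacy name.
  return index === 0 ? "ingestion-queue" : `ingestion-queue-${index}`;
}

const idx = getShardIndex("proj_abc123-obs_xyz789");
console.log(getShardName(idx)); // deterministic: same key always maps to the same shard
```

Because the hash is deterministic, all events for the same `{projectId}-{eventBodyId}` pair land on the same shard, which keeps per-entity ordering within a single queue.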
The module also exports SecondaryIngestionQueue, a non-sharded overflow queue used when the primary queue encounters S3 SlowDown errors. It has similar configuration but with 5 retry attempts instead of 6.
Static methods:

- `getShardNames()`: Returns an array of all shard queue names based on the configured shard count.
- `getShardIndexFromShardName(shardName)`: Parses a shard name back to its numeric index.
- `getInstance({ shardingKey?, shardName? })`: Returns the queue for the given shard, creating it if necessary.
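The name/index round-trip implied by `getShardNames` and `getShardIndexFromShardName` can be sketched as standalone functions. This is an illustration of the documented naming scheme, not the actual implementation in `ingestionQueue.ts`:

```typescript
// Generate shard names per the documented scheme: shard 0 is unsuffixed,
// shard N (N > 0) gets an "-{N}" suffix.
function getShardNames(shardCount: number): string[] {
  return Array.from({ length: shardCount }, (_, i) =>
    i === 0 ? "ingestion-queue" : `ingestion-queue-${i}`,
  );
}

// Parse a shard name back to its index; invalid names yield null.
function getShardIndexFromShardName(
  shardName: string | undefined,
): number | null {
  if (!shardName) return null;
  if (shardName === "ingestion-queue") return 0;
  const match = shardName.match(/^ingestion-queue-(\d+)$/);
  return match ? Number(match[1]) : null;
}

// Round-trip: every generated name parses back to its own index.
getShardNames(4).forEach((name, i) => {
  console.assert(getShardIndexFromShardName(name) === i);
});
```

Returning `null` for unrecognized names (rather than throwing) matches the `number | null` return type in the signature below and lets workers skip queues they do not own.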
## Usage

Call `IngestionQueue.getInstance({ shardingKey })` from the API server when dispatching a new ingestion job. Call `IngestionQueue.getInstance({ shardName })` from the worker when registering consumers for specific shards. Use `IngestionQueue.getShardNames()` to iterate over all shards when setting up workers.
## Code Reference

### Source Location

- Repository: langfuse
- File: `packages/shared/src/server/redis/ingestionQueue.ts`
- Lines: L12-136
### Signature

```typescript
export class IngestionQueue {
  private static instances: Map<
    number,
    Queue<TQueueJobTypes[QueueName.IngestionQueue]> | null
  >;
  public static getShardNames(): string[];
  static getShardIndexFromShardName(
    shardName: string | undefined,
  ): number | null;
  public static getInstance(params: {
    shardingKey?: string;
    shardName?: string;
  }): Queue<TQueueJobTypes[QueueName.IngestionQueue]> | null;
}

export class SecondaryIngestionQueue {
  private static instance: Queue<
    TQueueJobTypes[QueueName.IngestionSecondaryQueue]
  > | null;
  public static getInstance(): Queue<
    TQueueJobTypes[QueueName.IngestionSecondaryQueue]
  > | null;
}
```
### Import

```typescript
import {
  IngestionQueue,
  SecondaryIngestionQueue,
} from "@langfuse/shared/src/server/redis/ingestionQueue";
```
## I/O Contract

### Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| shardingKey | string | No | A key used to determine the shard via hashing. Typically `"{projectId}-{eventBodyId}"`. Used by the API server. |
| shardName | string | No | The explicit shard queue name (e.g., `"ingestion-queue"` or `"ingestion-queue-2"`). Used by the worker to register a consumer for a specific shard. |

At least one of `shardingKey` or `shardName` should be provided. If neither is provided and Redis clustering is disabled, shard 0 is used.
### Outputs

| Name | Type | Description |
|---|---|---|
| getInstance() | Queue \| null | The BullMQ Queue instance for the resolved shard, or null if Redis is not available. |
| getShardNames() | string[] | Array of all shard queue names, e.g., `["ingestion-queue", "ingestion-queue-1", "ingestion-queue-2"]`. |
| getShardIndexFromShardName() | number \| null | The numeric shard index, or null if the shard name is invalid. |
## Usage Examples

### Dispatching a Job from the API Server

```typescript
import { IngestionQueue } from "@langfuse/shared/src/server/redis/ingestionQueue";
import { QueueJobs } from "@langfuse/shared/src/server/queues";
import { randomUUID } from "crypto";

const projectId = "proj_abc123";
const eventBodyId = "obs_xyz789";
const shardingKey = `${projectId}-${eventBodyId}`;

const queue = IngestionQueue.getInstance({ shardingKey });
if (queue) {
  await queue.add(
    QueueJobs.IngestionJob,
    {
      id: randomUUID(),
      timestamp: new Date(),
      name: QueueJobs.IngestionJob,
      payload: {
        data: {
          type: "generation-create",
          eventBodyId,
          fileKey: "evt_001",
          skipS3List: false,
        },
        authCheck: {
          validKey: true,
          scope: { projectId, accessLevel: "project" },
        },
      },
    },
    { delay: 5000 }, // 5-second delay
  );
}
```
### Registering Workers for All Shards

```typescript
import { IngestionQueue } from "@langfuse/shared/src/server/redis/ingestionQueue";
import { Worker } from "bullmq";

const shardNames = IngestionQueue.getShardNames();
for (const shardName of shardNames) {
  const queue = IngestionQueue.getInstance({ shardName });
  if (!queue) continue;
  const worker = new Worker(
    shardName,
    async (job) => {
      // Process the ingestion job
      const { data, authCheck } = job.data.payload;
      await processIngestionJob(data, authCheck);
    },
    { connection: /* redis connection */ },
  );
}
```
### Using the Secondary Queue for Overflow

```typescript
import {
  IngestionQueue,
  SecondaryIngestionQueue,
} from "@langfuse/shared/src/server/redis/ingestionQueue";

// If the primary queue dispatch fails, fall back to the secondary queue
const shardingKey = "proj_abc123-obs_xyz789"; // "{projectId}-{eventBodyId}"
const primaryQueue = IngestionQueue.getInstance({ shardingKey });
const secondaryQueue = SecondaryIngestionQueue.getInstance();
try {
  await primaryQueue?.add(/* ... */);
} catch (err) {
  // Dispatch to secondary queue as fallback
  await secondaryQueue?.add(/* ... */);
}
```