Implementation:Langfuse Langfuse BullMQ Queue Factory
| Knowledge Sources | |
|---|---|
| Domains | Queue Management, Redis, Background Processing |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Central factory function that returns a BullMQ Queue instance for a given QueueName, providing a single entry point for accessing all non-sharded background job queues.
Description
The getQueue function acts as a registry and factory for all BullMQ job queues in the Langfuse system. It accepts a QueueName enum value (with sharded queues excluded from the type) and returns the corresponding singleton Queue instance via each queue class's getInstance() method, or null if the queue cannot be instantiated (e.g., Redis not configured).
The function uses an exhaustive switch statement over the QueueName enum, ensuring compile-time safety: if a new queue name is added to the enum but not handled here, TypeScript's never check will produce a type error.
Excluded queues: IngestionQueue, OtelIngestionQueue, and TraceUpsert are sharded queues that require a sharding key. They must be accessed directly via their respective class's getInstance({ shardName }) method rather than through this factory.
The factory supports over 25 queue types including batch export, evaluation execution, data retention, integrations (PostHog, Mixpanel, Blob Storage), trace/score/project deletion, webhooks, and more.
Usage
Use getQueue when you need to add jobs to or inspect a specific background processing queue and you know the queue name at compile time. For sharded queues, use their respective class constructors directly.
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/redis/getQueue.ts
- Lines: 1-110
Signature
export function getQueue(
queueName: Exclude<
QueueName,
| QueueName.IngestionQueue
| QueueName.TraceUpsert
| QueueName.OtelIngestionQueue
>,
): Queue | null;
Import
import { getQueue } from "@langfuse/shared/src/server/redis/getQueue";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| queueName | QueueName.TraceUpsert | QueueName.OtelIngestionQueue> | Yes | The name of the queue to retrieve (sharded queues excluded) |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | null | The BullMQ Queue singleton instance, or null if the queue could not be instantiated |
Usage Examples
import { getQueue } from "./getQueue";
import { QueueName } from "../queues";
// Get the batch export queue
const batchExportQueue = getQueue(QueueName.BatchExport);
if (batchExportQueue) {
await batchExportQueue.add("export-job", { projectId: "proj-123" });
}
// Get the trace delete queue
const traceDeleteQueue = getQueue(QueueName.TraceDelete);
if (traceDeleteQueue) {
await traceDeleteQueue.add("delete-traces", { traceIds: ["t1", "t2"] });
}