Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langfuse Langfuse BullMQ Queue Factory

From Leeroopedia
Knowledge Sources
Domains Queue Management, Redis, Background Processing
Last Updated 2026-02-14 00:00 GMT

Overview

Central factory function that returns a BullMQ Queue instance for a given QueueName, providing a single entry point for accessing all non-sharded background job queues.

Description

The getQueue function acts as a registry and factory for all BullMQ job queues in the Langfuse system. It accepts a QueueName enum value (with sharded queues excluded from the type) and returns the corresponding singleton Queue instance via each queue class's getInstance() method, or null if the queue cannot be instantiated (e.g., Redis not configured).

The function uses an exhaustive switch statement over the QueueName enum, ensuring compile-time safety: if a new queue name is added to the enum but not handled here, TypeScript's never check will produce a type error.

Excluded queues: IngestionQueue, OtelIngestionQueue, and TraceUpsert are sharded queues that require a sharding key. They must be accessed directly via their respective class's getInstance({ shardName }) method rather than through this factory.

The factory supports over 25 queue types including batch export, evaluation execution, data retention, integrations (PostHog, Mixpanel, Blob Storage), trace/score/project deletion, webhooks, and more.

Usage

Use getQueue when you need to add jobs to or inspect a specific background processing queue and you know the queue name at compile time. For sharded queues, use their respective class constructors directly.

Code Reference

Source Location

Signature

export function getQueue(
  queueName: Exclude<
    QueueName,
    | QueueName.IngestionQueue
    | QueueName.TraceUpsert
    | QueueName.OtelIngestionQueue
  >,
): Queue | null;

Import

import { getQueue } from "@langfuse/shared/src/server/redis/getQueue";

I/O Contract

Inputs

Name Type Required Description
queueName QueueName.TraceUpsert | QueueName.OtelIngestionQueue> Yes The name of the queue to retrieve (sharded queues excluded)

Outputs

Name Type Description
(return) null The BullMQ Queue singleton instance, or null if the queue could not be instantiated

Usage Examples

import { getQueue } from "./getQueue";
import { QueueName } from "../queues";

// Get the batch export queue
const batchExportQueue = getQueue(QueueName.BatchExport);
if (batchExportQueue) {
  await batchExportQueue.add("export-job", { projectId: "proj-123" });
}

// Get the trace delete queue
const traceDeleteQueue = getQueue(QueueName.TraceDelete);
if (traceDeleteQueue) {
  await traceDeleteQueue.add("delete-traces", { traceIds: ["t1", "t2"] });
}

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment