
Principle:Langfuse Ingestion Queue Dispatch

From Leeroopedia
Knowledge Sources
Domains Message Queues, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

Queue dispatch is the practice of distributing ingestion workload across sharded BullMQ queues backed by Redis, ensuring reliable delivery of event references to background workers with configurable retry and backoff policies.

Description

After ingestion events have been validated, sampled, and persisted to S3, a lightweight job reference must be dispatched to a processing queue for the background worker to pick up. Langfuse uses BullMQ, a Redis-backed job queue library, for this purpose. The queue dispatch layer addresses several concerns:

  1. Sharding for horizontal scalability: The ingestion queue can be split into multiple shards. Each event is deterministically routed to a shard based on a hash of its sharding key (typically {projectId}-{eventBodyId}). This ensures that events for the same entity always land on the same shard, preventing concurrent processing conflicts, while distributing the overall load across shards.
  2. Retry with exponential backoff: Jobs that fail (e.g., due to transient ClickHouse unavailability) are automatically retried with exponential backoff starting at 5 seconds, up to 6 attempts in total. This gives the downstream system time to recover before retrying.
  3. Configurable delay: Jobs can be delayed before becoming eligible for processing. Langfuse uses a delay of up to 5 seconds (configurable) to batch multiple S3 files for the same entity, reducing the number of ClickHouse merge operations. Around UTC date boundaries (23:45-00:15), a longer delay is applied to avoid duplicate processing from out-of-order event arrival.
  4. Secondary overflow queue: A separate SecondaryIngestionQueue is available as an overflow mechanism for projects experiencing S3 SlowDown errors. This prevents one project's throttling from blocking the primary queue.
  5. Singleton pattern with lazy initialization: Queue instances are created lazily and cached as singletons per shard index. This avoids unnecessary Redis connections while ensuring each shard has exactly one queue producer.
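The singleton-with-lazy-initialization pattern can be sketched as follows. This is a minimal illustration, not Langfuse's actual code: the `Queue` type and `createQueue` factory stand in for BullMQ's `Queue` constructor, which would open a Redis connection on creation.

```typescript
// Stand-in for a BullMQ Queue instance; creating one is expensive
// because it opens a Redis connection.
type Queue = { shardIndex: number };

function createQueue(shardIndex: number): Queue {
  // Real code would be `new Queue(queueName, { connection, ... })`.
  return { shardIndex };
}

// One cached producer per shard index.
const queueCache = new Map<number, Queue>();

function getQueue(shardIndex: number): Queue {
  // Create the queue lazily on first request for this shard,
  // then reuse the cached instance on every subsequent call.
  let queue = queueCache.get(shardIndex);
  if (!queue) {
    queue = createQueue(shardIndex);
    queueCache.set(shardIndex, queue);
  }
  return queue;
}
```

Because the cache is keyed by shard index, repeated calls for the same shard never open a second connection, while distinct shards each get exactly one producer.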

Usage

Apply this principle whenever:

  • Adding a new type of background processing that should be decoupled from the API hot path.
  • Scaling the ingestion pipeline by increasing the shard count.
  • Tuning job retry behavior for different failure modes.
  • Managing queue backpressure when downstream systems are slower than the ingestion rate.

Theoretical Basis

Sharding Algorithm

FUNCTION getShardIndex(shardingKey, shardCount):
    hash = HASH(shardingKey)
    RETURN hash MOD shardCount

Queue name format:
    Shard 0:  "ingestion-queue"
    Shard N:  "ingestion-queue-{N}"  (for N > 0)

The sharding key is typically {projectId}-{eventBodyId}, which ensures that:

  • All events for the same entity (trace, observation, score) are routed to the same shard.
  • Events are distributed roughly uniformly across shards when there are many different entities.
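The routing above can be expressed in TypeScript. The hash function here (a djb2 variant) is an illustrative stand-in; any stable string hash with reasonable distribution satisfies the two properties, and Langfuse's actual choice of hash is not specified here.

```typescript
// Illustrative stable string hash (djb2 variant), kept unsigned 32-bit.
function hashString(s: string): number {
  let hash = 5381;
  for (let i = 0; i < s.length; i++) {
    hash = ((hash << 5) + hash + s.charCodeAt(i)) >>> 0; // hash * 33 + char
  }
  return hash;
}

// Deterministically map a sharding key to a shard index.
function getShardIndex(shardingKey: string, shardCount: number): number {
  return hashString(shardingKey) % shardCount;
}

// Shard 0 keeps the unsuffixed legacy name; other shards are suffixed.
function getQueueName(shardIndex: number): string {
  return shardIndex === 0 ? "ingestion-queue" : `ingestion-queue-${shardIndex}`;
}
```

With a key like `` `${projectId}-${eventBodyId}` ``, the same entity always hashes to the same shard, while many distinct entities spread roughly evenly across shards.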

Job Lifecycle

1. DISPATCH: API server adds job to queue with:
     - Job data: { S3 file key, auth context, event type, entity body ID }
     - Options: { delay, attempts: 6, backoff: exponential(5000ms) }

2. DELAY: Job sits in the "delayed" set for the configured delay period.

3. READY: After delay expires, job moves to the "waiting" set.

4. ACTIVE: Worker picks up the job and begins processing:
     a. Download event data from S3 using file key.
     b. Parse and validate the events.
     c. Write to ClickHouse via IngestionService.

5. COMPLETE: On success, job is removed (removeOnComplete: true).

6. RETRY: On failure, the job returns to the delayed set with exponential
     backoff (the delay doubles after each failed attempt). With 6 total
     attempts, the first attempt is immediate and up to 5 retries follow:
     Retry 1 (attempt 2): 5s delay
     Retry 2 (attempt 3): 10s delay
     Retry 3 (attempt 4): 20s delay
     Retry 4 (attempt 5): 40s delay
     Retry 5 (attempt 6): 80s delay (final attempt)

7. FAILED: After 6 attempts, job moves to failed set.
     Up to 100,000 failed jobs are retained for debugging.
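The schedule above follows from BullMQ-style exponential backoff with a 5000 ms base, where the delay before retry n is base × 2^(n-1). A small sketch of that computation (the helper names are illustrative, not part of BullMQ's API):

```typescript
// Delay in ms applied before retry number `retry` (1-based) under
// exponential backoff with the given base delay.
function backoffDelayMs(retry: number, baseDelayMs = 5000): number {
  return baseDelayMs * 2 ** (retry - 1);
}

// Full retry schedule for a job configured with `attempts` total attempts:
// the first attempt runs immediately, leaving `attempts - 1` retries.
function backoffSchedule(attempts: number, baseDelayMs = 5000): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(backoffDelayMs(retry, baseDelayMs));
  }
  return delays;
}
```

In BullMQ this schedule is requested declaratively via job options such as `{ attempts: 6, backoff: { type: "exponential", delay: 5000 } }` rather than computed by hand.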

Delay Strategy

FUNCTION getDelay(delayOverride, source):
    IF delayOverride is not null:
        RETURN delayOverride

    currentTime = UTC_NOW()
    IF currentTime is between 23:45 and 00:15:
        RETURN LANGFUSE_INGESTION_QUEUE_DELAY_MS  -- Longer delay around date boundaries

    IF source == "otel":
        RETURN 0  -- No delay for OTel events (already batched by collector)

    RETURN MIN(5000, LANGFUSE_INGESTION_QUEUE_DELAY_MS)  -- Default 5s delay

The 5-second default delay for API-sourced events gives the SDK time to send follow-up events (e.g., a span-update after a span-create) so the worker can process them together from S3, reducing ClickHouse merge overhead. OTel events skip this delay because the OTel collector already batches events before forwarding.
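The delay decision can be sketched in TypeScript. The constant mirrors the environment variable named in the pseudocode; its value here (30 s) and the exact boundary comparison are assumptions for illustration.

```typescript
// Assumed value for the sketch; in Langfuse this comes from the
// LANGFUSE_INGESTION_QUEUE_DELAY_MS environment variable.
const LANGFUSE_INGESTION_QUEUE_DELAY_MS = 30_000;

function getDelay(
  delayOverride: number | null,
  source: string,
  now: Date = new Date(),
): number {
  if (delayOverride !== null) return delayOverride;

  // Around the UTC date boundary (23:45-00:15), apply the longer delay
  // to absorb out-of-order event arrival.
  const minutesOfDay = now.getUTCHours() * 60 + now.getUTCMinutes();
  if (minutesOfDay >= 23 * 60 + 45 || minutesOfDay <= 15) {
    return LANGFUSE_INGESTION_QUEUE_DELAY_MS;
  }

  // OTel events are already batched by the collector: no extra delay.
  if (source === "otel") return 0;

  // Default: up to 5 seconds for API-sourced events.
  return Math.min(5000, LANGFUSE_INGESTION_QUEUE_DELAY_MS);
}
```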

Redis Connection Management

Each shard creates its own Redis connection with:
  - enableOfflineQueue: false  (fail fast if Redis is unavailable)
  - Retry options from redisQueueRetryOptions
  - Queue prefix derived from queue name for key namespacing
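A sketch of how those connection options might be assembled. The option names `enableOfflineQueue` and `retryStrategy` follow ioredis conventions; the retry strategy body is an illustrative stand-in for `redisQueueRetryOptions`, and the prefix format is assumed.

```typescript
// Per-shard Redis connection options. `enableOfflineQueue: false` makes
// commands fail immediately when Redis is unavailable instead of
// buffering them in memory (fail fast).
interface ShardRedisOptions {
  enableOfflineQueue: boolean;
  retryStrategy: (attempt: number) => number;
  keyPrefix: string;
}

function buildRedisOptions(queueName: string): ShardRedisOptions {
  return {
    enableOfflineQueue: false,
    // Stand-in for redisQueueRetryOptions: linear backoff capped at 2s
    // between reconnection attempts.
    retryStrategy: (attempt) => Math.min(attempt * 100, 2000),
    // Namespace all keys for this shard's queue, e.g. "ingestion-queue-2:".
    keyPrefix: `${queueName}:`,
  };
}
```

These options would then be passed to the Redis client used by that shard's queue instance.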

Related Pages

Implemented By
