Principle: Langfuse Ingestion Queue Dispatch
| Knowledge Sources | |
|---|---|
| Domains | Message Queues, Trace Ingestion |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Queue dispatch is the practice of distributing ingestion workload across sharded BullMQ queues backed by Redis, ensuring reliable delivery of event references to background workers with configurable retry and backoff policies.
Description
After ingestion events have been validated, sampled, and persisted to S3, a lightweight job reference must be dispatched to a processing queue for the background worker to pick up. Langfuse uses BullMQ, a Redis-backed job queue library, for this purpose. The queue dispatch layer addresses several concerns:
- Sharding for horizontal scalability: The ingestion queue can be split into multiple shards. Each event is deterministically routed to a shard based on a hash of its sharding key (typically `{projectId}-{eventBodyId}`). This ensures that events for the same entity always land on the same shard, preventing concurrent processing conflicts, while distributing the overall load across shards.
- Retry with exponential backoff: Jobs that fail (e.g., due to transient ClickHouse unavailability) are automatically retried up to 6 times with exponential backoff starting at 5 seconds. This gives the downstream system time to recover before retrying.
- Configurable delay: Jobs can be delayed before becoming eligible for processing. Langfuse uses a delay of up to 5 seconds (configurable) to batch multiple S3 files for the same entity, reducing the number of ClickHouse merge operations. Around UTC date boundaries (23:45-00:15), a longer delay is applied to avoid duplicate processing from out-of-order event arrival.
- Secondary overflow queue: A separate `SecondaryIngestionQueue` is available as an overflow mechanism for projects experiencing S3 SlowDown errors. This prevents one project's throttling from blocking the primary queue.
- Singleton pattern with lazy initialization: Queue instances are created lazily and cached as singletons per shard index. This avoids unnecessary Redis connections while ensuring each shard has exactly one queue producer.
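The lazy-singleton concern above can be sketched as a per-shard cache of queue producers. This is a minimal illustration, not Langfuse's actual code: `createQueue` stands in for `new Queue(name, opts)` from BullMQ, and `QueueLike` is a placeholder type.

```typescript
// Lazy, per-shard singleton queue producers.
// `createQueue` stands in for `new Queue(name, opts)` from BullMQ.
type QueueLike = { name: string };

const queueCache = new Map<number, QueueLike>();

function getQueueName(shardIndex: number): string {
  return shardIndex === 0 ? "ingestion-queue" : `ingestion-queue-${shardIndex}`;
}

function getIngestionQueue(
  shardIndex: number,
  createQueue: (name: string) => QueueLike = (name) => ({ name }),
): QueueLike {
  const cached = queueCache.get(shardIndex);
  if (cached) return cached; // reuse the existing producer for this shard
  const queue = createQueue(getQueueName(shardIndex)); // lazy: created on first use only
  queueCache.set(shardIndex, queue);
  return queue;
}
```

Caching the producer per shard index means repeated dispatches reuse one Redis connection per shard instead of opening a new one per request.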
Usage
Apply this principle whenever:
- Adding a new type of background processing that should be decoupled from the API hot path.
- Scaling the ingestion pipeline by increasing the shard count.
- Tuning job retry behavior for different failure modes.
- Managing queue backpressure when downstream systems are slower than the ingestion rate.
Theoretical Basis
Sharding Algorithm
```
FUNCTION getShardIndex(shardingKey, shardCount):
    hash = HASH(shardingKey)
    RETURN hash MOD shardCount
```
Queue name format:
- Shard 0: `ingestion-queue`
- Shard N: `ingestion-queue-{N}` (for N > 0)
The sharding key is typically `{projectId}-{eventBodyId}`, which ensures that:
- All events for the same entity (trace, observation, score) are routed to the same shard.
- Events are distributed roughly uniformly across shards when there are many different entities.
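A concrete version of the sharding function might look like the following. The choice of MD5 here is an assumption for illustration; any stable hash with reasonable dispersion satisfies the pseudocode's `HASH`.

```typescript
import { createHash } from "node:crypto";

// Deterministic shard routing: hash the sharding key and take it modulo
// the shard count, so the same key always maps to the same shard.
function getShardIndex(shardingKey: string, shardCount: number): number {
  if (shardCount <= 1) return 0; // single shard: no hashing needed
  const digest = createHash("md5").update(shardingKey).digest();
  // Interpret the first 4 bytes of the digest as an unsigned 32-bit integer.
  const hash = digest.readUInt32BE(0);
  return hash % shardCount;
}
```

Because the hash depends only on the key, two events carrying the same `{projectId}-{eventBodyId}` always land on the same shard, which is what prevents concurrent processing of the same entity.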
Job Lifecycle
1. DISPATCH: API server adds job to queue with:
- Job data: { S3 file key, auth context, event type, entity body ID }
- Options: { delay, attempts: 6, backoff: exponential(5000ms) }
2. DELAY: Job sits in the "delayed" set for the configured delay period.
3. READY: After delay expires, job moves to the "waiting" set.
4. ACTIVE: Worker picks up the job and begins processing:
a. Download event data from S3 using file key.
b. Parse and validate the events.
c. Write to ClickHouse via IngestionService.
5. COMPLETE: On success, job is removed (removeOnComplete: true).
6. RETRY: On failure, job is returned to delayed set with exponential backoff:
Attempt 1: 5s delay
Attempt 2: 10s delay
Attempt 3: 20s delay
Attempt 4: 40s delay
Attempt 5: 80s delay
Attempt 6: 160s delay (final attempt)
7. FAILED: After 6 attempts, job moves to failed set.
Up to 100,000 failed jobs are retained for debugging.
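The dispatch options from step 1 can be expressed as a BullMQ `JobsOptions`-shaped object. The field names (`attempts`, `backoff`, `removeOnComplete`, `removeOnFail`) follow BullMQ's public API; the helper function itself is an illustrative sketch.

```typescript
// Build the job options described in the lifecycle above.
function buildIngestionJobOptions(delayMs: number) {
  return {
    delay: delayMs, // step 2: time spent in the "delayed" set
    attempts: 6, // total tries before the job fails permanently
    backoff: { type: "exponential", delay: 5000 }, // 5s, 10s, 20s, 40s, 80s, 160s
    removeOnComplete: true, // step 5: drop successful jobs immediately
    removeOnFail: 100_000, // step 7: keep up to 100k failed jobs for debugging
  };
}
```

With BullMQ this object would be passed as the third argument to `queue.add(name, data, opts)`; exponential backoff doubles the base delay on each retry, matching the schedule in step 6.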
Delay Strategy
```
FUNCTION getDelay(delayOverride, source):
    IF delayOverride is not null:
        RETURN delayOverride
    currentTime = UTC_NOW()
    IF currentTime is between 23:45 and 00:15:
        RETURN LANGFUSE_INGESTION_QUEUE_DELAY_MS  -- longer delay around date boundaries
    IF source == "otel":
        RETURN 0  -- no delay for OTel events (already batched by collector)
    RETURN MIN(5000, LANGFUSE_INGESTION_QUEUE_DELAY_MS)  -- default 5s delay
```
The 5-second default delay for API-sourced events gives the SDK time to send follow-up events (e.g., a span-update after a span-create) so the worker can process them together from S3, reducing ClickHouse merge overhead. OTel events skip this delay because the OTel collector already batches events before forwarding.
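The delay strategy can be sketched directly in TypeScript. In the real system `LANGFUSE_INGESTION_QUEUE_DELAY_MS` is read from the environment; here it is passed as a parameter, and the clock is injectable for testing.

```typescript
// Sketch of the delay strategy described above.
function getDelay(
  delayOverride: number | null,
  source: string,
  configuredDelayMs: number,
  now: Date = new Date(),
): number {
  if (delayOverride !== null) return delayOverride;

  // Longer delay around the UTC date boundary (23:45-00:15) to absorb
  // out-of-order arrivals that could otherwise cause duplicate processing.
  const minutesOfDay = now.getUTCHours() * 60 + now.getUTCMinutes();
  if (minutesOfDay >= 23 * 60 + 45 || minutesOfDay < 15) {
    return configuredDelayMs;
  }

  // OTel events are already batched by the collector, so no extra delay.
  if (source === "otel") return 0;

  // Default: at most 5 seconds, to batch follow-up events for the same entity.
  return Math.min(5000, configuredDelayMs);
}
```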
Redis Connection Management
Each shard creates its own Redis connection with:
- `enableOfflineQueue: false` (fail fast if Redis is unavailable)
- Retry options from `redisQueueRetryOptions`
- Queue prefix derived from the queue name for key namespacing
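As a rough sketch, the per-shard connection options might be assembled as below. `enableOfflineQueue` is a real ioredis option, but the prefix derivation and the stand-in for `redisQueueRetryOptions` are illustrative assumptions.

```typescript
// Illustrative per-shard Redis connection options.
function buildShardConnectionOptions(queueName: string) {
  return {
    enableOfflineQueue: false, // fail fast instead of buffering commands while Redis is down
    // Placeholder for redisQueueRetryOptions; actual values live in shared config.
    maxRetriesPerRequest: null,
    keyPrefix: `${queueName}:`, // namespace this shard's keys in Redis (assumed derivation)
  };
}
```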