Implementation:Langfuse Langfuse EvalJobExecutorQueueProcessor
| Knowledge Sources | |
|---|---|
| Domains | Error Handling, Job Queue Management |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete Langfuse tool for processing evaluation execution queue jobs, with tiered error handling and retry logic.
Description
The evalJobExecutorQueueProcessor is a BullMQ queue processor function that handles jobs from the EvaluationExecution queue. It serves as the entry point for executing individual evaluations and implements the full error handling strategy for the evaluation pipeline.
On the success path, the processor:
- Logs the job execution details
- Sets OpenTelemetry span attributes for observability (jobExecutionId, projectId, retry attempt)
- Calls the evaluate function which performs the full evaluation pipeline (fetch job/config/template, extract variables, call LLM, validate response, persist score)
- Returns true to signal successful completion to BullMQ
On the error path, the processor classifies the error and takes appropriate action:
For retryable LLM errors (429/5xx):
- Calls retryLLMRateLimitError, which enqueues a new delayed job to the EvalExecutionQueue with an incremented attempt count and a delay of 1-25 minutes
- Updates the job execution status to DELAYED in PostgreSQL
- Sets the executionTraceId for debugging
- Returns without throwing (the current BullMQ job completes successfully, and a new delayed job takes over)
For non-retryable LLM errors (4xx other than 429):
- Updates the job execution status to ERROR with the LLM error message preserved for user visibility
- Sets the endTime and executionTraceId
- Returns without throwing (no retry)
For UnrecoverableErrors (configuration/validation issues):
- Updates the job execution status to ERROR with the error message
- Returns without throwing (no retry)
For unexpected application errors:
- Updates the job execution status to ERROR with a generic "An internal error occurred" message
- Logs the error and sends it to the exception tracker (OpenTelemetry)
- Re-throws the error to trigger BullMQ's built-in retry with exponential backoff
The module also exports a parallel llmAsJudgeExecutionQueueProcessor for observation-level evaluations, which follows the same error handling pattern but targets the LLMAsJudgeExecution queue.
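The four error tiers above can be sketched as a single classification function. This is a minimal illustration, not the code in evalQueue.ts: the two error classes are simplified stand-ins, and `ErrorAction` and `classifyEvalError` are hypothetical names introduced only for this example.

```typescript
// Simplified stand-ins for the real error classes (assumed shapes).
class LLMCompletionError extends Error {
  readonly isRetryable: boolean;
  readonly statusCode: number | undefined;

  constructor(
    message: string,
    options: { isRetryable: boolean; statusCode?: number },
  ) {
    super(message);
    this.name = "LLMCompletionError";
    this.isRetryable = options.isRetryable;
    this.statusCode = options.statusCode;
    // Keeps instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class UnrecoverableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "UnrecoverableError";
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical result type: what the processor should do for each tier.
type ErrorAction =
  | { kind: "schedule-delayed-retry"; status: "DELAYED" }
  | { kind: "fail-without-retry"; status: "ERROR"; userMessage: string }
  | { kind: "rethrow-for-bullmq-retry"; status: "ERROR"; userMessage: string };

function classifyEvalError(error: unknown): ErrorAction {
  if (error instanceof LLMCompletionError) {
    return error.isRetryable
      ? // 429/5xx: a new delayed job takes over, the current job completes.
        { kind: "schedule-delayed-retry", status: "DELAYED" }
      : // Other 4xx: keep the LLM message so the user can see it.
        { kind: "fail-without-retry", status: "ERROR", userMessage: error.message };
  }
  if (error instanceof UnrecoverableError) {
    // Configuration/validation problems: retrying would not help.
    return { kind: "fail-without-retry", status: "ERROR", userMessage: error.message };
  }
  // Anything else: hide details from the user and let BullMQ retry.
  return {
    kind: "rethrow-for-bullmq-retry",
    status: "ERROR",
    userMessage: "An internal error occurred",
  };
}
```

Separating classification from the side effects (database update, enqueue, re-throw) keeps the decision table easy to unit test; whether the actual processor is structured this way is not stated in the source.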
Usage
This processor is registered with the BullMQ worker for the EvaluationExecution queue during worker initialization. It is not called directly from application code.
Code Reference
Source Location
- Repository: langfuse
- File: worker/src/queues/evalQueue.ts
- Lines: 116-223
Signature
export const evalJobExecutorQueueProcessor = async (
  job: Job<TQueueJobTypes[QueueName.EvaluationExecution]>,
): Promise<boolean | void>;
Import
import { evalJobExecutorQueueProcessor } from "./queues/evalQueue";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job | Job<TQueueJobTypes[QueueName.EvaluationExecution]> | Yes | BullMQ job containing the evaluation execution payload. |
| job.data.payload.jobExecutionId | string | Yes | The ID of the job execution record in PostgreSQL to process. |
| job.data.payload.projectId | string | Yes | The project ID for scoping database queries and authorization. |
| job.data.payload.delay | number | No | The configured delay in milliseconds (informational, already applied by BullMQ). |
| job.data.retryBaggage.originalJobTimestamp | Date | No | The original job creation timestamp, used for the 24-hour retry window check. |
| job.data.retryBaggage.attempt | number | No | The current retry attempt number (0-indexed), used for calculating retry delays. |
Outputs
| Name | Type | Description |
|---|---|---|
| true | boolean | Returned on successful evaluation execution. The job execution record is updated to COMPLETED with jobOutputScoreId. |
| void | void | Returned when a retryable LLM error triggers a delayed retry, or when a non-retryable LLM error or an UnrecoverableError terminates the execution. The job execution record is updated to DELAYED or ERROR accordingly. |
| (throws) | Error | Thrown for unexpected application errors to trigger BullMQ's built-in retry. The job execution record is set to ERROR. |
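The Inputs table can be read as the following TypeScript shape. This is a sketch derived from the table only; the real `TQueueJobTypes` definition may contain more fields, and both `EvalExecutionJobData` and `resolveRetryState` are hypothetical names. The fallback values (current time and attempt 0 when `retryBaggage` is absent) are an assumption for a first-time job.

```typescript
// Shape assumed from the Inputs table; not the actual Langfuse type.
interface EvalExecutionJobData {
  payload: {
    jobExecutionId: string;
    projectId: string;
    delay?: number; // milliseconds, informational only
  };
  retryBaggage?: {
    originalJobTimestamp: Date;
    attempt: number; // 0-indexed
  };
}

// Hypothetical helper: resolves the optional retry fields to concrete values.
function resolveRetryState(
  data: EvalExecutionJobData,
  now: Date = new Date(),
): { originalJobTimestamp: Date; attempt: number } {
  return {
    // Assumed default: a job without baggage starts its retry window now.
    originalJobTimestamp: data.retryBaggage?.originalJobTimestamp ?? now,
    // Assumed default: a job without baggage is on its first attempt.
    attempt: data.retryBaggage?.attempt ?? 0,
  };
}
```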
Usage Examples
Registering the Processor with BullMQ Worker
import { Worker } from "bullmq";
import { QueueName } from "@langfuse/shared/src/server";
import { evalJobExecutorQueueProcessor } from "./queues/evalQueue";

// redisConnection is assumed to be a Redis connection configured elsewhere.
const evalWorker = new Worker(
  QueueName.EvaluationExecution,
  evalJobExecutorQueueProcessor,
  {
    connection: redisConnection,
    concurrency: 5,
  },
);
Understanding the Error Flow
// Example: How a rate limit error is handled
// 1. BullMQ dispatches the job
// evalJobExecutorQueueProcessor(job) is called
// 2. evaluate() calls fetchLLMCompletion() which throws:
// LLMCompletionError("Rate limit exceeded", { isRetryable: true, statusCode: 429 })
// 3. The catch block detects isRetryable = true
// 4. retryLLMRateLimitError() enqueues a NEW job with:
// - delay: delayInMs(attempt) (e.g., 60000ms for attempt 0)
// - retryBaggage: { originalJobTimestamp, attempt: attempt + 1 }
// 5. Job execution is updated: status = DELAYED, executionTraceId = W3C trace ID
// 6. The processor returns void (current BullMQ job completes without error)
// 7. After the delay, BullMQ dispatches the new job
// 8. The cycle repeats until success or 24-hour timeout
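Steps 4 and 8 of the flow above (bounded delay, incremented attempt, 24-hour cutoff) can be sketched as a pure planning function. The source does not show how `delayInMs(attempt)` computes its value, so this example takes a proposed delay as input and only clamps it to the 1-25 minute range stated earlier; `planRetry` and `RetryPlan` are hypothetical names, not part of Langfuse.

```typescript
const MINUTE_MS = 60 * 1000;
const MIN_DELAY_MS = 1 * MINUTE_MS; // lower bound stated in the text
const MAX_DELAY_MS = 25 * MINUTE_MS; // upper bound stated in the text
const RETRY_WINDOW_MS = 24 * 60 * MINUTE_MS; // 24-hour retry window

interface RetryBaggage {
  originalJobTimestamp: Date;
  attempt: number;
}

type RetryPlan =
  | { retry: true; delayMs: number; retryBaggage: RetryBaggage }
  | { retry: false; reason: string };

// Hypothetical helper: decides whether another delayed job should be enqueued.
function planRetry(
  baggage: RetryBaggage,
  proposedDelayMs: number,
  now: Date,
): RetryPlan {
  const ageMs = now.getTime() - baggage.originalJobTimestamp.getTime();
  if (ageMs > RETRY_WINDOW_MS) {
    // The original job is older than 24 hours: stop retrying.
    return { retry: false, reason: "24-hour retry window exceeded" };
  }
  // Clamp the proposed delay into the documented 1-25 minute range.
  const delayMs = Math.min(MAX_DELAY_MS, Math.max(MIN_DELAY_MS, proposedDelayMs));
  return {
    retry: true,
    delayMs,
    retryBaggage: {
      // The original timestamp is carried forward unchanged so the
      // window is measured from the first attempt, not the latest one.
      originalJobTimestamp: baggage.originalJobTimestamp,
      attempt: baggage.attempt + 1,
    },
  };
}
```

What the real processor does once the window is exceeded (for example, which status it writes) is not described in this section, so the `retry: false` branch only reports the reason.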
Inspecting Job Execution States
// After processing, the job execution record reflects one of:
// Success:
// { status: "COMPLETED", jobOutputScoreId: "score-123", endTime: Date, executionTraceId: "..." }
// Retryable error (waiting for retry):
// { status: "DELAYED", executionTraceId: "..." }
// Non-retryable error:
// { status: "ERROR", error: "Model gpt-5 not found", endTime: Date, executionTraceId: "..." }
// Unexpected error (BullMQ will retry):
// { status: "ERROR", error: "An internal error occurred", endTime: Date, executionTraceId: "..." }
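The record shapes listed above map naturally onto a discriminated union. The sketch below is an illustration based only on those example records: `JobExecutionRecord` and `describeRecord` are hypothetical names, and the actual database model may have additional fields or statuses.

```typescript
// Assumed from the example records above; not the actual database model.
type JobExecutionRecord =
  | {
      status: "COMPLETED";
      jobOutputScoreId: string;
      endTime: Date;
      executionTraceId: string;
    }
  | { status: "DELAYED"; executionTraceId: string }
  | { status: "ERROR"; error: string; endTime: Date; executionTraceId: string };

// Hypothetical helper: produces a one-line summary for logs or a UI.
function describeRecord(record: JobExecutionRecord): string {
  switch (record.status) {
    case "COMPLETED":
      return `completed, score ${record.jobOutputScoreId} written`;
    case "DELAYED":
      return "waiting for a delayed retry";
    case "ERROR":
      return `failed: ${record.error}`;
  }
}
```

Note that an ERROR record alone does not reveal whether BullMQ will retry the job: both the non-retryable and the unexpected-error paths write the same status, and only the message differs.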