Implementation:Langfuse Langfuse EvalJobExecutorQueueProcessor

From Leeroopedia
Knowledge Sources
Domains: Error Handling, Job Queue Management
Last Updated: 2026-02-14 00:00 GMT

Overview

A concrete Langfuse tool for processing evaluation execution queue jobs with tiered error handling and retry logic.

Description

The evalJobExecutorQueueProcessor is a BullMQ queue processor function that handles jobs from the EvaluationExecution queue. It serves as the entry point for executing individual evaluations and implements the full error handling strategy for the evaluation pipeline.

On the success path, the processor:

  1. Logs the job execution details
  2. Sets OpenTelemetry span attributes for observability (jobExecutionId, projectId, retry attempt)
  3. Calls the evaluate function, which runs the full evaluation pipeline (fetch job/config/template, extract variables, call LLM, validate response, persist score)
  4. Returns true to signal successful completion to BullMQ

On the error path, the processor classifies the error and takes appropriate action:

For retryable LLM errors (HTTP 429 or 5xx):

  • Calls retryLLMRateLimitError, which enqueues a new delayed job to the EvalExecutionQueue with an incremented attempt count and a delay of 1-25 minutes
  • Updates the job execution status to DELAYED in PostgreSQL
  • Sets the executionTraceId for debugging
  • Returns without throwing (the current BullMQ job completes successfully, and a new delayed job takes over)
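
The retry scheduling in the bullets above can be sketched as follows. This is a minimal illustration, not the Langfuse implementation: hypotheticalDelayInMs, planRetry, and the doubling schedule are assumptions chosen only to match the documented 1-25 minute range and the 24-hour retry window. The real delayInMs may compute delays differently (for example, with jitter).

```typescript
// Hypothetical sketch of retry planning; names and the backoff formula are assumptions.
const MINUTE_MS = 60_000;
const MAX_DELAY_MS = 25 * MINUTE_MS; // upper bound of the documented 1-25 minute range
const RETRY_WINDOW_MS = 24 * 60 * MINUTE_MS; // documented 24-hour retry window

interface RetryBaggage {
  originalJobTimestamp: Date;
  attempt: number; // 0-indexed
}

type RetryPlan =
  | { retry: true; delayMs: number; retryBaggage: RetryBaggage }
  | { retry: false; reason: string };

// Assumed schedule: 1 minute for attempt 0, doubling per attempt, capped at 25 minutes.
function hypotheticalDelayInMs(attempt: number): number {
  return Math.min(MINUTE_MS * 2 ** attempt, MAX_DELAY_MS);
}

function planRetry(baggage: RetryBaggage | undefined, now: Date): RetryPlan {
  // Assumption: a first failure has no baggage yet, so the window starts now.
  const current = baggage ?? { originalJobTimestamp: now, attempt: 0 };
  const ageMs = now.getTime() - current.originalJobTimestamp.getTime();
  if (ageMs > RETRY_WINDOW_MS) {
    return { retry: false, reason: "24-hour retry window exceeded" };
  }
  return {
    retry: true,
    delayMs: hypotheticalDelayInMs(current.attempt),
    retryBaggage: {
      // The original timestamp is carried forward unchanged; only the attempt grows.
      originalJobTimestamp: current.originalJobTimestamp,
      attempt: current.attempt + 1,
    },
  };
}
```

Carrying originalJobTimestamp forward unchanged is what lets each new delayed job measure its age against the first attempt rather than against its own enqueue time.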

For non-retryable LLM errors (4xx other than 429):

  • Updates the job execution status to ERROR with the LLM error message preserved for user visibility
  • Sets the endTime and executionTraceId
  • Returns without throwing (no retry)

For UnrecoverableErrors (configuration/validation issues):

  • Updates the job execution status to ERROR with the error message
  • Returns without throwing (no retry)

For unexpected application errors:

  • Updates the job execution status to ERROR with a generic "An internal error occurred" message
  • Logs the error and sends it to the exception tracker (OpenTelemetry)
  • Re-throws the error to trigger BullMQ's built-in retry with exponential backoff
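
The four error tiers above can be condensed into a single classification step. The sketch below is illustrative only: SketchLlmError, SketchUnrecoverableError, Decision, and classifyEvalError are hypothetical names, not the Langfuse or BullMQ types, and the real processor performs the database updates and enqueueing inline rather than returning a decision object.

```typescript
// Hypothetical stand-ins for the error types named in the text; not the Langfuse classes.
class SketchBaseError extends Error {
  constructor(message: string) {
    super(message);
    // Restore the prototype chain so instanceof also works on older compile targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class SketchLlmError extends SketchBaseError {
  readonly isRetryable: boolean;
  constructor(message: string, isRetryable: boolean) {
    super(message);
    this.isRetryable = isRetryable;
  }
}

class SketchUnrecoverableError extends SketchBaseError {}

interface Decision {
  status: "DELAYED" | "ERROR"; // status written to the job execution record
  userMessage?: string; // error text stored for the user, if any
  enqueueDelayedJob: boolean; // hand off to a new delayed job
  rethrow: boolean; // let the queue's built-in retry run
}

function classifyEvalError(e: unknown): Decision {
  if (e instanceof SketchLlmError && e.isRetryable) {
    // Tier 1: retryable LLM error, a new delayed job takes over.
    return { status: "DELAYED", enqueueDelayedJob: true, rethrow: false };
  }
  if (e instanceof SketchLlmError || e instanceof SketchUnrecoverableError) {
    // Tiers 2 and 3: preserve the message for the user and stop retrying.
    return { status: "ERROR", userMessage: e.message, enqueueDelayedJob: false, rethrow: false };
  }
  // Tier 4: unexpected error, hide details and let the queue retry.
  return {
    status: "ERROR",
    userMessage: "An internal error occurred",
    enqueueDelayedJob: false,
    rethrow: true,
  };
}
```

Only the last tier sets rethrow, which mirrors the description: the first three tiers let the current job complete without error, and only unexpected failures reach BullMQ's own retry mechanism.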

The module also exports a parallel llmAsJudgeExecutionQueueProcessor for observation-level evaluations, which follows the same error handling pattern but targets the LLMAsJudgeExecution queue.

Usage

This processor is registered with the BullMQ worker for the EvaluationExecution queue during worker initialization. It is not called directly from application code.

Code Reference

Source Location

  • Repository: langfuse
  • File: worker/src/queues/evalQueue.ts
  • Lines: 116-223

Signature

export declare const evalJobExecutorQueueProcessor: (
  job: Job<TQueueJobTypes[QueueName.EvaluationExecution]>,
) => Promise<boolean | void>;

Import

import { evalJobExecutorQueueProcessor } from "./queues/evalQueue";

I/O Contract

Inputs

Name | Type | Required | Description
job | Job<TQueueJobTypes[QueueName.EvaluationExecution]> | Yes | BullMQ job containing the evaluation execution payload.
job.data.payload.jobExecutionId | string | Yes | The ID of the job execution record in PostgreSQL to process.
job.data.payload.projectId | string | Yes | The project ID for scoping database queries and authorization.
job.data.payload.delay | number | No | The configured delay in milliseconds (informational, already applied by BullMQ).
job.data.retryBaggage.originalJobTimestamp | Date | No | The original job creation timestamp, used for the 24-hour retry window check.
job.data.retryBaggage.attempt | number | No | The current retry attempt number (0-indexed), used for calculating retry delays.
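
The fields in the table above can be summarized as a TypeScript shape. This is a sketch derived only from the table: EvalExecutionJobData and readRetryState are hypothetical names, and the actual Langfuse type (TQueueJobTypes[QueueName.EvaluationExecution]) may carry additional fields.

```typescript
// Hypothetical shape mirroring the inputs table; not the actual Langfuse type definitions.
interface EvalExecutionJobData {
  payload: {
    jobExecutionId: string; // job execution record to process
    projectId: string; // scopes database queries
    delay?: number; // milliseconds, informational only
  };
  retryBaggage?: {
    originalJobTimestamp: Date; // start of the 24-hour retry window
    attempt: number; // 0-indexed retry attempt
  };
}

// Assumption: a job without retry baggage is treated as the first attempt (attempt 0).
function readRetryState(data: EvalExecutionJobData, now: Date) {
  return {
    attempt: data.retryBaggage?.attempt ?? 0,
    originalJobTimestamp: data.retryBaggage?.originalJobTimestamp ?? now,
  };
}
```

Both retryBaggage fields are optional in the table, so a consumer has to pick defaults for a job that has never been retried; the defaults shown here are one plausible choice.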

Outputs

Name | Type | Description
true | boolean | Returned on successful evaluation execution. The job execution record is updated to COMPLETED with jobOutputScoreId.
void | void | Returned when a retryable LLM error triggers a delayed retry or a non-retryable error terminates the execution. The job execution record is updated to DELAYED or ERROR accordingly.
(throws) | Error | Thrown for unexpected application errors to trigger BullMQ's built-in retry. The job execution record is set to ERROR.

Usage Examples

Registering the Processor with BullMQ Worker

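import { Worker } from "bullmq";
import { QueueName } from "@langfuse/shared/src/server";
import { evalJobExecutorQueueProcessor } from "./queues/evalQueue";

// redisConnection is assumed to be created elsewhere (BullMQ connection options
// or an ioredis instance); it is not defined in this snippet.
const evalWorker = new Worker(
  QueueName.EvaluationExecution, // queue this worker consumes from
  evalJobExecutorQueueProcessor, // invoked once per job
  {
    connection: redisConnection,
    concurrency: 5, // illustrative value: up to 5 jobs processed in parallel
  },
);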

Understanding the Error Flow

// Example: How a rate limit error is handled

// 1. BullMQ dispatches the job
// evalJobExecutorQueueProcessor(job) is called

// 2. evaluate() calls fetchLLMCompletion() which throws:
//    LLMCompletionError("Rate limit exceeded", { isRetryable: true, statusCode: 429 })

// 3. The catch block detects isRetryable = true
// 4. retryLLMRateLimitError() enqueues a NEW job with:
//    - delay: delayInMs(attempt)  (e.g., 60000ms for attempt 0)
//    - retryBaggage: { originalJobTimestamp, attempt: attempt + 1 }
// 5. Job execution is updated: status = DELAYED, executionTraceId = W3C trace ID
// 6. The processor returns void (current BullMQ job completes without error)
// 7. After the delay, BullMQ dispatches the new job
// 8. The cycle repeats until success or 24-hour timeout

Inspecting Job Execution States

// After processing, the job execution record reflects one of:

// Success:
// { status: "COMPLETED", jobOutputScoreId: "score-123", endTime: Date, executionTraceId: "..." }

// Retryable error (waiting for retry):
// { status: "DELAYED", executionTraceId: "..." }

// Non-retryable error:
// { status: "ERROR", error: "Model gpt-5 not found", endTime: Date, executionTraceId: "..." }

// Unexpected error (BullMQ will retry):
// { status: "ERROR", error: "An internal error occurred", endTime: Date, executionTraceId: "..." }
