
Principle:Langfuse Export Job Validation

From Leeroopedia
Knowledge Sources
Domains: Batch Export, Job Processing, Data Pipeline
Last Updated: 2026-02-14 00:00 GMT

Overview

Export Job Validation is the principle of applying a series of precondition checks and state transitions before committing worker resources to an asynchronous export job, so that only valid, timely, and non-duplicate jobs proceed through the pipeline.

Description

When a background worker picks up a batch export job from the queue, it must not blindly begin processing. Several conditions could have changed between the time the job was enqueued and the time it is dequeued:

  • The feature may have been disabled via environment configuration.
  • The user may have cancelled the export.
  • The job record may be stale (older than a configurable threshold).
  • The job may have already been partially processed due to a retry after a transient failure.

Export Job Validation establishes a guard clause pattern at the entry point of the worker function. Each guard is evaluated in sequence, and if any fails, the worker either exits gracefully or marks the job as failed with a descriptive message. Only after all guards pass does the worker transition the job status to PROCESSING and begin the resource-intensive data retrieval.

The key validation checks are:

  1. Feature gate: Verify that the batch export feature is enabled (e.g., LANGFUSE_S3_BATCH_EXPORT_ENABLED === "true"). If not, throw an error immediately so the queue can handle the failure.
  2. Existence check: Confirm the job record exists in the database for the given project and export IDs.
  3. Cancellation check: If the job status is CANCELLED, exit without processing. This allows users to cancel exports that have not yet started.
  4. Staleness check: If the job was created more than 30 days ago, mark it as FAILED with a message encouraging the user to retry. This prevents processing of orphaned or outdated jobs.
  5. Status check: Warn (but proceed) if the job is not in QUEUED status. This handles retry scenarios where a previously PROCESSING job is re-delivered after a worker crash.
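The five checks above can be condensed into a single decision function. The following TypeScript sketch is illustrative only: the type and function names (`ExportJob`, `decideJobAction`, `JobAction`) are hypothetical, not the actual Langfuse implementation.

```typescript
// Hypothetical guard-clause chain for an export worker (names are illustrative).
type JobStatus = "QUEUED" | "PROCESSING" | "COMPLETED" | "FAILED" | "CANCELLED";

interface ExportJob {
  id: string;
  status: JobStatus;
  createdAt: Date;
}

const STALE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; // 30-day staleness threshold

type JobAction =
  | { kind: "throw"; reason: string }   // surface to the queue for retry handling
  | { kind: "skip" }                    // graceful exit, no error
  | { kind: "fail"; message: string }   // mark FAILED with a user-facing message
  | { kind: "process"; warn?: string }; // transition to PROCESSING

function decideJobAction(
  featureEnabled: boolean,
  job: ExportJob | null,
  now: Date = new Date(),
): JobAction {
  // Guard 1: feature gate — throw so the queue handles the failure
  if (!featureEnabled) return { kind: "throw", reason: "Batch export is not enabled" };
  // Guard 2: existence — the record must be present for the project/export IDs
  if (job === null) return { kind: "throw", reason: "Job not found" };
  // Guard 3: cancellation — exit without processing
  if (job.status === "CANCELLED") return { kind: "skip" };
  // Guard 4: staleness — mark FAILED and ask the user to retry
  if (now.getTime() - job.createdAt.getTime() > STALE_AFTER_MS) {
    return { kind: "fail", message: "Please retry your export" };
  }
  // Guard 5: unexpected status — warn but proceed (handles queue re-delivery)
  if (job.status !== "QUEUED") {
    return { kind: "process", warn: `Unexpected status ${job.status}, retrying anyway` };
  }
  return { kind: "process" };
}
```

Returning a value rather than performing side effects keeps the guard chain trivially testable; the caller decides whether to throw, log, or update the database.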

After passing all validations, the worker parses the stored query using a Zod schema to ensure data integrity, preprocesses any comment-based filters, and then proceeds to the data streaming phase.
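In Langfuse the stored query is parsed with a Zod schema; the dependency-free TypeScript sketch below mimics Zod's `safeParse` contract for illustration. The field names (`tableName`, `filter`) are assumptions, not the real `BatchExportQuerySchema` shape.

```typescript
// Dependency-free sketch of a safeParse-style validator (field names assumed).
interface BatchExportQuery {
  tableName: string;
  filter: unknown[];
}

type ParseResult =
  | { success: true; data: BatchExportQuery }
  | { success: false; error: string };

function parseBatchExportQuery(raw: unknown): ParseResult {
  // Reject non-object payloads outright
  if (typeof raw !== "object" || raw === null) {
    return { success: false, error: "query must be an object" };
  }
  const q = raw as Record<string, unknown>;
  if (typeof q.tableName !== "string") {
    return { success: false, error: "tableName must be a string" };
  }
  // Default a missing filter to an empty list
  const filter = Array.isArray(q.filter) ? q.filter : [];
  return { success: true, data: { tableName: q.tableName, filter } };
}
```

The discriminated-union result forces callers to check `success` before touching `data`, which is exactly why the worker can throw a descriptive error instead of crashing deep inside the export pipeline.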

Usage

Use Export Job Validation whenever:

  • An asynchronous worker processes jobs that were enqueued at an earlier time and may have become stale or invalid.
  • The system supports user-initiated cancellation of pending jobs.
  • A feature toggle controls whether the processing infrastructure is available.
  • Jobs are retried automatically by the queue system and must handle re-entrant execution safely.
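The feature-toggle case above can be as simple as a strict string comparison against the environment. The variable name `LANGFUSE_S3_BATCH_EXPORT_ENABLED` comes from the description earlier on this page; the helper itself is a sketch.

```typescript
// Sketch of a feature gate read from the environment. Only the exact string
// "true" enables the feature; unset or any other value disables it.
function isBatchExportEnabled(env: Record<string, string | undefined>): boolean {
  return env.LANGFUSE_S3_BATCH_EXPORT_ENABLED === "true";
}
```

Passing the environment in as a parameter (rather than reading `process.env` inside the function) keeps the gate testable and makes the worker's dependency on configuration explicit.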

Theoretical Basis

The validation follows a guard clause chain with early returns, combined with a state machine for job status transitions:

FUNCTION handleExportJob(batchExportId, projectId):

  -- Guard 1: Feature gate
  IF NOT featureEnabled("BATCH_EXPORT"):
    THROW "Batch export is not enabled"

  -- Guard 2: Existence
  record = LOOKUP(batchExport WHERE id=batchExportId AND projectId=projectId)
  IF record IS NULL:
    THROW "Job not found"

  -- Guard 3: Cancellation
  IF record.status == CANCELLED:
    LOG "Job cancelled, skipping"
    RETURN  -- graceful exit, no error

  -- Guard 4: Staleness (30-day threshold)
  IF record.createdAt < (NOW - 30 days):
    UPDATE record SET status=FAILED, log="Please retry your export"
    RETURN  -- graceful exit with user-facing message

  -- Guard 5: Status warning
  IF record.status != QUEUED:
    WARN "Unexpected status, retrying anyway"

  -- Transition to processing
  UPDATE record SET status=PROCESSING

  -- Parse and validate the stored query
  parsedQuery = VALIDATE(record.query, BatchExportQuerySchema)
  IF parsedQuery.error:
    THROW "Invalid query: " + parsedQuery.error

  -- Preprocess comment filters if applicable
  processedFilter = preprocessCommentFilters(parsedQuery.filter, projectId)

  -- Proceed to data streaming, transformation, upload, and notification
  EXECUTE exportPipeline(parsedQuery, processedFilter, record)

The state machine transitions are:

QUEUED ----[validation passes]----> PROCESSING
QUEUED ----[user cancels]---------> CANCELLED (terminal)
QUEUED ----[stale > 30 days]------> FAILED (terminal)
PROCESSING --[success]------------> COMPLETED (terminal)
PROCESSING --[error]--------------> FAILED (terminal)
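The transition diagram above can be encoded as data, so that any attempted status change is checked against an allow-list. This is a sketch of the technique, not the Langfuse source.

```typescript
// Transition table for the export-job state machine shown above.
type Status = "QUEUED" | "PROCESSING" | "COMPLETED" | "FAILED" | "CANCELLED";

const ALLOWED: Record<Status, Status[]> = {
  QUEUED: ["PROCESSING", "CANCELLED", "FAILED"], // validated / user cancels / stale
  PROCESSING: ["COMPLETED", "FAILED"],           // success / error
  COMPLETED: [],                                 // terminal
  FAILED: [],                                    // terminal
  CANCELLED: [],                                 // terminal
};

function canTransition(from: Status, to: Status): boolean {
  return ALLOWED[from].includes(to);
}
```

Centralizing the table means a retry that tries to move a terminal job back to PROCESSING is rejected in one place, instead of relying on each guard to catch it.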

This design ensures that expensive operations (database streaming, S3 uploads) are only initiated for jobs that have a reasonable chance of completing successfully.

Related Pages

Implemented By
