Principle: Langfuse Export Job Validation
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Job Processing, Data Pipeline |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Export Job Validation is the principle of applying a series of precondition checks and state transitions before committing worker resources to process an asynchronous export job, ensuring that only valid, timely, and non-duplicate jobs proceed through the pipeline.
Description
When a background worker picks up a batch export job from the queue, it must not blindly begin processing. Several conditions could have changed between the time the job was enqueued and the time it is dequeued:
- The feature may have been disabled via environment configuration.
- The user may have cancelled the export.
- The job record may be stale (older than a configurable threshold).
- The job may have already been partially processed due to a retry after a transient failure.
Export Job Validation establishes a guard clause pattern at the entry point of the worker function. Each guard is evaluated in sequence, and if any fails, the worker either exits gracefully or marks the job as failed with a descriptive message. Only after all guards pass does the worker transition the job status to PROCESSING and begin the resource-intensive data retrieval.
The key validation checks are:
- Feature gate: Verify that the batch export feature is enabled (e.g., LANGFUSE_S3_BATCH_EXPORT_ENABLED === "true"). If not, throw an error immediately so the queue can handle the failure.
- Existence check: Confirm the job record exists in the database for the given project and export IDs.
- Cancellation check: If the job status is CANCELLED, exit without processing. This allows users to cancel exports that have not yet started.
- Staleness check: If the job was created more than 30 days ago, mark it as FAILED with a message encouraging the user to retry. This prevents processing of orphaned or outdated jobs.
- Status check: Warn (but proceed) if the job is not in QUEUED status. This handles retry scenarios where a previously PROCESSING job is re-delivered after a worker crash.
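The five checks above can be sketched as a pure guard chain in TypeScript. This is a minimal sketch: the type names, helper signatures, and the GuardResult union are illustrative, not Langfuse's actual code.

```typescript
// Illustrative types; the real Langfuse record has more fields.
type BatchExportStatus = "QUEUED" | "PROCESSING" | "COMPLETED" | "FAILED" | "CANCELLED";

interface BatchExportRecord {
  id: string;
  projectId: string;
  status: BatchExportStatus;
  createdAt: Date;
}

const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;

// Outcome of the guard chain: proceed, exit silently, or fail with a message.
type GuardResult =
  | { kind: "proceed" }
  | { kind: "skip"; reason: string }
  | { kind: "fail"; message: string };

function validateExportJob(
  record: BatchExportRecord | null,
  featureEnabled: boolean,
  now: Date = new Date(),
): GuardResult {
  // Guard 1: feature gate — fail loudly so the queue records the error.
  if (!featureEnabled) {
    return { kind: "fail", message: "Batch export is not enabled" };
  }
  // Guard 2: existence.
  if (record === null) {
    return { kind: "fail", message: "Job not found" };
  }
  // Guard 3: cancellation — graceful exit, no error.
  if (record.status === "CANCELLED") {
    return { kind: "skip", reason: "Job cancelled, skipping" };
  }
  // Guard 4: staleness — mark FAILED with a user-facing retry message.
  if (now.getTime() - record.createdAt.getTime() > THIRTY_DAYS_MS) {
    return { kind: "fail", message: "Job is stale; please retry your export" };
  }
  // Guard 5: status warning — tolerate re-delivery after a worker crash.
  if (record.status !== "QUEUED") {
    console.warn(`Unexpected status ${record.status}, retrying anyway`);
  }
  return { kind: "proceed" };
}
```

Keeping the guards pure (no database writes inside) makes them easy to unit-test; the caller maps "skip" to a silent return and "fail" to a status update or thrown error.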
After passing all validations, the worker parses the stored query using a Zod schema to ensure data integrity, preprocesses any comment-based filters, and then proceeds to the data streaming phase.
Usage
Use Export Job Validation whenever:
- An asynchronous worker processes jobs that were enqueued at an earlier time and may have become stale or invalid.
- The system supports user-initiated cancellation of pending jobs.
- A feature toggle controls whether the processing infrastructure is available.
- Jobs are retried automatically by the queue system and must handle re-entrant execution safely.
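Re-entrant safety (the last bullet) is commonly achieved with a conditional status update, so that only one worker wins the QUEUED-to-PROCESSING claim even if the queue delivers the job twice. A minimal sketch, using an in-memory map to stand in for the database:

```typescript
type Status = "QUEUED" | "PROCESSING" | "COMPLETED" | "FAILED" | "CANCELLED";

// In-memory stand-in for the jobs table; a real worker would issue
// UPDATE ... SET status = 'PROCESSING' WHERE id = ? AND status = 'QUEUED'
// and check the affected-row count.
const jobs = new Map<string, { status: Status }>();

// Returns true only for the caller that wins the transition.
function claimJob(id: string): boolean {
  const job = jobs.get(id);
  if (!job || job.status !== "QUEUED") return false; // already claimed or terminal
  job.status = "PROCESSING";
  return true;
}
```

The compare-and-set shape means a duplicate delivery simply observes a non-QUEUED status and exits, rather than re-running the expensive export.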
Theoretical Basis
The validation follows a guard clause chain with early returns, combined with a state machine for job status transitions:
FUNCTION handleExportJob(batchExportId, projectId):
    -- Guard 1: Feature gate
    IF NOT featureEnabled("BATCH_EXPORT"):
        THROW "Batch export is not enabled"

    -- Guard 2: Existence
    record = LOOKUP(batchExport WHERE id=batchExportId AND projectId=projectId)
    IF record IS NULL:
        THROW "Job not found"

    -- Guard 3: Cancellation
    IF record.status == CANCELLED:
        LOG "Job cancelled, skipping"
        RETURN  -- graceful exit, no error

    -- Guard 4: Staleness (30-day threshold)
    IF record.createdAt < (NOW - 30 days):
        UPDATE record SET status=FAILED, log="Please retry your export"
        RETURN  -- graceful exit with user-facing message

    -- Guard 5: Status warning
    IF record.status != QUEUED:
        WARN "Unexpected status, retrying anyway"

    -- Transition to processing
    UPDATE record SET status=PROCESSING

    -- Parse and validate the stored query
    parsedQuery = VALIDATE(record.query, BatchExportQuerySchema)
    IF parsedQuery.error:
        THROW "Invalid query: " + parsedQuery.error

    -- Preprocess comment filters if applicable
    processedFilter = preprocessCommentFilters(parsedQuery.filter, projectId)

    -- Proceed to data streaming, transformation, upload, and notification
    EXECUTE exportPipeline(parsedQuery, processedFilter, record)
The state machine transitions are:
QUEUED ----[validation passes]----> PROCESSING
QUEUED ----[user cancels]---------> CANCELLED (terminal)
QUEUED ----[stale > 30 days]------> FAILED (terminal)
PROCESSING --[success]------------> COMPLETED (terminal)
PROCESSING --[error]--------------> FAILED (terminal)
This design ensures that expensive operations (database streaming, S3 uploads) are only initiated for jobs that have a reasonable chance of completing successfully.
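The transition diagram can be encoded as a lookup table that a worker consults before any status update. This is a sketch of that idea, not Langfuse's actual status-handling code:

```typescript
type Status = "QUEUED" | "PROCESSING" | "COMPLETED" | "FAILED" | "CANCELLED";

// Allowed transitions from the diagram; terminal states map to empty arrays.
const TRANSITIONS: Record<Status, Status[]> = {
  QUEUED: ["PROCESSING", "CANCELLED", "FAILED"],
  PROCESSING: ["COMPLETED", "FAILED"],
  COMPLETED: [],
  CANCELLED: [],
  FAILED: [],
};

function canTransition(from: Status, to: Status): boolean {
  return TRANSITIONS[from].includes(to);
}
```

Centralizing the table makes illegal moves (e.g., reviving a COMPLETED job) a single-line check rather than scattered conditionals.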