Principle:Langfuse Langfuse Dataset Item Processing

From Leeroopedia
Knowledge Sources
Domains LLM Experimentation, Data Pipeline
Last Updated 2026-02-14 00:00 GMT

Overview

Dataset item processing is the orchestration pattern by which a worker retrieves, validates, deduplicates, and sequentially executes LLM calls for each item in a dataset run, ensuring idempotent and fault-tolerant experiment execution.

Description

When an experiment is dispatched to the processing queue, the worker must turn a high-level instruction ("run this prompt against this dataset") into a sequence of concrete actions for each dataset item. This involves several challenges:

  • Validation: The worker must re-validate the experiment configuration (prompt, API key, model parameters) because the configuration may have changed or become invalid between the time the user created the experiment and the time the worker picks up the job.
  • Filtering: Not every dataset item is suitable for processing. Items must be active, must match the optional version filter, and must have inputs that satisfy the prompt's variable requirements.
  • Deduplication: If the job is retried (due to a transient failure or worker restart), some items may already have been processed. The worker must identify and skip these to avoid duplicate LLM calls and traces.
  • Sequential execution: Items are processed one at a time to provide predictable resource consumption and to simplify error isolation. If one item fails, the worker logs the error and moves to the next item rather than aborting the entire run.
  • Error handling: Configuration-level errors (missing prompt, invalid API key) are handled by creating error-level traces for all dataset items, giving the user visibility into what went wrong.

Usage

Dataset item processing is used whenever:

  • A worker picks up an experiment creation job from the queue.
  • An experiment needs to be retried after a partial failure.
  • The system needs to resume processing after a worker restart, skipping already-completed items.

Theoretical Basis

The processing pipeline follows a validate-fetch-deduplicate-execute pattern:

Phase 1 -- Configuration Validation

The worker retrieves the dataset run record from PostgreSQL and parses its metadata to extract the prompt ID, provider, model, and model parameters. It then validates each component:

  • The dataset run must exist.
  • The metadata must conform to the expected schema.
  • The referenced prompt must exist and have a valid format.
  • An API key for the specified provider must be configured in the project.

If any validation fails with an unrecoverable error, the worker creates error-level dataset run items for all active dataset items, providing the user with clear feedback about the configuration problem.
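The four checks above can be sketched as a function that returns either a parsed configuration or a reason for failure. This is a minimal illustration, not the worker's actual code: the metadata field names (prompt_id, provider, model, model_params), the ExperimentConfig shape, and the promptExists and hasApiKey lookups are all hypothetical stand-ins for the PostgreSQL queries the worker would perform. The prompt format check is omitted for brevity.

```typescript
// Hypothetical config shape; the field names are assumptions for illustration.
interface ExperimentConfig {
  promptId: string;
  provider: string;
  model: string;
  modelParams: Record<string, unknown>;
}

type ValidationResult =
  | { ok: true; config: ExperimentConfig }
  | { ok: false; reason: string };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Check that untrusted run metadata has the fields the worker needs.
function parseRunMetadata(metadata: unknown): ValidationResult {
  if (!isRecord(metadata)) {
    return { ok: false, reason: "metadata is not an object" };
  }
  const { prompt_id, provider, model, model_params } = metadata;
  if (
    typeof prompt_id !== "string" ||
    typeof provider !== "string" ||
    typeof model !== "string"
  ) {
    return { ok: false, reason: "metadata does not match the expected schema" };
  }
  // Assumption: model parameters are optional and default to an empty object.
  const modelParams = isRecord(model_params) ? model_params : {};
  return { ok: true, config: { promptId: prompt_id, provider, model, modelParams } };
}

interface ValidationInput {
  run: { metadata: unknown } | null; // null when the run record was not found
  promptExists: (promptId: string) => boolean; // hypothetical lookup
  hasApiKey: (provider: string) => boolean; // hypothetical lookup
}

// Apply the checks in order and stop at the first failure.
function validateExperiment(input: ValidationInput): ValidationResult {
  if (input.run === null) return { ok: false, reason: "dataset run not found" };
  const parsed = parseRunMetadata(input.run.metadata);
  if (!parsed.ok) return parsed;
  if (!input.promptExists(parsed.config.promptId)) {
    return { ok: false, reason: "prompt not found" };
  }
  if (!input.hasApiKey(parsed.config.provider)) {
    return { ok: false, reason: "no API key for provider " + parsed.config.provider };
  }
  return parsed;
}
```

Returning a result value rather than throwing keeps the failure reason available to the caller, which can then attach it to the error-level run items it creates for each active dataset item.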

Phase 2 -- Item Retrieval and Filtering

Active dataset items are fetched from the data store, optionally filtered by version. Each item's input is validated against the prompt's variable requirements using the same logic as the configuration validation step. Items with incompatible inputs are silently excluded. String inputs for single-variable prompts are normalized into object format for consistent downstream processing.
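The filtering and normalization described above might look like the following sketch. The DatasetItem shape and both function names are hypothetical, and the rule that an object input must contain every prompt variable is an assumption made for illustration; the actual matching rule may be looser or stricter.

```typescript
// Hypothetical item shapes for illustration.
interface DatasetItem {
  id: string;
  input: unknown;
}

interface NormalizedItem {
  id: string;
  input: Record<string, unknown>;
}

// Returns the input in object form, or null when it cannot fill the prompt.
function normalizeInput(
  input: unknown,
  variables: string[],
): Record<string, unknown> | null {
  if (typeof input === "string") {
    // A bare string is accepted only when the prompt has exactly one variable.
    const [only, ...rest] = variables;
    return only !== undefined && rest.length === 0 ? { [only]: input } : null;
  }
  if (typeof input !== "object" || input === null || Array.isArray(input)) {
    return null;
  }
  const record = input as Record<string, unknown>;
  // Assumption: every prompt variable must be present as a key of the input.
  return variables.every((name) => name in record) ? record : null;
}

// Keep only items whose input is compatible; incompatible items are dropped silently.
function selectValidItems(items: DatasetItem[], variables: string[]): NormalizedItem[] {
  const valid: NormalizedItem[] = [];
  for (const item of items) {
    const input = normalizeInput(item.input, variables);
    if (input !== null) valid.push({ id: item.id, input });
  }
  return valid;
}
```

For a prompt with the single variable question, an item whose input is the string "hello" would be normalized to { question: "hello" }, while an item whose input is { other: 1 } would be excluded.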

Phase 3 -- Deduplication

The worker queries the dataset_run_items_rmt table in ClickHouse to obtain the set of dataset item IDs that already have run items for this specific run. Any dataset item whose ID appears in this set is excluded from processing. This makes the entire operation idempotent: re-running the same job produces no duplicate work.
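Once the set of already-processed IDs is known, the exclusion itself is a set-membership filter. The sketch below assumes the ClickHouse query result has already been reduced to a list of item IDs; excludeProcessed is a hypothetical helper name.

```typescript
// Drop every item whose ID already has a run item for this run.
function excludeProcessed<T extends { id: string }>(
  items: T[],
  existingIds: Iterable<string>,
): T[] {
  const seen = new Set(existingIds);
  return items.filter((item) => !seen.has(item.id));
}

// First attempt: nothing processed yet, so all items remain.
const firstAttempt = excludeProcessed([{ id: "a" }, { id: "b" }, { id: "c" }], []);

// Retry after "a" and "b" were completed: only "c" remains.
const retry = excludeProcessed([{ id: "a" }, { id: "b" }, { id: "c" }], ["a", "b"]);
```

Using a Set keeps each lookup constant-time, so the filter stays linear in the number of dataset items even for large runs.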

Phase 4 -- Sequential Execution

The remaining items are processed in a sequential loop. For each item, the worker:

  1. Creates a dataset run item record (via ingestion event).
  2. Executes the LLM call with the prompt template and item input.
  3. Schedules observation-level evaluations if generation details are available.
  4. Enqueues a delayed dataset run item upsert for post-evaluation processing.

Errors in individual items are caught and logged but do not halt the loop.

FUNCTION processExperimentJob(event):
    // Phase 1: configuration validation
    TRY:
        config = validateAndSetupExperiment(event)
    CATCH configError:
        // Unrecoverable configuration problem: surface it on every active item.
        createErrorRunItems(event.datasetId, configError)
        RETURN success

    // Phase 2: item retrieval and filtering
    allItems = fetchActiveItems(event.datasetId, config.version)
    validItems = filter(allItems, item => matchesVariables(item, config))

    // Phase 3: deduplication
    existingIds = queryClickHouse(event.runId, event.datasetId)
    itemsToProcess = filter(validItems, item => item.id NOT IN existingIds)

    // Phase 4: sequential execution with per-item error isolation
    FOR EACH item IN itemsToProcess:
        TRY:
            createRunItemRecord(item, config)
            result = executeLLMCall(item, config)
            IF result has generation details:
                scheduleEvals(item, config)
            enqueueDelayedUpsert(item)
        CATCH itemError:
            log(itemError)
            CONTINUE

    RETURN success
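The Phase 4 loop can also be written as runnable TypeScript. This is a sketch under stated assumptions: the ItemSteps interface and processSequentially are hypothetical names, each step is injected as an async function so the loop can be exercised without a queue or an LLM provider, and the check for available generation details is left to the scheduleEvals step itself.

```typescript
// Hypothetical per-item steps, injected so the loop has no external dependencies.
interface ItemSteps<T> {
  createRunItemRecord: (item: T) => Promise<void>;
  executeLLMCall: (item: T) => Promise<void>;
  scheduleEvals: (item: T) => Promise<void>;
  enqueueDelayedUpsert: (item: T) => Promise<void>;
  logError: (item: T, error: unknown) => void;
}

// Process items strictly one at a time; a failing item is logged and skipped.
async function processSequentially<T>(
  items: T[],
  steps: ItemSteps<T>,
): Promise<{ succeeded: number; failed: number }> {
  let succeeded = 0;
  let failed = 0;
  for (const item of items) {
    try {
      // Awaiting each step keeps at most one LLM call in flight.
      await steps.createRunItemRecord(item);
      await steps.executeLLMCall(item);
      await steps.scheduleEvals(item);
      await steps.enqueueDelayedUpsert(item);
      succeeded += 1;
    } catch (error) {
      // Isolate the failure to this item and continue with the next one.
      steps.logError(item, error);
      failed += 1;
    }
  }
  return { succeeded, failed };
}
```

Because the run item record is created before the LLM call, an item that fails partway through already appears in the deduplication set on retry. Whether such an item should be reprocessed is a design decision this sketch does not address.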
