Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:Langfuse Langfuse Dataset Run Item Upsert

From Leeroopedia
Knowledge Sources
Domains Data Pipeline, Eventual Consistency
Last Updated 2026-02-14 00:00 GMT

Overview

Dataset run item upsert is a delayed queue-based mechanism that defers the finalization of a dataset run item record until evaluations have had sufficient time to complete, ensuring that when the run item becomes visible in aggregation views it already has its associated scores attached.

Description

In the experiment pipeline, an LLM call produces a generation observation, and evaluation jobs are immediately enqueued to score that observation. However, evaluation jobs involve their own LLM calls (LLM-as-judge) and may take seconds to minutes to complete. If the dataset run item is finalized immediately after the LLM call, users querying the experiment results will see run items without scores, leading to a confusing experience where scores "trickle in" after the run appears complete.

The dataset run item upsert pattern solves this by introducing a deliberate delay between the LLM call and the final upsert of the run item record. This delay provides a window for evaluation jobs to execute and write their scores. When the delayed upsert finally fires, the run item is persisted (or updated) in its final form, and any downstream processing (such as additional evaluation triggers) can operate on a more complete record.

This approach embodies the principle of eventual consistency with bounded staleness: the system accepts a brief period where the run item is not yet finalized, in exchange for a much better user experience when the data does become visible.

Usage

The dataset run item upsert queue is used when:

  • An experiment item has been processed (LLM call completed, evals scheduled) and the run item needs to be finalized after a grace period.
  • The system needs to ensure that scores from LLM-as-judge evaluations are available before the run item appears in aggregation queries.
  • A run item needs to be upserted idempotently, potentially triggering additional downstream evaluation jobs on the consumer side.

Theoretical Basis

The delayed upsert pattern combines two well-known patterns:

1. Delayed Job Execution

Rather than executing the upsert immediately, the job is placed on a durable queue with an initial delay. The delay is a configurable constant (typically 30 seconds) that represents the expected time for evaluation jobs to complete. This is not a hard guarantee -- evaluations that take longer than the delay will still have their scores attached eventually -- but it covers the vast majority of cases and dramatically improves the user experience.

2. Idempotent Upsert with Retry

The queue is configured with retry semantics (multiple attempts with exponential backoff) so that transient failures in the upsert operation do not cause data loss. Because the operation is an upsert (insert-or-update), retrying the same job produces the same result, making the system resilient to worker restarts and queue redelivery.

Queue Configuration Properties:

Property Value Rationale
Initial delay 30 seconds Provides a window for LLM-as-judge evaluations to complete before the run item is finalized.
Maximum attempts 5 Ensures resilience against transient database or network failures.
Backoff strategy Exponential, starting at 5 seconds Progressively increases wait time between retries to avoid overwhelming a recovering service.
Remove on complete true Keeps the queue clean by discarding successfully processed jobs.
Remove on fail limit 10,000 Retains a bounded number of failed jobs for debugging purposes.
FUNCTION enqueueRunItemUpsert(datasetItem, traceId, projectId):
    job = {
        payload: {
            projectId,
            datasetItemId: datasetItem.id,
            datasetItemValidFrom: datasetItem.validFrom,
            traceId,
        },
        id: randomUUID(),
        timestamp: now(),
    }

    queue.add(job, {
        delay: 30_000,        // 30 seconds
        attempts: 5,
        backoff: exponential(5_000),
    })

// Consumer (worker side):
FUNCTION processRunItemUpsert(job):
    upsertDatasetRunItem(job.payload)
    // May trigger additional downstream evaluations

Related Pages

Implemented By

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment