Principle: Langfuse Dataset Run Item Upsert
| Knowledge Sources | |
|---|---|
| Domains | Data Pipeline, Eventual Consistency |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Dataset run item upsert is a delayed, queue-based mechanism that defers finalizing a dataset run item record until evaluations have had sufficient time to complete, so that by the time the run item becomes visible in aggregation views its associated scores are already attached.
Description
In the experiment pipeline, an LLM call produces a generation observation, and evaluation jobs are immediately enqueued to score that observation. However, evaluation jobs involve their own LLM calls (LLM-as-judge) and may take seconds to minutes to complete. If the dataset run item is finalized immediately after the LLM call, users querying the experiment results will see run items without scores, leading to a confusing experience where scores "trickle in" after the run appears complete.
The dataset run item upsert pattern solves this by introducing a deliberate delay between the LLM call and the final upsert of the run item record. This delay provides a window for evaluation jobs to execute and write their scores. When the delayed upsert finally fires, the run item is persisted (or updated) in its final form, and any downstream processing (such as additional evaluation triggers) can operate on a more complete record.
This approach embodies the principle of eventual consistency with bounded staleness: the system accepts a brief period where the run item is not yet finalized, in exchange for a much better user experience when the data does become visible.
Usage
The dataset run item upsert queue is used when:
- An experiment item has been processed (LLM call completed, evals scheduled) and the run item needs to be finalized after a grace period.
- The system needs to ensure that scores from LLM-as-judge evaluations are available before the run item appears in aggregation queries.
- A run item needs to be upserted idempotently, potentially triggering additional downstream evaluation jobs on the consumer side.
Theoretical Basis
The delayed upsert approach combines two well-known patterns:
1. Delayed Job Execution
Rather than executing the upsert immediately, the job is placed on a durable queue with an initial delay. The delay is a configurable constant (typically 30 seconds) that represents the expected time for evaluation jobs to complete. This is not a hard guarantee -- evaluations that take longer than the delay will still have their scores attached eventually -- but it covers the vast majority of cases and dramatically improves the user experience.
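As a minimal sketch of the delayed-visibility idea, the following in-memory queue holds each job until its delay has elapsed. A real deployment would use a durable, Redis-backed queue; the class and method names here are illustrative, not from Langfuse.

```typescript
// A job that becomes eligible for processing only at `runAt`.
interface Job<T> {
  id: string;
  payload: T;
  runAt: number; // epoch millis when the delay expires
}

// Toy delayed queue: jobs are invisible to consumers until their
// delay has elapsed. Durability and redelivery are out of scope.
class DelayedQueue<T> {
  private jobs: Job<T>[] = [];

  // Enqueue a job that becomes visible only after `delayMs`.
  add(id: string, payload: T, now: number, delayMs: number): void {
    this.jobs.push({ id, payload, runAt: now + delayMs });
  }

  // Return (and remove) all jobs whose delay has elapsed at `now`.
  poll(now: number): Job<T>[] {
    const due = this.jobs.filter((j) => j.runAt <= now);
    this.jobs = this.jobs.filter((j) => j.runAt > now);
    return due;
  }
}
```

Polling before the 30-second delay returns nothing; polling at or after it yields the job, which is exactly the window the evaluations use to write their scores.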
2. Idempotent Upsert with Retry
The queue is configured with retry semantics (multiple attempts with exponential backoff) so that transient failures in the upsert operation do not cause data loss. Because the operation is an upsert (insert-or-update), retrying the same job produces the same result, making the system resilient to worker restarts and queue redelivery.
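A sketch of the retry half of the pattern, assuming an in-memory Map as the run-item store; `upsertRunItem` and `withRetry` are illustrative names, and `withRetry` records the backoff schedule instead of actually sleeping.

```typescript
type RunItem = { id: string; traceId: string };

// Insert-or-update: replaying the same job yields the same final state,
// which is what makes queue redelivery safe.
function upsertRunItem(store: Map<string, RunItem>, item: RunItem): void {
  store.set(item.id, item);
}

// Run `op` up to `maxAttempts` times; between failures the wait grows as
// baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ... (exponential backoff).
function withRetry(
  op: () => void,
  maxAttempts: number,
  baseDelayMs: number,
): number[] {
  const backoffs: number[] = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      op();
      return backoffs; // success: return the delays that were needed
    } catch (err) {
      if (attempt === maxAttempts) throw err; // attempts exhausted
      backoffs.push(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  return backoffs;
}
```

With a 5-second base delay, two transient failures produce waits of 5 s and 10 s before the third attempt succeeds, and re-running the upsert afterwards leaves the store unchanged.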
Queue Configuration Properties:
| Property | Value | Rationale |
|---|---|---|
| Initial delay | 30 seconds | Provides a window for LLM-as-judge evaluations to complete before the run item is finalized. |
| Maximum attempts | 5 | Ensures resilience against transient database or network failures. |
| Backoff strategy | Exponential, starting at 5 seconds | Progressively increases wait time between retries to avoid overwhelming a recovering service. |
| Remove on complete | true | Keeps the queue clean by discarding successfully processed jobs. |
| Remove on fail limit | 10,000 | Retains a bounded number of failed jobs for debugging purposes. |
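The table above could be expressed as a job-options object. The field names below follow BullMQ's `JobsOptions` shape; whether Langfuse uses exactly these names is an assumption, so treat this as illustrative.

```typescript
// Hypothetical options object mirroring the configuration table;
// field names follow BullMQ's JobsOptions and are assumed, not
// confirmed by the text above.
const runItemUpsertJobOptions = {
  delay: 30_000,                                  // initial delay: 30 seconds
  attempts: 5,                                    // maximum attempts
  backoff: { type: "exponential", delay: 5_000 }, // 5 s starting backoff
  removeOnComplete: true,                         // discard successful jobs
  removeOnFail: 10_000,                           // keep up to 10,000 failed jobs
};
```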
```
FUNCTION enqueueRunItemUpsert(datasetItem, traceId, projectId):
    job = {
        payload: {
            projectId,
            datasetItemId: datasetItem.id,
            datasetItemValidFrom: datasetItem.validFrom,
            traceId,
        },
        id: randomUUID(),
        timestamp: now(),
    }
    queue.add(job, {
        delay: 30_000,  // 30 seconds
        attempts: 5,
        backoff: exponential(5_000),
    })

// Consumer (worker side):
FUNCTION processRunItemUpsert(job):
    upsertDatasetRunItem(job.payload)
    // May trigger additional downstream evaluations
```