Implementation:Langfuse Langfuse Dataset Items Repository
| Knowledge Sources | |
|---|---|
| Domains | Datasets, Data Access, Validation |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Repository for dataset item CRUD operations with JSON schema validation against dataset schemas, supporting both stateful and versioned dataset service strategies.
Description
This module provides the complete data access layer for dataset items in Langfuse. Dataset items represent individual test cases within datasets, containing input data, expected output, and metadata. The repository validates all items against optional JSON schemas defined on their parent dataset before persisting.
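The validate-before-persist flow described above can be sketched roughly as follows. This is a minimal illustration, not the repository's code: `Validator`, `compileRequiredKeys`, and `validateItems` are hypothetical names, and the "compiler" is a stand-in that only checks for required keys where a real implementation would compile a full JSON Schema.

```typescript
// Hypothetical types for illustration; not the repository's actual definitions.
type Validator = (value: unknown) => string[]; // error messages, empty when valid

// Stand-in for a JSON Schema compiler: it only checks that required keys exist.
function compileRequiredKeys(requiredKeys: string[]): Validator {
  return (value) => {
    if (typeof value !== "object" || value === null || Array.isArray(value)) {
      return ["value must be an object"];
    }
    const record = value as Record<string, unknown>;
    return requiredKeys
      .filter((key) => !(key in record))
      .map((key) => `missing required key "${key}"`);
  };
}

interface ItemValidationError {
  itemIndex: number;
  errors: string[];
}

// The validator is built once by the caller and reused for every item.
// Errors are collected per item instead of stopping at the first failure.
function validateItems(
  items: Array<{ input?: unknown }>,
  inputValidator: Validator | null, // null models a dataset without a schema
): { validIndexes: number[]; validationErrors: ItemValidationError[] } {
  const validIndexes: number[] = [];
  const validationErrors: ItemValidationError[] = [];
  items.forEach((item, itemIndex) => {
    const errors = inputValidator ? inputValidator(item.input) : [];
    if (errors.length === 0) validIndexes.push(itemIndex);
    else validationErrors.push({ itemIndex, errors });
  });
  return { validIndexes, validationErrors };
}
```

Building the validator once and passing it in keeps the per-item cost to a function call, which is the general idea behind reusing compiled schemas in bulk paths.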
Key capabilities:
- Create operations -- createDatasetItem creates a single validated item; createManyDatasetItems handles bulk creation (e.g., CSV upload, API batch) with per-item validation and compiled schemas for a 3800x+ performance improvement.
- Upsert operations -- upsertDatasetItem creates or updates an item in a dataset identified by ID or name, merging partial updates with existing data before validation.
- Read operations -- getDatasetItemById, getDatasetItems (with filtering, ordering, pagination), and getDatasetItemsByIds provide various retrieval patterns.
- Delete operations -- deleteDatasetItem removes items from PostgreSQL along with their associated ClickHouse records.
- Versioning support -- Operates through executeWithDatasetServiceStrategy to support both STATEFUL (simple CRUD) and VERSIONED (temporal with valid_from/valid_to) modes. Functions like getDatasetItemVersionHistory and getDatasetItemChangesSinceVersion provide version-specific queries.
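To make the VERSIONED mode more concrete, here is a small in-memory sketch of temporal rows with valid_from/valid_to bounds. The `VersionedRow` shape, `rowAt`, and `versionHistory` are hypothetical and only model the idea; the actual queries and the ordering of the returned history are assumptions.

```typescript
// Hypothetical row shape for VERSIONED mode; field names are assumptions.
interface VersionedRow {
  itemId: string;
  input: unknown;
  validFrom: Date;
  validTo: Date | null; // null means the row is still current
  isDeleted: boolean;
}

// Return the row that was valid at `at`, or null if none exists or it was deleted.
function rowAt(rows: VersionedRow[], itemId: string, at: Date): VersionedRow | null {
  const match = rows.find(
    (row) =>
      row.itemId === itemId &&
      row.validFrom.getTime() <= at.getTime() &&
      (row.validTo === null || at.getTime() < row.validTo.getTime()),
  );
  return match && !match.isDeleted ? match : null;
}

// Distinct validFrom timestamps for one item, oldest first (ordering is an assumption).
function versionHistory(rows: VersionedRow[], itemId: string): Date[] {
  const times = rows
    .filter((row) => row.itemId === itemId)
    .map((row) => row.validFrom.getTime());
  return Array.from(new Set(times))
    .sort((a, b) => a - b)
    .map((time) => new Date(time));
}
```

In this model an update closes the current row by setting its validTo and appends a new row, so earlier states stay queryable by timestamp.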
The module uses a domain type system with DatasetItemDomain (full I/O) and DatasetItemDomainWithoutIO (lightweight) variants, and a toDomainType converter that strips internal fields like isDeleted.
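A converter of this kind can be sketched with object rest destructuring. The types below are simplified, hypothetical stand-ins (suffixed `Sketch`); in particular, which fields the lightweight variant omits is an assumption made for illustration.

```typescript
// Hypothetical, simplified shapes; the real domain types carry more fields.
interface DatasetItemRow {
  id: string;
  datasetId: string;
  input: unknown;
  expectedOutput: unknown;
  metadata: unknown;
  isDeleted: boolean; // internal bookkeeping, not exposed to callers
}

type DatasetItemDomainSketch = Omit<DatasetItemRow, "isDeleted">;

// Assumption: the lightweight variant drops the potentially large payload fields.
type DatasetItemDomainWithoutIOSketch = Omit<
  DatasetItemDomainSketch,
  "input" | "expectedOutput" | "metadata"
>;

// Strip the internal flag and keep everything else.
function toDomainTypeSketch(row: DatasetItemRow): DatasetItemDomainSketch {
  const { isDeleted: _isDeleted, ...domain } = row;
  return domain;
}

// Strip the internal flag and the payload fields.
function toDomainWithoutIOSketch(row: DatasetItemRow): DatasetItemDomainWithoutIOSketch {
  const {
    isDeleted: _isDeleted,
    input: _input,
    expectedOutput: _expectedOutput,
    metadata: _metadata,
    ...rest
  } = row;
  return rest;
}
```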
Input data can be supplied either as JSON strings (from tRPC) or as objects (from the Public API), with optional control character sanitization and undefined-to-null normalization.
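The normalization steps might look roughly like the sketch below. `normalizeField`, `sanitizeDeep`, and `stripControlChars` are hypothetical helpers; the exact set of characters removed and the fallback for strings that are not valid JSON are assumptions, not the repository's documented behavior.

```typescript
// Remove C0 control characters except tab, newline, and carriage return.
function stripControlChars(text: string): string {
  return text.replace(/[\u0000-\u0008\u000B\u000C\u000E-\u001F]/g, "");
}

// Recursively sanitize every string inside a JSON-like value.
function sanitizeDeep(value: unknown): unknown {
  if (typeof value === "string") return stripControlChars(value);
  if (Array.isArray(value)) return value.map(sanitizeDeep);
  if (typeof value === "object" && value !== null) {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value)) out[key] = sanitizeDeep(inner);
    return out;
  }
  return value;
}

// Accept a JSON string or an already-parsed value and apply the optional steps.
function normalizeField(
  raw: unknown,
  normalizeOpts: { sanitizeControlChars?: boolean } = {},
  validateOpts: { normalizeUndefinedToNull?: boolean } = {},
): unknown {
  if (raw === undefined) return validateOpts.normalizeUndefinedToNull ? null : undefined;
  let value: unknown = raw;
  if (typeof raw === "string") {
    try {
      value = JSON.parse(raw);
    } catch {
      value = raw; // assumption: strings that are not valid JSON are kept as plain strings
    }
  }
  return normalizeOpts.sanitizeControlChars ? sanitizeDeep(value) : value;
}
```

Parsing before sanitizing matters here: an escaped control character inside a JSON string only becomes a real control character after parsing.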
Usage
Use this repository for all dataset item operations from tRPC routes, public API handlers, and worker processes. Always prefer the repository functions over direct Prisma calls to ensure schema validation is applied.
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/repositories/dataset-items.ts
- Lines: 1-1845
Signature
export async function createDatasetItem(props: {
  projectId: string;
  datasetId: string;
  input?: string | unknown | null;
  expectedOutput?: string | unknown | null;
  metadata?: string | unknown | null;
  sourceTraceId?: string;
  sourceObservationId?: string;
  normalizeOpts?: { sanitizeControlChars?: boolean };
  validateOpts?: { normalizeUndefinedToNull?: boolean };
}): Promise<{ success: true; datasetItem: DatasetItemDomain } | PayloadError>;

export async function createManyDatasetItems(props: {
  projectId: string;
  items: Array<{ datasetId: string; input?: unknown; expectedOutput?: unknown; metadata?: unknown; ... }>;
  normalizeOpts?: { sanitizeControlChars?: boolean };
  validateOpts?: { normalizeUndefinedToNull?: boolean };
}): Promise<BulkCreateResult>;

export async function upsertDatasetItem(props: {
  projectId: string;
  datasetItemId?: string;
  input?: string | unknown | null;
  expectedOutput?: string | unknown | null;
  metadata?: string | unknown | null;
  status?: DatasetStatus;
} & IdOrName): Promise<DatasetItemDomain>;

export async function getDatasetItemById(params: {
  projectId: string;
  datasetItemId: string;
}): Promise<DatasetItemDomain | null>;

export async function getDatasetItems(params: {
  projectId: string;
  datasetId: string;
  filter?: FilterState;
  orderBy?: OrderByState;
  limit?: number;
  offset?: number;
}): Promise<DatasetItemDomain[]>;

export async function deleteDatasetItem(params: {
  projectId: string;
  datasetItemId: string;
  datasetId: string;
}): Promise<void>;

export async function getDatasetItemVersionHistory(props: {
  projectId: string;
  datasetId: string;
  itemId: string;
}): Promise<Date[]>;

export async function getDatasetItemChangesSinceVersion(props: {
  projectId: string;
  datasetId: string;
  sinceVersion: Date;
}): Promise<{ upserts: number; deletes: number }>;
Import
import {
createDatasetItem,
createManyDatasetItems,
upsertDatasetItem,
getDatasetItemById,
getDatasetItems,
deleteDatasetItem,
getDatasetItemVersionHistory,
getDatasetItemChangesSinceVersion,
} from "@langfuse/shared/src/server/repositories/dataset-items";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | Project scope for all operations |
| datasetId | string | Yes (for create/list) | Parent dataset ID |
| datasetItemId | string | No (for upsert); Yes (for get/delete) | Item ID for upsert, retrieval, or deletion |
| input | unknown \| null | No | Item input data (JSON string or object) |
| expectedOutput | unknown \| null | No | Expected output for evaluation |
| metadata | unknown \| null | No | Arbitrary metadata |
| normalizeOpts | { sanitizeControlChars?: boolean } | No | Input sanitization options |
| validateOpts | { normalizeUndefinedToNull?: boolean } | No | Validation behavior options |
| filter | FilterState | No | Filter conditions for listing |
| sinceVersion | Date | Yes (for changes query) | Version timestamp to compare against |
Outputs
| Name | Type | Description |
|---|---|---|
| DatasetItemDomain | DatasetItemDomain | Full dataset item with id, input, expectedOutput, metadata, timestamps |
| PayloadError | { success: false; message: string } | Validation error with descriptive message |
| BulkCreateResult | { success: boolean; datasetItems: []; validationErrors: [] } | Bulk create result with per-item errors |
| Version history | Date[] | Array of distinct validFrom timestamps |
| Changes since version | { upserts: number; deletes: number } | Count of changes since a version |
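Because createDatasetItem returns either a success object or a PayloadError, callers can branch on the `success` flag. The sketch below uses hypothetical stand-in types (suffixed `Like`) that mirror only the shapes listed in the table; `describeCreateResult` is an illustrative helper, not part of the module.

```typescript
// Hypothetical stand-ins mirroring the documented shapes.
interface DatasetItemLike {
  id: string;
}
type PayloadErrorLike = { success: false; message: string };
type CreateResultLike =
  | { success: true; datasetItem: DatasetItemLike }
  | PayloadErrorLike;

// TypeScript narrows the union on the literal `success` discriminant,
// so each branch only sees the fields that exist on that variant.
function describeCreateResult(result: CreateResultLike): string {
  if (result.success) {
    return `created ${result.datasetItem.id}`;
  }
  return `rejected: ${result.message}`;
}
```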
Usage Examples
import { createDatasetItem, upsertDatasetItem, getDatasetItems } from "@langfuse/shared/src/server/repositories/dataset-items";

// Create a single dataset item
const result = await createDatasetItem({
  projectId: "proj_123",
  datasetId: "ds_456",
  input: { prompt: "What is AI?" },
  expectedOutput: { answer: "Artificial Intelligence" },
});
if (result.success) {
  console.log("Created:", result.datasetItem.id);
}

// Upsert by dataset name
const item = await upsertDatasetItem({
  projectId: "proj_123",
  datasetName: "my-dataset",
  datasetItemId: "existing-item-id",
  input: { prompt: "Updated prompt" },
  validateOpts: { normalizeUndefinedToNull: true },
});

// List items with pagination
const items = await getDatasetItems({
  projectId: "proj_123",
  datasetId: "ds_456",
  limit: 50,
  offset: 0,
});
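To read every item rather than a single page, the limit/offset call can be wrapped in a loop. The helper below is a hypothetical sketch: `fetchAllPages` and `FetchPage` are not part of the module, and the page fetcher is injected so the loop can be exercised without a database.

```typescript
// Hypothetical page fetcher signature; in practice it would wrap getDatasetItems.
type FetchPage<T> = (args: { limit: number; offset: number }) => Promise<T[]>;

// Request pages until one comes back shorter than `limit`.
async function fetchAllPages<T>(fetchPage: FetchPage<T>, limit = 50): Promise<T[]> {
  const all: T[] = [];
  for (let offset = 0; ; offset += limit) {
    const page = await fetchPage({ limit, offset });
    all.push(...page);
    if (page.length < limit) break; // a short (or empty) page means nothing is left
  }
  return all;
}
```

A caller could pass `({ limit, offset }) => getDatasetItems({ projectId, datasetId, limit, offset })` as the fetcher. Offset pagination can skip or repeat rows if items change between calls, so this suits snapshots more than live feeds.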