Implementation:Langfuse Langfuse ClickHouse Schema Definitions
| Knowledge Sources | |
|---|---|
| Domains | ClickHouse, Schema, Data Access |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Zod schema definitions and type-safe record types for all ClickHouse tables in Langfuse, including observations, traces, scores, events, dataset run items, and blob storage file logs.
Description
This module defines the Zod validation schemas and TypeScript types that represent the structure of ClickHouse records throughout Langfuse. It serves as the single source of truth for ClickHouse data shapes, providing both read schemas (for parsing query results) and insert schemas (for validating data before insertion).
The module defines schemas for the following ClickHouse tables:
- Observations --
observationRecordReadSchema/observationRecordInsertSchemawith fields for tracing data including model details, usage/cost maps, prompt references, and tool definitions. The events-specific variant addsuser_idandsession_id. - Traces --
traceRecordReadSchema/traceRecordInsertSchemawith metadata, tags, bookmarked/public flags, and session references. AlsotraceNullRecordInsertSchemafor the aggregated traces_null table with observation/score IDs and cost/usage aggregations. - Scores --
scoreRecordReadSchema/scoreRecordInsertSchemawith value, data type, source, comment, and evaluation metadata. - Events --
eventRecordReadSchema/eventRecordInsertSchemafor the unified events table with experiment properties, source metadata (OTel SDK info), and blob storage references. - Dataset Run Items --
datasetRunItemRecordInsertSchemawith denormalized run and item data. - Blob Storage File Log --
blobStorageFileLogRecordInsertSchemafor tracking S3 event files.
The module also provides converter functions for cross-format transformations:
convertTraceReadToInsert,convertObservationReadToInsert,convertScoreReadToInsert-- Convert read types to insert types (date strings to timestamps).convertPostgresTraceToInsert,convertPostgresObservationToInsert,convertPostgresScoreToInsert,convertPostgresDatasetRunItemToInsert-- Convert raw PostgreSQL query results to ClickHouse insert types for data migration/sync.convertTraceToStagingObservation-- Converts a trace into a synthetic SPAN observation for the batch staging pipeline.
Shared utilities include clickhouseStringDateSchema (transforms ClickHouse datetime strings to ISO 8601) and UsageCostSchema (parses string-typed int64 values to numbers).
Usage
Use these schemas to validate data flowing into and out of ClickHouse. Import the read types when defining return types for repository query functions. Import the insert types when constructing records for insertion. Use the converter functions during data migration or when bridging between PostgreSQL and ClickHouse.
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/repositories/definitions.ts
- Lines: 1-748
Signature
// Schema exports
export const observationRecordReadSchema: z.ZodObject<...>;
export const observationRecordInsertSchema: z.ZodObject<...>;
export const traceRecordReadSchema: z.ZodObject<...>;
export const traceRecordInsertSchema: z.ZodObject<...>;
export const traceNullRecordInsertSchema: z.ZodObject<...>;
export const scoreRecordReadSchema: z.ZodObject<...>;
export const scoreRecordInsertSchema: z.ZodObject<...>;
export const eventRecordReadSchema: z.ZodObject<...>;
export const eventRecordInsertSchema: z.ZodObject<...>;
export const datasetRunItemRecordInsertSchema: z.ZodObject<...>;
export const blobStorageFileLogRecordInsertSchema: z.ZodObject<...>;
// Type exports
export type ObservationRecordReadType = z.infer<typeof observationRecordReadSchema>;
export type ObservationRecordInsertType = z.infer<typeof observationRecordInsertSchema>;
export type TraceRecordReadType = z.infer<typeof traceRecordReadSchema>;
export type TraceRecordInsertType = z.infer<typeof traceRecordInsertSchema>;
export type ScoreRecordReadType = z.infer<typeof scoreRecordReadSchema>;
export type ScoreRecordInsertType = z.infer<typeof scoreRecordInsertSchema>;
export type EventRecordReadType = z.infer<typeof eventRecordReadSchema>;
export type EventRecordInsertType = z.infer<typeof eventRecordInsertSchema>;
export type DatasetRunItemRecord<WithIO extends boolean = true> = ...;
// Converter exports
export const convertTraceReadToInsert: (record: TraceRecordReadType) => TraceRecordInsertType;
export const convertObservationReadToInsert: (record: ObservationRecordReadType) => ObservationRecordInsertType;
export const convertScoreReadToInsert: (record: ScoreRecordReadType) => ScoreRecordInsertType;
export const convertPostgresTraceToInsert: (trace: Record<string, any>) => TraceRecordInsertType;
export const convertPostgresObservationToInsert: (obs: Record<string, any>) => ObservationRecordInsertType;
export const convertPostgresScoreToInsert: (score: Record<string, any>) => ScoreRecordInsertType;
export const convertPostgresDatasetRunItemToInsert: (item: Record<string, any>) => DatasetRunItemRecordInsertType;
export const convertTraceToStagingObservation: (trace: TraceRecordInsertType, s3Timestamp: number) => ObservationBatchStagingRecordInsertType;
Import
import {
ObservationRecordReadType,
TraceRecordReadType,
ScoreRecordReadType,
EventRecordReadType,
observationRecordReadSchema,
traceRecordInsertSchema,
convertTraceReadToInsert,
convertPostgresObservationToInsert,
} from "@langfuse/shared/src/server/repositories/definitions";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ClickHouse read records | Various raw query rows | Yes (for read schemas) | Raw JSON rows from ClickHouse queries with string dates and string int64s |
| PostgreSQL records | Record<string, any> | Yes (for Postgres converters) | Raw SQL query rows with native JS Date objects and Decimal types |
| TraceRecordInsertType | TraceRecordInsertType | Yes (for staging converter) | Trace insert record to convert to synthetic observation |
| s3FirstSeenTimestamp | number | Yes (for staging converter) | Epoch timestamp of first S3 appearance |
Outputs
| Name | Type | Description |
|---|---|---|
| Read types | *RecordReadType | Validated records with ISO date strings and numeric usage/cost maps |
| Insert types | *RecordInsertType | Records with epoch millisecond timestamps ready for ClickHouse insertion |
| UsageCostType | Record<string, number> | Parsed numeric map from ClickHouse's string-typed int64 values |
Usage Examples
import {
observationRecordReadSchema,
ObservationRecordReadType,
convertPostgresTraceToInsert,
} from "@langfuse/shared/src/server/repositories/definitions";
// Parse a ClickHouse query result
const rawRow = { id: "obs_1", start_time: "2024-01-15 10:30:00.000000", ... };
const validated = observationRecordReadSchema.parse(rawRow);
// validated.start_time is now "2024-01-15T10:30:00.000000Z"
// Convert a PostgreSQL trace for ClickHouse insertion
const pgTrace = await prisma.$queryRaw`SELECT * FROM traces WHERE id = 'trace_1'`;
const chInsert = convertPostgresTraceToInsert(pgTrace[0]);
// chInsert.timestamp is now a number (epoch ms)