Implementation:Langfuse Langfuse Ingestion Event Schemas
| Knowledge Sources | |
|---|---|
| Domains | Data Validation, Trace Ingestion |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for defining, validating, and normalizing all ingestion event types via Zod v4 schemas provided by Langfuse.
Description
This module defines the comprehensive set of Zod v4 schemas that validate every event flowing through the Langfuse trace ingestion pipeline. It serves as the single source of truth for the data contract between SDKs and the server.
The module is organized around a factory function createAllIngestionSchemas that generates all schemas with a configurable isPublic flag. When isPublic is true, the environment name field rejects values starting with "langfuse" (reserved for internal use). When false, this restriction is relaxed.
Key components:
eventTypesenum: Maps 18 event type names to their string identifiers (e.g.,TRACE_CREATE -> "trace-create").- Body schemas:
TraceBody,OptionalObservationBody,CreateEventEvent,CreateSpanBody,UpdateSpanBody,CreateGenerationBody,UpdateGenerationBody,ScoreBody,SdkLogEvent,DatasetRunItemBody. - Usage normalization:
Usage,MixedUsage,UsageDetailsschemas that transform heterogeneous token count formats into a normalized structure. ingestionEventdiscriminated union: The top-level schema that selects the correct body schema based on thetypefield.createIngestionEventSchemafactory: Returns either the public or internal variant of the discriminated union.
Usage
Import these schemas and types when:
- Parsing incoming ingestion events in the
processEventBatchfunction. - Type-checking event handlers in the worker.
- Writing tests that construct valid or invalid ingestion events.
- Extending the schema with new event types or new fields on existing types.
Code Reference
Source Location
- Repository: langfuse
- File:
packages/shared/src/server/ingestion/types.ts - Lines: L1-823
Signature
// Event type enumeration
export const eventTypes = {
TRACE_CREATE: "trace-create",
SCORE_CREATE: "score-create",
EVENT_CREATE: "event-create",
SPAN_CREATE: "span-create",
SPAN_UPDATE: "span-update",
GENERATION_CREATE: "generation-create",
GENERATION_UPDATE: "generation-update",
AGENT_CREATE: "agent-create",
TOOL_CREATE: "tool-create",
CHAIN_CREATE: "chain-create",
RETRIEVER_CREATE: "retriever-create",
EVALUATOR_CREATE: "evaluator-create",
EMBEDDING_CREATE: "embedding-create",
GUARDRAIL_CREATE: "guardrail-create",
SDK_LOG: "sdk-log",
DATASET_RUN_ITEM_CREATE: "dataset-run-item-create",
OBSERVATION_CREATE: "observation-create", // Legacy
OBSERVATION_UPDATE: "observation-update", // Legacy
} as const;
// Factory function
export const createIngestionEventSchema = (
isLangfuseInternal?: boolean
) => z.ZodDiscriminatedUnion<...>;
// Exported types
export type IngestionEventType = z.infer<typeof ingestionEvent>;
export type TraceEventType = z.infer<typeof traceEvent>;
export type ScoreEventType = z.infer<typeof scoreEvent>;
export type ObservationEvent = z.infer<typeof eventCreateEvent>
| z.infer<typeof spanCreateEvent>
| z.infer<typeof spanUpdateEvent>
| z.infer<typeof generationCreateEvent>
| z.infer<typeof generationUpdateEvent>
| z.infer<typeof agentCreateEvent>
| ... ; // and other observation subtypes
Import
import {
eventTypes,
ingestionEvent,
createIngestionEventSchema,
TraceBody,
CreateGenerationBody,
ScoreBody,
Usage,
UsageDetails,
type IngestionEventType,
type ObservationEvent,
} from "@langfuse/shared/src/server/ingestion/types";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| (raw event) | unknown |
Yes | Raw JSON object from the SDK batch payload. Must have a type field matching one of the eventTypes values.
|
| isLangfuseInternal | boolean |
No | When true, uses internal environment validation (allows "langfuse" prefix). Defaults to false.
|
Outputs
| Name | Type | Description |
|---|---|---|
| IngestionEventType | z.infer<typeof ingestionEvent> |
Union type of all validated event shapes. Each variant has id, timestamp, type, and a type-specific body.
|
| body.id | string |
The entity ID (trace ID, observation ID, or score ID) within the event body. |
| body.environment | string |
The environment name, lowercased. Defaults to "default" if not provided.
|
| body.usage | Usage (nullish) |
Normalized usage with input, output, total, unit fields. Only present on generation events.
|
| body.usageDetails | Record<string, number> (nullish) |
Detailed per-category usage breakdown (e.g., input_cached_tokens). Only present on generation events.
|
Usage Examples
Validating a Single Event
import { ingestionEvent } from "@langfuse/shared/src/server/ingestion/types";
const rawEvent = {
id: "evt_abc123",
timestamp: "2024-01-15T10:30:00.000Z",
type: "trace-create",
body: {
id: "trace_xyz",
name: "My Trace",
environment: "production",
tags: ["v2", "chat"],
},
};
const result = ingestionEvent.safeParse(rawEvent);
if (result.success) {
// result.data is typed as IngestionEventType
console.log(result.data.body.id); // "trace_xyz"
} else {
console.error(result.error.message);
}
Using the Factory for Internal Events
import { createIngestionEventSchema } from "@langfuse/shared/src/server/ingestion/types";
// Internal schema allows "langfuse-experiments" as environment name
const internalSchema = createIngestionEventSchema(true);
const event = internalSchema.parse({
id: "evt_001",
timestamp: "2024-01-15T10:30:00.000Z",
type: "trace-create",
body: {
id: "trace_exp_001",
name: "Prompt Experiment Run",
environment: "langfuse-experiments",
},
});
Usage Normalization from OpenAI Format
import { UsageDetails } from "@langfuse/shared/src/server/ingestion/types";
// OpenAI Completion API format
const openAiUsage = {
prompt_tokens: 150,
completion_tokens: 50,
total_tokens: 200,
prompt_tokens_details: { cached_tokens: 100 },
completion_tokens_details: { reasoning_tokens: 20 },
};
const parsed = UsageDetails.parse(openAiUsage);
// Result: { input: 50, output: 30, total: 200, input_cached_tokens: 100, output_reasoning_tokens: 20 }
// Note: input = 150 - 100 = 50 (prompt_tokens minus cached_tokens)
// Note: output = 50 - 20 = 30 (completion_tokens minus reasoning_tokens)
Score Body with Discriminated Union
import { ScoreBody } from "@langfuse/shared/src/server/ingestion/types";
// Numeric score
const numericScore = ScoreBody.parse({
name: "accuracy",
traceId: "trace_001",
value: 0.95,
dataType: "NUMERIC",
source: "EVAL",
});
// Boolean score
const booleanScore = ScoreBody.parse({
name: "is_relevant",
traceId: "trace_001",
value: 1,
dataType: "BOOLEAN",
source: "ANNOTATION",
});