Implementation:Langfuse Langfuse Ingestion Event Schemas

From Leeroopedia
Knowledge Sources
Domains: Data Validation, Trace Ingestion
Last Updated: 2026-02-14 00:00 GMT

Overview

A concrete tool, provided by Langfuse, for defining, validating, and normalizing all ingestion event types with Zod v4 schemas.

Description

This module defines the comprehensive set of Zod v4 schemas that validate every event flowing through the Langfuse trace ingestion pipeline. It serves as the single source of truth for the data contract between SDKs and the server.

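The module is organized around a factory function, createAllIngestionSchemas, that generates every schema from a single isPublic flag. When isPublic is true, the environment name field rejects values starting with "langfuse", a prefix reserved for internal use. When it is false, that restriction is lifted. The exported createIngestionEventSchema factory takes the inverse flag, isLangfuseInternal: passing true selects the internal (non-public) variant.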
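The environment rule described above can be sketched without Zod. The helper below is a hypothetical, dependency-free stand-in: checkEnvironment and EnvResult are illustrative names that do not exist in the module, and the sketch assumes the name is defaulted and lowercased before the prefix check, which the real schema may order differently.

```typescript
// Hypothetical result type for this sketch only.
type EnvResult =
  | { ok: true; value: string }
  | { ok: false; error: string };

// Hypothetical helper mirroring the documented environment rule.
function checkEnvironment(
  raw: string | null | undefined,
  isPublic: boolean,
): EnvResult {
  // Missing values fall back to "default"; names are lowercased.
  const value = (raw ?? "default").toLowerCase();

  // Public schemas reserve the "langfuse" prefix for internal use.
  if (isPublic && value.startsWith("langfuse")) {
    return { ok: false, error: `environment "${value}" uses a reserved prefix` };
  }
  return { ok: true, value };
}
```

Called with "langfuse-experiments", the helper rejects the name when isPublic is true and accepts it when isPublic is false.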

Key components:

  • eventTypes constant: A const object (declared with as const, not a TypeScript enum) that maps 18 event type names to their string identifiers (e.g., TRACE_CREATE -> "trace-create").
  • Body schemas: TraceBody, OptionalObservationBody, CreateEventEvent, CreateSpanBody, UpdateSpanBody, CreateGenerationBody, UpdateGenerationBody, ScoreBody, SdkLogEvent, DatasetRunItemBody.
  • Usage normalization: Usage, MixedUsage, UsageDetails schemas that transform heterogeneous token count formats into a normalized structure.
  • ingestionEvent discriminated union: The top-level schema that selects the correct body schema based on the type field.
  • createIngestionEventSchema factory: Returns either the public or internal variant of the discriminated union.
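To illustrate how a discriminated union picks a body schema from the type field, here is a minimal hand-rolled sketch. It is not the Zod implementation: the two event shapes are heavily simplified, and SketchEvent, bodyValidators, and parseSketchEvent are hypothetical names introduced only for this example.

```typescript
// Hypothetical, simplified event shapes; the real bodies carry many more fields.
type TraceCreate = { type: "trace-create"; body: { id: string; name?: string } };
type ScoreCreate = { type: "score-create"; body: { name: string; value: number } };
type SketchEvent = TraceCreate | ScoreCreate;

type Body = Record<string, unknown>;

// One body validator per discriminator value.
const bodyValidators: Record<SketchEvent["type"], (body: Body) => boolean> = {
  "trace-create": (b) => typeof b.id === "string",
  "score-create": (b) => typeof b.name === "string" && typeof b.value === "number",
};

function parseSketchEvent(raw: unknown): SketchEvent | null {
  if (typeof raw !== "object" || raw === null) return null;
  const { type, body } = raw as { type?: unknown; body?: unknown };

  // Step 1: the discriminator must be one of the known type strings.
  if (
    typeof type !== "string" ||
    !Object.prototype.hasOwnProperty.call(bodyValidators, type)
  ) {
    return null;
  }

  // Step 2: only the validator registered for that type is applied to the body.
  if (typeof body !== "object" || body === null) return null;
  const validate = bodyValidators[type as SketchEvent["type"]];
  return validate(body as Body) ? (raw as SketchEvent) : null;
}
```

The point of the design is that an event is checked against exactly one body schema, chosen by its type, rather than tried against every variant in turn.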

Usage

Import these schemas and types when:

  • Parsing incoming ingestion events in the processEventBatch function.
  • Type-checking event handlers in the worker.
  • Writing tests that construct valid or invalid ingestion events.
  • Extending the schema with new event types or new fields on existing types.
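As a rough sketch of the first use case, a batch can be split into validated events and per-item errors with safeParse. The helper below is hypothetical (partitionBatch, SchemaLike, and SafeParseResult are illustrative names) and makes no claim about how processEventBatch itself handles invalid items. It only assumes the result shape used elsewhere on this page: success, data, and error.message.

```typescript
// Minimal structural types covering the safeParse result shape used here.
type SafeParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: { message: string } };

interface SchemaLike<T> {
  safeParse(input: unknown): SafeParseResult<T>;
}

// Hypothetical helper: validate each item independently so that one bad
// event does not cause the rest of the batch to be discarded.
function partitionBatch<T>(schema: SchemaLike<T>, batch: unknown[]) {
  const valid: T[] = [];
  const invalid: { index: number; message: string }[] = [];

  batch.forEach((item, index) => {
    const result = schema.safeParse(item);
    if (result.success) {
      valid.push(result.data);
    } else {
      invalid.push({ index, message: result.error.message });
    }
  });

  return { valid, invalid };
}
```

Any object with a compatible safeParse method, such as the schema returned by createIngestionEventSchema, should be usable as the schema argument.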

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/ingestion/types.ts
  • Lines: L1-823

Signature

// Event type enumeration
export const eventTypes = {
  TRACE_CREATE: "trace-create",
  SCORE_CREATE: "score-create",
  EVENT_CREATE: "event-create",
  SPAN_CREATE: "span-create",
  SPAN_UPDATE: "span-update",
  GENERATION_CREATE: "generation-create",
  GENERATION_UPDATE: "generation-update",
  AGENT_CREATE: "agent-create",
  TOOL_CREATE: "tool-create",
  CHAIN_CREATE: "chain-create",
  RETRIEVER_CREATE: "retriever-create",
  EVALUATOR_CREATE: "evaluator-create",
  EMBEDDING_CREATE: "embedding-create",
  GUARDRAIL_CREATE: "guardrail-create",
  SDK_LOG: "sdk-log",
  DATASET_RUN_ITEM_CREATE: "dataset-run-item-create",
  OBSERVATION_CREATE: "observation-create",  // Legacy
  OBSERVATION_UPDATE: "observation-update",  // Legacy
} as const;

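// Factory function: returns the internal variant of the discriminated union
// when isLangfuseInternal is true, otherwise the public one.
// (Type parameters of the return type are abbreviated.)
export declare const createIngestionEventSchema: (
  isLangfuseInternal?: boolean
) => z.ZodDiscriminatedUnion<...>;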

// Exported types
export type IngestionEventType = z.infer<typeof ingestionEvent>;
export type TraceEventType = z.infer<typeof traceEvent>;
export type ScoreEventType = z.infer<typeof scoreEvent>;
export type ObservationEvent = z.infer<typeof eventCreateEvent>
  | z.infer<typeof spanCreateEvent>
  | z.infer<typeof spanUpdateEvent>
  | z.infer<typeof generationCreateEvent>
  | z.infer<typeof generationUpdateEvent>
  | z.infer<typeof agentCreateEvent>
  | ... ; // and other observation subtypes
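Because eventTypes is declared with as const, the union of its string identifiers can be derived at the type level. The sketch below uses an abbreviated copy of the object so that it is self-contained; EventTypeName and isEventTypeName are hypothetical names, not exports of the module.

```typescript
// Abbreviated copy of eventTypes; the full object has 18 entries.
const eventTypes = {
  TRACE_CREATE: "trace-create",
  SCORE_CREATE: "score-create",
  SPAN_CREATE: "span-create",
  SDK_LOG: "sdk-log",
} as const;

// Union of the string identifiers: "trace-create" | "score-create" | ...
type EventTypeName = (typeof eventTypes)[keyof typeof eventTypes];

const knownTypes = new Set<string>(Object.values(eventTypes));

// Hypothetical type guard that narrows an arbitrary string to EventTypeName.
function isEventTypeName(value: string): value is EventTypeName {
  return knownTypes.has(value);
}
```

A guard like this is useful for a cheap pre-check on the type field before running the full schema.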

Import

import {
  eventTypes,
  ingestionEvent,
  createIngestionEventSchema,
  TraceBody,
  CreateGenerationBody,
  ScoreBody,
  Usage,
  UsageDetails,
  type IngestionEventType,
  type ObservationEvent,
} from "@langfuse/shared/src/server/ingestion/types";

I/O Contract

Inputs

Name | Type | Required | Description
(raw event) | unknown | Yes | Raw JSON object from the SDK batch payload. Must have a type field matching one of the eventTypes values.
isLangfuseInternal | boolean | No | When true, uses internal environment validation (allows the "langfuse" prefix). Defaults to false.

Outputs

Name | Type | Description
IngestionEventType | z.infer<typeof ingestionEvent> | Union type of all validated event shapes. Each variant has id, timestamp, type, and a type-specific body.
body.id | string | The entity ID (trace ID, observation ID, or score ID) within the event body.
body.environment | string | The environment name, lowercased. Defaults to "default" if not provided.
body.usage | Usage (nullish) | Normalized usage with input, output, total, and unit fields. Only present on generation events.
body.usageDetails | Record<string, number> (nullish) | Detailed per-category usage breakdown (e.g., input_cached_tokens). Only present on generation events.

Usage Examples

Validating a Single Event

import { ingestionEvent } from "@langfuse/shared/src/server/ingestion/types";

const rawEvent = {
  id: "evt_abc123",
  timestamp: "2024-01-15T10:30:00.000Z",
  type: "trace-create",
  body: {
    id: "trace_xyz",
    name: "My Trace",
    environment: "production",
    tags: ["v2", "chat"],
  },
};

const result = ingestionEvent.safeParse(rawEvent);
if (result.success) {
  // result.data is typed as IngestionEventType
  console.log(result.data.body.id); // "trace_xyz"
} else {
  console.error(result.error.message);
}

Using the Factory for Internal Events

import { createIngestionEventSchema } from "@langfuse/shared/src/server/ingestion/types";

// Internal schema allows "langfuse-experiments" as environment name
const internalSchema = createIngestionEventSchema(true);
const event = internalSchema.parse({
  id: "evt_001",
  timestamp: "2024-01-15T10:30:00.000Z",
  type: "trace-create",
  body: {
    id: "trace_exp_001",
    name: "Prompt Experiment Run",
    environment: "langfuse-experiments",
  },
});

Usage Normalization from OpenAI Format

import { UsageDetails } from "@langfuse/shared/src/server/ingestion/types";

// OpenAI Completion API format
const openAiUsage = {
  prompt_tokens: 150,
  completion_tokens: 50,
  total_tokens: 200,
  prompt_tokens_details: { cached_tokens: 100 },
  completion_tokens_details: { reasoning_tokens: 20 },
};

const parsed = UsageDetails.parse(openAiUsage);
// Result: { input: 50, output: 30, total: 200, input_cached_tokens: 100, output_reasoning_tokens: 20 }
// Note: input = 150 - 100 = 50 (prompt_tokens minus cached_tokens)
// Note: output = 50 - 20 = 30 (completion_tokens minus reasoning_tokens)
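The arithmetic in the notes above can be reproduced with a small standalone function. This is a simplified, hypothetical re-implementation written for illustration, not the transform inside UsageDetails: normalizeOpenAiUsage and OpenAiUsage are invented names, and clamping negative results to zero is an assumption.

```typescript
// Hypothetical input type covering only the fields used in the example.
type OpenAiUsage = {
  prompt_tokens: number;
  completion_tokens: number;
  total_tokens: number;
  prompt_tokens_details?: Record<string, number>;
  completion_tokens_details?: Record<string, number>;
};

function normalizeOpenAiUsage(u: OpenAiUsage): Record<string, number> {
  const details: Record<string, number> = {};
  let input = u.prompt_tokens;
  let output = u.completion_tokens;

  // Each prompt detail becomes input_<key> and is subtracted from input.
  for (const [key, n] of Object.entries(u.prompt_tokens_details ?? {})) {
    details[`input_${key}`] = n;
    input -= n;
  }
  // Each completion detail becomes output_<key> and is subtracted from output.
  for (const [key, n] of Object.entries(u.completion_tokens_details ?? {})) {
    details[`output_${key}`] = n;
    output -= n;
  }

  // Assumption: clamp at zero so inconsistent inputs never yield negatives.
  return {
    input: Math.max(input, 0),
    output: Math.max(output, 0),
    total: u.total_tokens,
    ...details,
  };
}
```

Applied to the openAiUsage object from the example, this yields input 50, output 30, total 200, input_cached_tokens 100, and output_reasoning_tokens 20. Subtracting the detail counts keeps the categories disjoint, so summing them does not count cached or reasoning tokens twice.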

Score Body with Discriminated Union

import { ScoreBody } from "@langfuse/shared/src/server/ingestion/types";

// Numeric score
const numericScore = ScoreBody.parse({
  name: "accuracy",
  traceId: "trace_001",
  value: 0.95,
  dataType: "NUMERIC",
  source: "EVAL",
});

// Boolean score
const booleanScore = ScoreBody.parse({
  name: "is_relevant",
  traceId: "trace_001",
  value: 1,
  dataType: "BOOLEAN",
  source: "ANNOTATION",
});

Related Pages

Implements Principle
