

Implementation:Langfuse Langfuse Observations Repository

From Leeroopedia
Domains: Tracing, Observations, ClickHouse
Last Updated: 2026-02-14 00:00 GMT

Overview

Repository for querying and upserting observations (spans, generations, events) in ClickHouse, providing filtered table views, single-record lookups, streaming exports, and cost analytics for the Langfuse tracing system.

Description

This module is the primary data access layer for observations in the ClickHouse observations table. Observations represent individual spans, generations, or events within a trace, containing timing data, model usage, costs, input/output, and prompt references.

Key function groups:

Existence and lookup:

  • checkObservationExists -- Lightweight existence check with time-based partition pruning using a lookback window relative to startTime.
  • getObservationById -- Single observation retrieval with model data enrichment from PostgreSQL.
  • getObservationForTraceIdByName -- Lookup by trace ID and observation name, used for eval target resolution.

Trace-scoped queries:

  • getObservationsForTrace -- All observations for a trace. I/O inclusion is configurable, and the total payload is capped at 5MB to stay within Next.js response size limits. For OTel projects, the query skips deduplication (no FINAL or LIMIT BY).
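The 5MB cap on getObservationsForTrace can be pictured as a running byte budget over the large I/O fields. The sketch below is hypothetical: the ObservationPayload shape, reading "5MB" as 5 MiB, and throwing on overflow are all assumptions, not the repository's confirmed behavior.

```typescript
// Hypothetical sketch: field names, the 5 MiB reading of "5MB", and throwing
// on overflow are assumptions, not Langfuse's actual behavior.
const MAX_PAYLOAD_BYTES = 5 * 1024 * 1024;

interface ObservationPayload {
  id: string;
  input?: string | null;
  output?: string | null;
  metadata?: string | null;
}

// UTF-8 byte length, treating a valid surrogate pair as one 4-byte code point.
function utf8ByteLength(s: string): number {
  let bytes = 0;
  for (let i = 0; i < s.length; i++) {
    const code = s.charCodeAt(i);
    if (code < 0x80) {
      bytes += 1;
    } else if (code < 0x800) {
      bytes += 2;
    } else if (code >= 0xd800 && code <= 0xdbff && i + 1 < s.length) {
      const next = s.charCodeAt(i + 1);
      if (next >= 0xdc00 && next <= 0xdfff) {
        bytes += 4; // surrogate pair -> one supplementary code point
        i++; // skip the low surrogate
      } else {
        bytes += 3; // lone high surrogate, encoded as U+FFFD
      }
    } else {
      bytes += 3; // remaining BMP code units (including lone surrogates)
    }
  }
  return bytes;
}

// Sums I/O sizes across rows and throws as soon as the running total
// exceeds the limit; otherwise returns the total byte count.
function assertPayloadWithinLimit(
  rows: ObservationPayload[],
  limit: number = MAX_PAYLOAD_BYTES,
): number {
  let total = 0;
  for (const row of rows) {
    for (const field of [row.input, row.output, row.metadata]) {
      if (field) total += utf8ByteLength(field);
    }
    if (total > limit) {
      throw new Error(`Observation payload exceeds ${limit} bytes (at ${row.id})`);
    }
  }
  return total;
}
```

Counting bytes rather than characters matters because a response size limit applies to the encoded body, and non-ASCII text takes more than one byte per character.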

Table queries:

  • getObservationsTable -- Paginated, filtered, ordered observation list with trace data enrichment (tags, name, user ID) and model pricing. Supports full-text search across name and I/O fields.
  • getObservationsTableCount -- Matching count for pagination.
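The search and pagination options of getObservationsTable map naturally onto a parameterized ClickHouse fragment. This is a hypothetical sketch, not the SQL the repository generates: the column names project_id, name, input, and output, the use of ILIKE, and the helper names are assumptions. Only the `{name:Type}` query-parameter syntax is standard ClickHouse.

```typescript
// Hypothetical sketch: column names and helper names are assumptions.
interface TableQueryParts {
  where: string;
  limitClause: string;
  params: Record<string, string | number>;
}

// Escape LIKE wildcards (and the escape character) so user input matches literally.
function escapeLike(value: string): string {
  return value.replace(/[\\%_]/g, (m) => "\\" + m);
}

function buildSearchAndPagination(opts: {
  projectId: string;
  searchQuery?: string;
  limit?: number;
  offset?: number;
}): TableQueryParts {
  const conditions = ["project_id = {projectId:String}"];
  const params: Record<string, string | number> = { projectId: opts.projectId };

  // Case-insensitive substring match across name and I/O fields.
  if (opts.searchQuery && opts.searchQuery.trim() !== "") {
    params.searchPattern = `%${escapeLike(opts.searchQuery.trim())}%`;
    conditions.push(
      "(name ILIKE {searchPattern:String} OR input ILIKE {searchPattern:String} OR output ILIKE {searchPattern:String})",
    );
  }

  // Values are bound as parameters, never concatenated into the SQL text.
  let limitClause = "";
  if (opts.limit !== undefined) {
    params.limit = opts.limit;
    params.offset = opts.offset ?? 0;
    limitClause = "LIMIT {limit:UInt32} OFFSET {offset:UInt32}";
  }

  return { where: conditions.join(" AND "), limitClause, params };
}
```

A caller would pair a page of results with getObservationsTableCount to derive the page count, for example Math.ceil(count / limit).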

Upserting:

  • upsertObservation -- Inserts/updates a single observation via the upsertClickhouse function, writing to both ClickHouse and S3 blob storage.

Streaming and export:

  • getObservationsForPostHog -- Async generator streaming observations for PostHog analytics integration with latency, cost, and session metadata.
  • getObservationsForExport -- Streams observations in export format with model enrichment.
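Both streaming functions are described as async generators, which lets a consumer process rows as they arrive instead of buffering the full result. The sketch below shows the pattern with simple offset paging; `fetchBatch` is a hypothetical stand-in for a ClickHouse query, and the real functions may instead stream rows directly from a ClickHouse result.

```typescript
// Hypothetical sketch: fetchBatch stands in for a real query; not a Langfuse API.
type FetchBatch<T> = (offset: number, limit: number) => Promise<T[]>;

async function* streamInBatches<T>(
  fetchBatch: FetchBatch<T>,
  batchSize: number,
): AsyncGenerator<T> {
  let offset = 0;
  while (true) {
    const batch = await fetchBatch(offset, batchSize);
    // Hand rows to the consumer one at a time.
    for (const row of batch) {
      yield row;
    }
    // A short (or empty) batch means there is nothing left to read.
    if (batch.length < batchSize) return;
    offset += batch.length;
  }
}
```

A consumer iterates with `for await (const row of streamInBatches(fetch, 1000))`, so memory use is bounded by the batch size rather than by the total row count.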

Analytics:

  • getObservationCountsByProjectAndDay -- Usage counts grouped by project and day. The query omits FINAL for performance, so rows that have not yet been deduplicated may be counted more than once.
  • getCostByEvaluatorIds -- Total cost per evaluator for the last 7 days, used for evaluator cost tracking.
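The over-counting trade-off in getObservationCountsByProjectAndDay can be illustrated in memory. This sketch assumes a ReplacingMergeTree-style table, where duplicate versions of a row persist until a background merge collapses them; the ObservationRow shape and grouping by the UTC date of startTime are assumptions, and `dedupe: true` plays the role that FINAL would play in the real query.

```typescript
// Hypothetical illustration; row shape and the grouping timestamp are assumptions.
interface ObservationRow {
  id: string;
  projectId: string;
  startTime: Date;
}

type DailyCount = { count: number; projectId: string; date: string };

function countByProjectAndDay(rows: ObservationRow[], dedupe: boolean): DailyCount[] {
  const seen = new Set<string>();
  const counts = new Map<string, DailyCount>();
  for (const row of rows) {
    // With dedupe, only the first version of each (projectId, id) is counted.
    const rowKey = `${row.projectId}:${row.id}`;
    if (dedupe) {
      if (seen.has(rowKey)) continue;
      seen.add(rowKey);
    }
    const date = row.startTime.toISOString().slice(0, 10); // UTC day, YYYY-MM-DD
    const groupKey = `${row.projectId}|${date}`;
    const entry = counts.get(groupKey) ?? { count: 0, projectId: row.projectId, date };
    entry.count += 1;
    counts.set(groupKey, entry);
  }
  return Array.from(counts.values());
}
```

Deduplicating costs extra work per row, which is the overhead the repository avoids by accepting a slightly inflated count for usage reporting.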

The module uses the ClickHouse filter framework with observationsTableUiColumnDefinitions and supports time-bounded queries via OBSERVATIONS_TO_TRACE_INTERVAL and TRACE_TO_OBSERVATIONS_INTERVAL constants for efficient partition pruning.
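Time-bounded queries turn a timestamp hint into a range predicate that ClickHouse can use to skip partitions. The sketch below is hypothetical: the start_time column name and the helper are assumptions, and the interval lengths are placeholders because the actual values of OBSERVATIONS_TO_TRACE_INTERVAL and TRACE_TO_OBSERVATIONS_INTERVAL are not given on this page. `fromUnixTimestamp64Milli` is a standard ClickHouse function.

```typescript
// Hypothetical sketch: column name and interval lengths are placeholders.
const HOUR_MS = 60 * 60 * 1000;

interface TimeFilter {
  sql: string;
  params: Record<string, number>;
}

// Builds a lower bound (hint - lookback) and an optional upper bound
// (hint + lookahead), binding epoch milliseconds as query parameters.
function buildTimeFilter(
  column: string,
  hint: Date | undefined,
  lookbackMs: number,
  lookaheadMs?: number,
): TimeFilter | null {
  if (!hint) return null; // no hint: the query cannot be time-bounded
  const params: Record<string, number> = { fromMs: hint.getTime() - lookbackMs };
  const clauses = [`${column} >= fromUnixTimestamp64Milli({fromMs:Int64})`];
  if (lookaheadMs !== undefined) {
    params.toMs = hint.getTime() + lookaheadMs;
    clauses.push(`${column} <= fromUnixTimestamp64Milli({toMs:Int64})`);
  }
  return { sql: clauses.join(" AND "), params };
}
```

When no hint is available the helper returns null, which corresponds to an unbounded scan; this is why callers benefit from passing startTime or timestamp whenever they know it.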

Usage

Use this repository from tRPC routes serving the observations/generations table UI, trace detail views, public API observation endpoints, and background workers performing analytics exports or evaluations.

Code Reference

Source Location

Signature

export const checkObservationExists = async (
  projectId: string,
  id: string,
  startTime: Date | undefined,
): Promise<boolean>;

export const upsertObservation = async (
  observation: Partial<ObservationRecordReadType>,
): Promise<void>;

export const getObservationsForTrace = async <IncludeIO extends boolean>(
  opts: GetObservationsForTraceOpts<IncludeIO>,
): Promise<Observation[]>;

export const getObservationForTraceIdByName = async (opts: {
  traceId: string;
  projectId: string;
  name: string;
  timestamp?: Date;
  fetchWithInputOutput?: boolean;
}): Promise<Observation | null>;

export const getObservationById = async (opts: {
  projectId: string;
  observationId: string;
  startTime?: Date;
  traceId?: string;
  fetchWithInputOutput?: boolean;
}): Promise<(Observation & ObservationPriceFields) | null>;

export type ObservationTableQuery = {
  projectId: string;
  filter: FilterState;
  orderBy?: OrderByState;
  limit?: number;
  offset?: number;
  searchQuery?: string;
  selectIOAndMetadata?: boolean;
};

export const getObservationsTable = async (
  opts: ObservationTableQuery,
): Promise<FullObservations>;

export const getObservationsTableCount = async (
  opts: ObservationTableQuery,
): Promise<number>;

export const getObservationCountsByProjectAndDay = async (opts: {
  startDate: Date;
  endDate: Date;
}): Promise<Array<{ count: number; projectId: string; date: string }>>;

export const getCostByEvaluatorIds = async (
  projectId: string,
  evaluatorIds: string[],
): Promise<Array<{ evaluatorId: string; totalCost: number }>>;

Import

import {
  checkObservationExists,
  upsertObservation,
  getObservationsForTrace,
  getObservationForTraceIdByName,
  getObservationById,
  getObservationsTable,
  getObservationsTableCount,
  getObservationCountsByProjectAndDay,
  getCostByEvaluatorIds,
} from "@langfuse/shared/src/server/repositories/observations";

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | Project scope for all queries |
| observationId / id | string | Yes (for lookups) | Observation ID to retrieve |
| traceId | string | Yes (for trace-scoped queries) | Trace ID to query observations for |
| startTime / timestamp | Date | No | Time hint for ClickHouse partition pruning |
| filter | FilterState | Yes (for table queries) | Filter conditions array |
| orderBy | OrderByState | No | Sort column and direction |
| limit / offset | number | No | Pagination parameters |
| searchQuery | string | No | Full-text search across name, input, and output |
| includeIO / fetchWithInputOutput | boolean | No | Whether to fetch the large input, output, and metadata fields |
| observation | Partial<ObservationRecordReadType> | Yes (for upsert) | Observation data to insert or update |
| evaluatorIds | string[] | Yes (for cost query) | Evaluator IDs to aggregate costs for |

Outputs

| Name | Type | Description |
|---|---|---|
| Observation | Observation | Complete observation domain object with all fields |
| Observation & ObservationPriceFields | Object | Observation enriched with model pricing (inputPrice, outputPrice, totalPrice) |
| FullObservations | Array | Observations with trace data (tags, name, userId) and pricing |
| boolean | boolean | Existence check result |
| count | number | Total matching observations for pagination |
| cost aggregates | Array<{ evaluatorId, totalCost }> | Per-evaluator cost totals |

Usage Examples

import {
  getObservationsForTrace,
  getObservationById,
  checkObservationExists,
} from "@langfuse/shared/src/server/repositories/observations";

// Check whether an observation exists; startTime anchors the lookback window used for partition pruning
const exists = await checkObservationExists("proj_123", "obs_456", new Date());

// Get all observations for a trace
const observations = await getObservationsForTrace({
  traceId: "trace_789",
  projectId: "proj_123",
  includeIO: true,
});

// Get single observation with pricing
const obs = await getObservationById({
  projectId: "proj_123",
  observationId: "obs_456",
  fetchWithInputOutput: true,
});
