Implementation:Langfuse Langfuse Observations Repository
| Knowledge Sources | |
|---|---|
| Domains | Tracing, Observations, ClickHouse |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Repository for querying and upserting observations (spans, generations, events) in ClickHouse, providing filtered table views, single-record lookups, streaming exports, and cost analytics for the Langfuse tracing system.
Description
This module is the primary data access layer for observations in the ClickHouse observations table. Observations represent individual spans, generations, or events within a trace, containing timing data, model usage, costs, input/output, and prompt references.
Key function groups:
Existence and lookup:
- checkObservationExists -- Lightweight existence check with time-based partition pruning, using a lookback window relative to startTime.
- getObservationById -- Single observation retrieval with model data enrichment from PostgreSQL.
- getObservationForTraceIdByName -- Lookup by trace ID and observation name, used for eval target resolution.
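The partition-pruning idea behind the existence check can be sketched as a query builder that adds a time predicate only when a start time is known, so ClickHouse can skip older partitions. This is a minimal sketch, not the module's actual SQL: the helper buildExistsQuery, the table and column names, and the one-day LOOKBACK_MS are assumptions for illustration. The placeholders use ClickHouse's {name: Type} query-parameter syntax.

```typescript
// Hypothetical lookback applied before the supplied start time (assumption: 1 day).
const LOOKBACK_MS = 24 * 60 * 60 * 1000;

interface ParameterizedQuery {
  query: string;
  params: Record<string, string>;
}

// Formats a Date as 'YYYY-MM-DD hh:mm:ss.sss' in UTC for a DateTime64(3) parameter.
function toClickhouseDateTime(d: Date): string {
  return d.toISOString().replace("T", " ").replace("Z", "");
}

// Hypothetical builder: when startTime is known, rows older than
// (startTime - LOOKBACK_MS) are excluded, which lets ClickHouse prune partitions.
function buildExistsQuery(
  projectId: string,
  id: string,
  startTime: Date | undefined,
): ParameterizedQuery {
  const conditions = ["project_id = {projectId: String}", "id = {id: String}"];
  const params: Record<string, string> = { projectId, id };

  if (startTime) {
    conditions.push("start_time >= {fromTime: DateTime64(3)}");
    params.fromTime = toClickhouseDateTime(new Date(startTime.getTime() - LOOKBACK_MS));
  }

  return {
    query: `SELECT 1 FROM observations WHERE ${conditions.join(" AND ")} LIMIT 1`,
    params,
  };
}
```

Without a start time the query still works, but ClickHouse has to consider every partition for the project.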
Trace-scoped queries:
- getObservationsForTrace -- All observations for a trace, with configurable I/O inclusion and a payload size limit (5 MB cap to respect Next.js response limits). Supports OTel projects that skip deduplication (no FINAL / LIMIT BY).
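One way a response-size cap like the 5 MB limit could be enforced is to measure the UTF-8 size of each observation's input, output, and metadata while iterating, and to drop those fields once the running total would exceed the cap. This is an assumption about the approach, not the module's code (the real function may reject the request or trim payloads differently); the ObservationPayload shape and the capPayload helper are hypothetical.

```typescript
// Assumed cap, mirroring the 5 MB figure above.
const MAX_PAYLOAD_BYTES = 5 * 1024 * 1024;

// Hypothetical, simplified observation shape holding only the large fields.
interface ObservationPayload {
  id: string;
  input: string | null;
  output: string | null;
  metadata: string | null;
}

const encoder = new TextEncoder();

// UTF-8 byte length of a possibly-null string.
function byteSize(value: string | null): number {
  return value === null ? 0 : encoder.encode(value).length;
}

// Keeps I/O for each observation that still fits under the cap; observations
// that would overflow it are returned with input, output, and metadata cleared.
function capPayload(
  observations: ObservationPayload[],
  maxBytes: number = MAX_PAYLOAD_BYTES,
): { observations: ObservationPayload[]; truncated: boolean } {
  let total = 0;
  let truncated = false;
  const result = observations.map((obs) => {
    const size = byteSize(obs.input) + byteSize(obs.output) + byteSize(obs.metadata);
    if (total + size <= maxBytes) {
      total += size;
      return obs;
    }
    truncated = true;
    return { ...obs, input: null, output: null, metadata: null };
  });
  return { observations: result, truncated };
}
```

Returning a truncated flag lets the caller tell the UI that some payloads were omitted rather than silently dropping them.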
Table queries:
- getObservationsTable -- Paginated, filtered, ordered observation list with trace data enrichment (tags, name, user ID) and model pricing. Supports full-text search across name and I/O fields.
- getObservationsTableCount -- Matching count for pagination.
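A paginated table query and its count usually share one WHERE clause: the data query adds ORDER BY and LIMIT/OFFSET, the count query does not. The sketch below assumes an OrderByState of the shape { column, order } | null, a hypothetical SORTABLE_COLUMNS whitelist, and a default page size of 50; buildTableQueries is an illustration, not the module's query builder.

```typescript
// Assumed shape of the sort state coming from the UI.
type OrderByState = { column: string; order: "ASC" | "DESC" } | null;

// Hypothetical whitelist mapping UI column names to ClickHouse columns.
// A Map avoids matching inherited keys such as "constructor".
const SORTABLE_COLUMNS = new Map<string, string>([
  ["startTime", "start_time"],
  ["name", "name"],
  ["latency", "latency"],
]);

interface TableQueryOpts {
  projectId: string;
  orderBy?: OrderByState;
  limit?: number;
  offset?: number;
  searchQuery?: string;
}

// Non-negative integer, or the fallback for undefined/NaN/Infinity.
function toNonNegativeInt(n: number | undefined, fallback: number): number {
  return n !== undefined && Number.isFinite(n) ? Math.max(0, Math.trunc(n)) : fallback;
}

// Builds a paginated data query and a matching count query that share
// the same WHERE clause and parameters.
function buildTableQueries(opts: TableQueryOpts) {
  const where = ["project_id = {projectId: String}"];
  const params: Record<string, string> = { projectId: opts.projectId };

  if (opts.searchQuery) {
    // Case-insensitive substring search across name, input, and output.
    where.push(
      "(name ILIKE {search: String} OR input ILIKE {search: String} OR output ILIKE {search: String})",
    );
    params.search = `%${opts.searchQuery}%`;
  }

  // Only whitelisted columns are interpolated; otherwise fall back to newest first.
  const column = opts.orderBy ? SORTABLE_COLUMNS.get(opts.orderBy.column) : undefined;
  const direction = opts.orderBy?.order === "ASC" ? "ASC" : "DESC";
  const orderClause = column ? `ORDER BY ${column} ${direction}` : "ORDER BY start_time DESC";

  const limit = toNonNegativeInt(opts.limit, 50); // default page size is an assumption
  const offset = toNonNegativeInt(opts.offset, 0);

  const whereSql = where.join(" AND ");
  return {
    dataQuery: `SELECT * FROM observations WHERE ${whereSql} ${orderClause} LIMIT ${limit} OFFSET ${offset}`,
    countQuery: `SELECT count() FROM observations WHERE ${whereSql}`,
    params,
  };
}
```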
Upserting:
- upsertObservation -- Inserts or updates a single observation via the upsertClickhouse function, writing to both ClickHouse and S3 blob storage.
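Writing one observation to two stores can be sketched as deriving both the ClickHouse row and the blob-storage object key from the same partial record. The helper buildUpsertPayload, the snake_case ObservationRecord subset, the required-field check, and the key layout `<projectId>/observation/<id>/<eventId>.json` are all hypothetical; this does not reproduce upsertClickhouse.

```typescript
// Hypothetical subset of a ClickHouse observation record (snake_case columns).
interface ObservationRecord {
  id: string;
  project_id: string;
  trace_id: string | null;
  type: string;
  name: string | null;
  event_ts: string;
}

interface UpsertPayload {
  row: ObservationRecord;
  blobKey: string;
}

// Builds the row to insert and the object key for the raw event copy.
// A partial record cannot be upserted without its identifying fields, so
// missing id, project_id, or type is rejected.
function buildUpsertPayload(
  observation: Partial<ObservationRecord>,
  eventId: string,
  now: Date = new Date(),
): UpsertPayload {
  const { id, project_id: projectId, type } = observation;
  if (!id || !projectId || !type) {
    throw new Error("upsert requires id, project_id and type");
  }
  const row: ObservationRecord = {
    id,
    project_id: projectId,
    type,
    trace_id: observation.trace_id ?? null,
    name: observation.name ?? null,
    // Assumption: a version timestamp lets a ReplacingMergeTree-style table keep the newest row.
    event_ts: observation.event_ts ?? now.toISOString(),
  };
  return { row, blobKey: `${projectId}/observation/${id}/${eventId}.json` };
}
```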
Streaming and export:
- getObservationsForPostHog -- Async generator that streams observations to the PostHog analytics integration, including latency, cost, and session metadata.
- getObservationsForExport -- Streams observations in export format with model enrichment.
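Streaming exports are commonly written as async generators that fetch one page at a time and yield mapped rows, so memory stays bounded regardless of result size. The sketch below pages through a caller-supplied fetchPage function and derives latency from start and end times; the page size, the ObservationRow and ObservationEvent shapes, and streamObservationEvents itself are assumptions, not the PostHog integration's actual schema.

```typescript
// Hypothetical row shape returned by a paged query.
interface ObservationRow {
  id: string;
  startTime: Date;
  endTime: Date | null;
  totalCost: number | null;
  sessionId: string | null;
}

// Hypothetical analytics event derived from a row.
interface ObservationEvent {
  observationId: string;
  properties: { latencyMs: number | null; cost: number | null; sessionId: string | null };
}

type FetchPage = (offset: number, limit: number) => Promise<ObservationRow[]>;

// Yields one event per observation, fetching `pageSize` rows at a time and
// stopping when a short (or empty) page signals the end of the results.
async function* streamObservationEvents(
  fetchPage: FetchPage,
  pageSize = 1000,
): AsyncGenerator<ObservationEvent> {
  for (let offset = 0; ; offset += pageSize) {
    const page = await fetchPage(offset, pageSize);
    for (const row of page) {
      yield {
        observationId: row.id,
        properties: {
          // Latency is only defined once the observation has ended.
          latencyMs: row.endTime ? row.endTime.getTime() - row.startTime.getTime() : null,
          cost: row.totalCost,
          sessionId: row.sessionId,
        },
      };
    }
    if (page.length < pageSize) return;
  }
}
```

A consumer iterates with `for await (const event of streamObservationEvents(fetchPage))` and can forward events in batches.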
Analytics:
- getObservationCountsByProjectAndDay -- Usage counts grouped by project and day. It queries without FINAL for performance and accepts that rows ClickHouse has not yet deduplicated may be over-counted.
- getCostByEvaluatorIds -- Total cost per evaluator over the last 7 days, used for evaluator cost tracking.
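The over-counting trade-off can be illustrated in memory. Assuming a ReplacingMergeTree-style table, ClickHouse may still hold several versions of the same observation that have not been merged yet; a query without FINAL counts every stored version, while deduplicating by (project, id) gives the exact figure at a higher cost. The ObservationVersion shape and the countByProjectAndDay helper are hypothetical.

```typescript
// Hypothetical stored row; duplicates share projectId and id.
interface ObservationVersion {
  projectId: string;
  id: string;
  startTime: Date;
}

type DayCount = { count: number; projectId: string; date: string };

// Counts observations per (project, UTC day). With dedupe = false every stored
// version is counted, mimicking a query without FINAL; with dedupe = true each
// (projectId, id) pair is counted only once.
function countByProjectAndDay(rows: ObservationVersion[], dedupe: boolean): DayCount[] {
  const seen = new Set<string>();
  const counts = new Map<string, DayCount>();
  for (const row of rows) {
    const rowKey = `${row.projectId}\u0000${row.id}`;
    if (dedupe) {
      if (seen.has(rowKey)) continue;
      seen.add(rowKey);
    }
    const date = row.startTime.toISOString().slice(0, 10);
    const groupKey = `${row.projectId}\u0000${date}`;
    const entry = counts.get(groupKey) ?? { count: 0, projectId: row.projectId, date };
    entry.count += 1;
    counts.set(groupKey, entry);
  }
  return Array.from(counts.values());
}
```

For usage analytics a small overshoot is usually acceptable, which is why skipping FINAL can be the better trade.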
The module builds its filters with the ClickHouse filter framework using observationsTableUiColumnDefinitions, and it bounds queries in time with the OBSERVATIONS_TO_TRACE_INTERVAL and TRACE_TO_OBSERVATIONS_INTERVAL constants so that ClickHouse can prune partitions.
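The role of a column-definitions map in a filter framework can be sketched as translating UI filter conditions into parameterized SQL, so only known columns and operators ever reach the query string. The FilterCondition shape, the operator set, and the COLUMN_DEFINITIONS entries below are hypothetical stand-ins for FilterState and observationsTableUiColumnDefinitions, not their real definitions.

```typescript
// Hypothetical single filter condition, a simplified stand-in for a FilterState entry.
interface FilterCondition {
  column: string;
  operator: string;
  value: string | number;
}

// Hypothetical stand-in for a column-definitions map:
// UI column name -> ClickHouse expression and query-parameter type.
const COLUMN_DEFINITIONS = new Map<string, { sql: string; type: "String" | "Float64" }>([
  ["name", { sql: "o.name", type: "String" }],
  ["type", { sql: "o.type", type: "String" }],
  ["latency", { sql: "latency", type: "Float64" }],
]);

const ALLOWED_OPERATORS = new Set(["=", "!=", ">", "<", ">=", "<="]);

// Turns filter conditions into a WHERE fragment plus its parameters.
// Unknown columns or operators are rejected instead of being interpolated.
function buildFilterSql(filters: FilterCondition[]): {
  sql: string;
  params: Record<string, string | number>;
} {
  const clauses: string[] = [];
  const params: Record<string, string | number> = {};
  filters.forEach((filter, i) => {
    const def = COLUMN_DEFINITIONS.get(filter.column);
    if (!def) throw new Error(`Unknown filter column: ${filter.column}`);
    if (!ALLOWED_OPERATORS.has(filter.operator)) {
      throw new Error(`Unsupported operator: ${filter.operator}`);
    }
    const paramName = `filter${i}`;
    clauses.push(`${def.sql} ${filter.operator} {${paramName}: ${def.type}}`);
    params[paramName] = filter.value;
  });
  // An empty filter list matches every row.
  return { sql: clauses.length > 0 ? clauses.join(" AND ") : "1 = 1", params };
}
```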
Usage
Use this repository from tRPC routes serving the observations/generations table UI, trace detail views, public API observation endpoints, and background workers performing analytics exports or evaluations.
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/repositories/observations.ts
- Lines: 1-2030
Signature
```typescript
export const checkObservationExists = async (
  projectId: string,
  id: string,
  startTime: Date | undefined,
): Promise<boolean>;

export const upsertObservation = async (
  observation: Partial<ObservationRecordReadType>,
): Promise<void>;

export const getObservationsForTrace = async <IncludeIO extends boolean>(
  opts: GetObservationsForTraceOpts<IncludeIO>,
): Promise<Observation[]>;

export const getObservationForTraceIdByName = async (opts: {
  traceId: string;
  projectId: string;
  name: string;
  timestamp?: Date;
  fetchWithInputOutput?: boolean;
}): Promise<Observation | null>;

export const getObservationById = async (opts: {
  projectId: string;
  observationId: string;
  startTime?: Date;
  traceId?: string;
  fetchWithInputOutput?: boolean;
}): Promise<(Observation & ObservationPriceFields) | null>;

export type ObservationTableQuery = {
  projectId: string;
  filter: FilterState;
  orderBy?: OrderByState;
  limit?: number;
  offset?: number;
  searchQuery?: string;
  selectIOAndMetadata?: boolean;
};

export const getObservationsTable = async (
  opts: ObservationTableQuery,
): Promise<FullObservations>;

export const getObservationsTableCount = async (
  opts: ObservationTableQuery,
): Promise<number>;

export const getObservationCountsByProjectAndDay = async (opts: {
  startDate: Date;
  endDate: Date;
}): Promise<Array<{ count: number; projectId: string; date: string }>>;

export const getCostByEvaluatorIds = async (
  projectId: string,
  evaluatorIds: string[],
): Promise<Array<{ evaluatorId: string; totalCost: number }>>;
```
Import
```typescript
import {
  checkObservationExists,
  upsertObservation,
  getObservationsForTrace,
  getObservationById,
  getObservationsTable,
  getObservationsTableCount,
  getObservationCountsByProjectAndDay,
  getCostByEvaluatorIds,
} from "@langfuse/shared/src/server/repositories/observations";
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | Project scope for all queries |
| observationId / id | string | Yes (for lookups) | Observation ID to retrieve |
| traceId | string | Yes (for trace-scoped) | Trace ID to query observations for |
| startTime / timestamp | Date | No | Time hint for ClickHouse partition pruning |
| filter | FilterState | Yes (for table queries) | Filter conditions array |
| orderBy | OrderByState | No | Sort column and direction |
| limit / offset | number | No | Pagination parameters |
| searchQuery | string | No | Full-text search across name, input, output |
| includeIO / fetchWithInputOutput / selectIOAndMetadata | boolean | No | Whether to fetch the large input, output, and metadata fields |
| observation | Partial<ObservationRecordReadType> | Yes (for upsert) | Observation data to insert/update |
| evaluatorIds | string[] | Yes (for cost query) | Evaluator IDs to aggregate costs for |
Outputs
| Name | Type | Description |
|---|---|---|
| Observation | Observation | Complete observation domain object with all fields |
| Observation & ObservationPriceFields | Object | Observation enriched with model pricing (inputPrice, outputPrice, totalPrice) |
| FullObservations | Array | Observations with trace data (tags, name, userId) and pricing |
| boolean | boolean | Existence check result |
| count | number | Total matching observations for pagination |
| cost aggregates | Array<{ evaluatorId, totalCost }> | Per-evaluator cost totals |
Usage Examples
```typescript
import {
  getObservationsForTrace,
  getObservationById,
  checkObservationExists,
} from "@langfuse/shared/src/server/repositories/observations";

// Check whether an observation exists
const exists = await checkObservationExists("proj_123", "obs_456", new Date());

// Get all observations for a trace, including input/output
const observations = await getObservationsForTrace({
  traceId: "trace_789",
  projectId: "proj_123",
  includeIO: true,
});

// Get a single observation with pricing
const obs = await getObservationById({
  projectId: "proj_123",
  observationId: "obs_456",
  fetchWithInputOutput: true,
});
```