Implementation:Langfuse Langfuse Server Instrumentation
| Knowledge Sources | |
|---|---|
| Domains | Observability, Instrumentation |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Provides OpenTelemetry tracing wrappers, Datadog StatsD metric recording, and AWS CloudWatch metric publishing for server-side instrumentation across the Langfuse platform.
Description
This module is the central instrumentation layer for the Langfuse server applications (web and worker). It provides:
- OpenTelemetry tracing: instrumentAsync and instrumentSync wrap async and sync functions, respectively, in OpenTelemetry spans, propagating trace context and baggage. They handle the span lifecycle (start, end, error recording) and support root spans, custom trace scopes, and trace context propagation via carrier objects.
- Span utilities: getCurrentSpan retrieves the active span, traceException records exceptions on spans (with both OTel event recording and Datadog error tracking attributes), and addUserToSpan attaches user/project/org context to spans and baggage for downstream propagation.
- Metrics recording: recordGauge, recordIncrement, recordHistogram, and recordDistribution proxy to Datadog StatsD. When ENABLE_AWS_CLOUDWATCH_METRIC_PUBLISHING is enabled, gauge and increment metrics are also batched in a local cache and flushed to AWS CloudWatch every 30 seconds.
- Queue metric naming: convertQueueNameToMetricName converts BullMQ queue names to Datadog-compatible metric names (e.g., legacy-ingestion-queue becomes langfuse.queue.legacy_ingestion).
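The batching behaviour described in the metrics bullet above (gauge and increment values held in a local cache and flushed periodically) can be sketched roughly as follows. This is a minimal illustration, not the module's code: MetricBufferSketch and BufferedMetric are hypothetical names, tags are omitted, and the aggregation rules (latest value wins for gauges, increments are summed) are assumptions.

```typescript
// Hypothetical sketch of a local metric cache; not taken from the Langfuse source.
type MetricKind = "gauge" | "increment";

interface BufferedMetric {
  kind: MetricKind;
  stat: string;
  value: number;
}

class MetricBufferSketch {
  private readonly entries = new Map<string, BufferedMetric>();

  // Assumption: a gauge keeps only the most recent value between flushes.
  gauge(stat: string, value: number = 0): void {
    this.entries.set(`gauge:${stat}`, { kind: "gauge", stat, value });
  }

  // Assumption: increments for the same stat are summed between flushes.
  increment(stat: string, value: number = 1): void {
    const key = `increment:${stat}`;
    const previous = this.entries.get(key)?.value ?? 0;
    this.entries.set(key, { kind: "increment", stat, value: previous + value });
  }

  // Returns everything buffered so far and empties the cache.
  flush(): BufferedMetric[] {
    const batch = Array.from(this.entries.values());
    this.entries.clear();
    return batch;
  }
}
```

A caller would typically drain the buffer on a timer, for example `setInterval(() => publish(buffer.flush()), 30_000)`, where `publish` stands in for whatever sends the batch to CloudWatch.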
Usage
Use instrumentAsync/instrumentSync to wrap any function that should be traced. Use the metric recording functions to emit operational metrics. Use addUserToSpan in authentication middleware to propagate user context through the trace.
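The span lifecycle that instrumentAsync is described as handling (start a span, run the callback, record any error, always end the span) follows a common try/catch/finally shape. The sketch below shows that shape without the OpenTelemetry dependency. SketchSpan, startSketchSpan, and withSpanSketch are hypothetical stand-ins, not the module's API.

```typescript
// Hypothetical stand-in for an OpenTelemetry span, for illustration only.
interface SketchSpan {
  name: string;
  ended: boolean;
  errors: unknown[];
  recordException(ex: unknown): void;
  end(): void;
}

function startSketchSpan(name: string): SketchSpan {
  const span: SketchSpan = {
    name,
    ended: false,
    errors: [],
    recordException: (ex: unknown) => {
      span.errors.push(ex);
    },
    end: () => {
      span.ended = true;
    },
  };
  return span;
}

// Runs the callback inside a span: errors are recorded and rethrown,
// and the span is ended whether the callback succeeds or fails.
async function withSpanSketch<T>(
  name: string,
  callback: (span: SketchSpan) => Promise<T>,
): Promise<T> {
  const span = startSketchSpan(name);
  try {
    return await callback(span);
  } catch (ex) {
    span.recordException(ex);
    throw ex;
  } finally {
    span.end();
  }
}
```

Rethrowing after recording keeps the wrapper transparent to callers: they see the same result or error as if the function had not been wrapped.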
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/instrumentation/index.ts
- Lines: 1-318
Signature
export type TCarrier = {
  traceparent?: string;
  tracestate?: string;
};

export type SpanCtx = {
  name: string;
  spanKind?: opentelemetry.SpanKind;
  rootSpan?: boolean;
  traceScope?: string;
  traceContext?: TCarrier;
  startNewTrace?: boolean;
};

export async function instrumentAsync<T>(
  ctx: SpanCtx,
  callback: (span: opentelemetry.Span) => Promise<T>,
): Promise<T>;

export function instrumentSync<T>(
  ctx: SpanCtx,
  callback: (span: opentelemetry.Span) => T,
): T;

export const getCurrentSpan: () => opentelemetry.Span | undefined;

export const traceException: (
  ex: unknown,
  span?: opentelemetry.Span,
  code?: string,
) => void;

export const addUserToSpan: (
  attributes: {
    userId?: string;
    projectId?: string;
    email?: string;
    orgId?: string;
    plan?: string;
  },
  span?: opentelemetry.Span,
) => opentelemetry.Context | undefined;

export const getTracer: (name: string) => opentelemetry.Tracer;

export const recordGauge: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const recordIncrement: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const recordHistogram: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const recordDistribution: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const convertQueueNameToMetricName: (queueName: string) => string;
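To make the TCarrier shape concrete: its traceparent field carries a W3C Trace Context header of the form version-traceid-parentid-flags. The sketch below validates and splits such a value. It is not taken from the module, and CarrierSketch and parseTraceparentSketch are hypothetical names used only to show what a carrier holds.

```typescript
// Mirrors the TCarrier shape; redefined here so the sketch is self-contained.
type CarrierSketch = { traceparent?: string; tracestate?: string };

interface ParsedTraceparent {
  traceId: string;
  parentSpanId: string;
  sampled: boolean;
}

// W3C traceparent: 2 hex version, 32 hex trace id, 16 hex parent id, 2 hex flags.
const TRACEPARENT_PATTERN =
  /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparentSketch(
  carrier: CarrierSketch,
): ParsedTraceparent | undefined {
  const match = TRACEPARENT_PATTERN.exec(carrier.traceparent ?? "");
  if (!match) return undefined;
  const [, version = "", traceId = "", parentSpanId = "", flags = ""] = match;
  // Version ff and all-zero ids are invalid per the W3C specification.
  if (version === "ff") return undefined;
  if (/^0+$/.test(traceId) || /^0+$/.test(parentSpanId)) return undefined;
  // The lowest flag bit marks the trace as sampled.
  const sampled = (parseInt(flags, 16) & 1) === 1;
  return { traceId, parentSpanId, sampled };
}
```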
Import
import {
  instrumentAsync,
  instrumentSync,
  traceException,
  addUserToSpan,
  recordGauge,
  recordIncrement,
  recordHistogram,
  convertQueueNameToMetricName,
} from "@langfuse/shared/src/server";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ctx | SpanCtx | Yes | Span configuration including name, kind, root flag, trace scope, and optional trace context carrier |
| callback | (span: Span) => T or Promise<T> | Yes | The function to execute within the span. Receives the active span for attribute setting. |
| stat | string | Yes | Metric name for StatsD/CloudWatch recording |
| value | number | No | Metric value (defaults to 0 for gauge, 1 for increment) |
| tags | Record<string, string or number> | No | Key-value tags for metric dimensioning |
Outputs
| Name | Type | Description |
|---|---|---|
| T | generic | The return value of the wrapped callback function |
| void | void | Metric recording functions return void |
| string | string | convertQueueNameToMetricName returns the Datadog-formatted metric name |
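The one documented mapping (legacy-ingestion-queue to langfuse.queue.legacy_ingestion) is consistent with a simple string transform. The sketch below reproduces that single example. The rules it applies (drop a trailing -queue, replace hyphens with underscores, add a langfuse.queue. prefix) are inferred from that example alone, and queueNameToMetricNameSketch is a hypothetical name; the real function may behave differently for other inputs.

```typescript
// Hypothetical re-implementation inferred from one documented example;
// the real convertQueueNameToMetricName may differ.
function queueNameToMetricNameSketch(queueName: string): string {
  const base = queueName
    .replace(/-queue$/, "") // assumption: a trailing "-queue" suffix is dropped
    .replace(/-/g, "_"); // assumption: remaining hyphens become underscores
  return `langfuse.queue.${base}`;
}

// Reproduces the documented example.
const metricName = queueNameToMetricNameSketch("legacy-ingestion-queue");
console.log(metricName); // langfuse.queue.legacy_ingestion
```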
Usage Examples
import { SpanKind } from "@opentelemetry/api";
import { instrumentAsync, addUserToSpan, recordIncrement } from "@langfuse/shared/src/server";

// events, session, currentProject, org, project, and processEvents are assumed
// to be defined by the surrounding application code.

// Wrap an async operation in a traced span
const result = await instrumentAsync(
  { name: "processIngestion", spanKind: SpanKind.INTERNAL },
  async (span) => {
    span.setAttribute("event.count", events.length);
    return await processEvents(events);
  },
);

// Add user context to the current span in auth middleware
addUserToSpan({
  userId: session.user.id,
  projectId: currentProject.id,
  orgId: org.id,
  plan: org.plan,
});

// Record a metric
recordIncrement("langfuse.ingestion.events_processed", events.length, {
  projectId: project.id,
});