Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langfuse Langfuse Server Instrumentation

From Leeroopedia
Revision as of 13:14, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Langfuse_Langfuse_Server_Instrumentation.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Knowledge Sources
Domains Observability, Instrumentation
Last Updated 2026-02-14 00:00 GMT

Overview

Provides OpenTelemetry tracing wrappers, Datadog StatsD metric recording, and AWS CloudWatch metric publishing for server-side instrumentation across the Langfuse platform.

Description

This module is the central instrumentation layer for the Langfuse server applications (web and worker). It provides:

  1. OpenTelemetry tracing: instrumentAsync and instrumentSync wrap async and sync functions respectively in OpenTelemetry spans, propagating trace context and baggage. They handle span lifecycle (start, end, error recording) and support root spans, custom trace scopes, and trace context propagation via carrier objects.
  1. Span utilities: getCurrentSpan retrieves the active span, traceException records exceptions on spans (with both OTel event recording and Datadog error tracking attributes), and addUserToSpan attaches user/project/org context to spans and baggage for downstream propagation.
  1. Metrics recording: recordGauge, recordIncrement, recordHistogram, and recordDistribution proxy to Datadog StatsD. When ENABLE_AWS_CLOUDWATCH_METRIC_PUBLISHING is enabled, gauge and increment metrics are also batched and flushed to AWS CloudWatch every 30 seconds via a local cache.
  1. Queue metric naming: convertQueueNameToMetricName converts BullMQ queue names to Datadog-compatible metric names (e.g., legacy-ingestion-queue becomes langfuse.queue.legacy_ingestion).

Usage

Use instrumentAsync/instrumentSync to wrap any function that should be traced. Use the metric recording functions to emit operational metrics. Use addUserToSpan in authentication middleware to propagate user context through the trace.

Code Reference

Source Location

Signature

export type TCarrier = {
  traceparent?: string;
  tracestate?: string;
};

export type SpanCtx = {
  name: string;
  spanKind?: opentelemetry.SpanKind;
  rootSpan?: boolean;
  traceScope?: string;
  traceContext?: TCarrier;
  startNewTrace?: boolean;
};

export async function instrumentAsync<T>(
  ctx: SpanCtx,
  callback: (span: opentelemetry.Span) => Promise<T>,
): Promise<T>;

export function instrumentSync<T>(
  ctx: SpanCtx,
  callback: (span: opentelemetry.Span) => T,
): T;

export const getCurrentSpan: () => opentelemetry.Span | undefined;

export const traceException: (
  ex: unknown,
  span?: opentelemetry.Span,
  code?: string,
) => void;

export const addUserToSpan: (
  attributes: {
    userId?: string;
    projectId?: string;
    email?: string;
    orgId?: string;
    plan?: string;
  },
  span?: opentelemetry.Span,
) => opentelemetry.Context | undefined;

export const getTracer: (name: string) => opentelemetry.Tracer;

export const recordGauge: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const recordIncrement: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const recordHistogram: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const recordDistribution: (
  stat: string,
  value?: number,
  tags?: Record<string, string | number>,
) => void;

export const convertQueueNameToMetricName: (queueName: string) => string;

Import

import {
  instrumentAsync,
  instrumentSync,
  traceException,
  addUserToSpan,
  recordGauge,
  recordIncrement,
  recordHistogram,
  convertQueueNameToMetricName,
} from "@langfuse/shared/src/server";

I/O Contract

Inputs

Name Type Required Description
ctx SpanCtx Yes Span configuration including name, kind, root flag, trace scope, and optional trace context carrier
callback (span: Span) => T or Promise<T> Yes The function to execute within the span. Receives the active span for attribute setting.
stat string Yes Metric name for StatsD/CloudWatch recording
value number No Metric value (defaults to 0 for gauge, 1 for increment)
tags Record<string, string or number> No Key-value tags for metric dimensioning

Outputs

Name Type Description
T generic The return value of the wrapped callback function
void void Metric recording functions return void
string string convertQueueNameToMetricName returns the Datadog-formatted metric name

Usage Examples

import { instrumentAsync, addUserToSpan, recordIncrement } from "@langfuse/shared/src/server";

// Wrap an async operation in a traced span
const result = await instrumentAsync(
  { name: "processIngestion", spanKind: SpanKind.INTERNAL },
  async (span) => {
    span.setAttribute("event.count", events.length);
    return await processEvents(events);
  },
);

// Add user context to current span in auth middleware
addUserToSpan({
  userId: session.user.id,
  projectId: currentProject.id,
  orgId: org.id,
  plan: org.plan,
});

// Record a metric
recordIncrement("langfuse.ingestion.events_processed", events.length, {
  projectId: project.id,
});

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment