
Implementation:Langfuse StorageService UploadFile

From Leeroopedia
Knowledge Sources
Domains Blob Storage, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

A concrete Langfuse utility for uploading files and JSON payloads to multi-backend blob storage (S3, Azure Blob Storage, Google Cloud Storage).

Description

The StorageService interface defines the contract for all blob storage operations in Langfuse, and StorageServiceFactory selects the appropriate implementation based on environment configuration. Three implementations are provided:

  • S3StorageService: Uses the AWS SDK v3 (@aws-sdk/client-s3 and @aws-sdk/lib-storage) to interact with S3 or S3-compatible storage (MinIO). Supports multipart upload via the Upload managed transfer utility, server-side encryption (SSE with KMS), configurable concurrent socket limits, and separate clients for internal operations vs. presigned URL generation (when an external endpoint is configured).
  • AzureBlobStorageService: Uses the @azure/storage-blob SDK. Supports block blob uploads for string data and streaming uploads via uploadStream for Readable data. Automatically creates the container if it does not exist (with an option to skip this check via LANGFUSE_AZURE_SKIP_CONTAINER_CHECK).
  • GoogleCloudStorageService: Uses the @google-cloud/storage SDK. Supports credentials as a JSON string or a file path, with fallback to default application credentials.
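The backend selection performed by the factory can be sketched as follows. This is an illustrative reimplementation based on the description above (the flag names mirror the factory's parameters), not the actual Langfuse source:

```typescript
// Hypothetical sketch of StorageServiceFactory's backend selection.
// Azure and GCS are opt-in via explicit flags; S3 (or an S3-compatible
// endpoint such as MinIO) is the default backend.
type Backend = "azure" | "gcs" | "s3";

function selectBackend(params: {
  useAzureBlob?: boolean;
  useGoogleCloudStorage?: boolean;
}): Backend {
  if (params.useAzureBlob) return "azure";
  if (params.useGoogleCloudStorage) return "gcs";
  return "s3";
}
```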

The uploadJson convenience method serializes objects to JSON and uploads with application/json content type. The uploadFile method handles both string and Readable stream data.
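The delegation pattern described above can be sketched as a thin wrapper: serialize once, then reuse the generic file upload path. The `UploadFileFn` type and standalone function shape are assumptions for illustration, not the Langfuse class method itself:

```typescript
// Sketch of a uploadJson-style convenience wrapper delegating to uploadFile.
type UploadFileFn = (params: {
  fileName: string;
  fileType: string;
  data: string;
}) => Promise<void>;

async function uploadJson(
  uploadFile: UploadFileFn,
  path: string,
  body: Record<string, unknown> | Record<string, unknown>[],
): Promise<void> {
  // Serialize the payload and upload it with a JSON content type.
  await uploadFile({
    fileName: path,
    fileType: "application/json",
    data: JSON.stringify(body),
  });
}
```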

Error handling wraps all operations to detect DNS lookup failures (EAI_AGAIN) and converts them to ServiceUnavailableError for appropriate HTTP 503 responses. Delete operations use exponential backoff retries (3 attempts).
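The two error-handling policies above can be sketched as follows. The class name `ServiceUnavailableError` follows the description, but the helper names and exact retry delays here are assumptions, not the Langfuse implementation:

```typescript
// Illustrative sketch: map DNS lookup failures (EAI_AGAIN) to a 503-style
// error, and retry an operation with exponential backoff.
class ServiceUnavailableError extends Error {
  readonly status = 503;
}

async function withDnsErrorMapping<T>(op: () => Promise<T>): Promise<T> {
  try {
    return await op();
  } catch (err) {
    // Node reports DNS resolution failures with an EAI_AGAIN code/message.
    if (err instanceof Error && err.message.includes("EAI_AGAIN")) {
      throw new ServiceUnavailableError("Storage backend DNS lookup failed");
    }
    throw err;
  }
}

async function retryWithBackoff<T>(
  op: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await op();
    } catch (err) {
      lastError = err;
      // Exponential backoff between attempts: base, 2x base, 4x base, ...
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** i));
    }
  }
  throw lastError;
}
```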

Usage

Use StorageServiceFactory.getInstance to obtain a storage client, then call uploadFile or uploadJson to persist data. The factory is typically called once and the instance is cached as a module-level singleton.
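The module-level singleton pattern mentioned above can be sketched as a lazily initialized cached instance. The `getStorageClient` helper is a hypothetical name for illustration, not part of the Langfuse API:

```typescript
// Minimal sketch of caching a storage client as a module-level singleton.
interface StorageLike {
  uploadFile(params: {
    fileName: string;
    fileType: string;
    data: string;
  }): Promise<void>;
}

let cachedClient: StorageLike | null = null;

function getStorageClient(create: () => StorageLike): StorageLike {
  // Create the client on first use; subsequent callers reuse the same instance.
  if (!cachedClient) {
    cachedClient = create();
  }
  return cachedClient;
}
```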

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/services/StorageService.ts
  • Lines: L57-88 (interface), L90-143 (factory), L427-548 (S3 uploadFile and uploadJson)

Signature

// Interface
export interface StorageService {
  uploadFile(params: {
    fileName: string;
    fileType: string;
    data: Readable | string;
    partSize?: number;
    queueSize?: number;
  }): Promise<void>;

  uploadJson(
    path: string,
    body: Record<string, unknown>[] | Record<string, unknown>,
  ): Promise<void>;

  download(path: string): Promise<string>;

  listFiles(prefix: string): Promise<{ file: string; createdAt: Date }[]>;

  getSignedUrl(fileName: string, ttlSeconds: number, asAttachment?: boolean): Promise<string>;

  getSignedUploadUrl(params: {
    path: string;
    ttlSeconds: number;
    sha256Hash: string;
    contentType: string;
    contentLength: number;
  }): Promise<string>;

  deleteFiles(paths: string[]): Promise<void>;
}

// Factory
export class StorageServiceFactory {
  public static getInstance(params: {
    accessKeyId: string | undefined;
    secretAccessKey: string | undefined;
    bucketName: string;
    endpoint: string | undefined;
    externalEndpoint?: string | undefined;
    region: string | undefined;
    forcePathStyle: boolean;
    useAzureBlob?: boolean;
    useGoogleCloudStorage?: boolean;
    googleCloudCredentials?: string;
    awsSse: string | undefined;
    awsSseKmsKeyId: string | undefined;
  }): StorageService;
}

Import

import {
  StorageService,
  StorageServiceFactory,
} from "@langfuse/shared/src/server/services/StorageService";

I/O Contract

Inputs

Name Type Required Description
fileName string Yes The object key / path within the bucket where the file will be stored.
fileType string Yes The MIME content type of the file (e.g., "application/json").
data Readable | string Yes The file content. Can be a string for small payloads or a Readable stream for large payloads.
partSize number No Part size in bytes for S3 multipart uploads. Default is 5 MB. Set to 100 MB for files up to ~1 TB.
queueSize number No Number of concurrent part uploads for S3 multipart. Defaults to the AWS SDK default.
path (uploadJson) string Yes The object key for the JSON upload.
body (uploadJson) Record<string, unknown> | Record<string, unknown>[] Yes The JavaScript object or array to serialize as JSON.
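The partSize guidance above follows from S3's multipart upload limit of 10,000 parts per object: the maximum object size scales linearly with part size. A quick arithmetic sketch (the helper name is illustrative):

```typescript
// S3 allows at most 10,000 parts per multipart upload, so the part size
// determines the maximum object size.
const MAX_PARTS = 10_000;

function maxObjectSizeBytes(partSizeBytes: number): number {
  return MAX_PARTS * partSizeBytes;
}

// 5 MB parts   -> ~50 GB ceiling (the default)
// 100 MB parts -> ~1 TB ceiling (for very large exports)
```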

Outputs

Name Type Description
(uploadFile return) Promise<void> Resolves when the upload completes successfully. Throws on failure.
(uploadJson return) Promise<void> Resolves when the JSON upload completes. Throws on failure.
(download return) Promise<string> The file content as a UTF-8 string.
(getSignedUrl return) Promise<string> A presigned URL for downloading the file, valid for the specified TTL.

Usage Examples

Uploading Ingestion Events to S3

import { StorageServiceFactory } from "@langfuse/shared/src/server/services/StorageService";
import { env } from "@langfuse/shared/src/env";

// Create a singleton storage client
const storageClient = StorageServiceFactory.getInstance({
  bucketName: env.LANGFUSE_S3_EVENT_UPLOAD_BUCKET,
  accessKeyId: env.LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID,
  secretAccessKey: env.LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY,
  endpoint: env.LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT,
  region: env.LANGFUSE_S3_EVENT_UPLOAD_REGION,
  forcePathStyle: env.LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE === "true",
  awsSse: env.LANGFUSE_S3_EVENT_UPLOAD_SSE,
  awsSseKmsKeyId: env.LANGFUSE_S3_EVENT_UPLOAD_SSE_KMS_KEY_ID,
});

// Upload a batch of events as JSON
const events = [
  { id: "evt_001", type: "trace-create", body: { id: "trace_abc", name: "My Trace" } },
  { id: "evt_002", type: "generation-create", body: { id: "gen_001", traceId: "trace_abc" } },
];
const bucketPath = "ingestion-events/proj_123/trace/trace_abc/evt_001.json";

await storageClient.uploadJson(bucketPath, events);

Downloading Events in the Worker

import { StorageServiceFactory } from "@langfuse/shared/src/server/services/StorageService";

const storageClient = StorageServiceFactory.getInstance({ /* ... */ });

// Worker downloads the event data using the file key from the queue job
const fileKey = "ingestion-events/proj_123/observation/obs_001/evt_003.json";
const jsonString = await storageClient.download(fileKey);
const events = JSON.parse(jsonString);

Uploading a Large File with Custom Part Size

import { Readable } from "stream";
import { StorageServiceFactory } from "@langfuse/shared/src/server/services/StorageService";

const storageClient = StorageServiceFactory.getInstance({ /* ... */ });

const largeDataStream = Readable.from(generateLargeDataset());

await storageClient.uploadFile({
  fileName: "exports/proj_123/export_2024-01-15.jsonl",
  fileType: "application/jsonl",
  data: largeDataStream,
  partSize: 100 * 1024 * 1024,  // 100 MB parts for files up to ~1 TB
  queueSize: 4,                  // 4 concurrent part uploads
});
