Implementation: Langfuse StorageService UploadFile
| Knowledge Sources | |
|---|---|
| Domains | Blob Storage, Trace Ingestion |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
A concrete service for uploading files and JSON payloads to Langfuse's multi-backend blob storage (S3, Azure Blob Storage, Google Cloud Storage).
Description
The StorageService interface defines the contract for all blob storage operations in Langfuse, and StorageServiceFactory selects the appropriate implementation based on environment configuration. Three implementations are provided:
S3StorageService: Uses the AWS SDK v3 (@aws-sdk/client-s3 and @aws-sdk/lib-storage) to interact with S3 or S3-compatible storage (MinIO). Supports multipart upload via the Upload managed transfer utility, server-side encryption (SSE with KMS), configurable concurrent socket limits, and separate clients for internal operations vs. presigned URL generation (when an external endpoint is configured).
AzureBlobStorageService: Uses the @azure/storage-blob SDK. Supports block blob uploads for string data and streaming uploads via uploadStream for Readable data. Automatically creates the container if it does not exist (with an option to skip this check via LANGFUSE_AZURE_SKIP_CONTAINER_CHECK).
GoogleCloudStorageService: Uses the @google-cloud/storage SDK. Supports credentials as a JSON string or a file path, with fallback to default application credentials.
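The "JSON string or file path" credential handling for Google Cloud Storage can be sketched as follows. This is a minimal illustration of the resolution order described above; resolveGcsOptions and GcsStorageOptions are hypothetical names, not Langfuse's actual helpers, and the real option handling may differ.

```typescript
// Hypothetical sketch of GCS credential resolution:
// a JSON string becomes inline credentials, any other string is treated
// as a key-file path, and undefined falls back to Application Default
// Credentials (an empty options object for the @google-cloud/storage client).
interface GcsStorageOptions {
  credentials?: Record<string, unknown>;
  keyFilename?: string;
}

function resolveGcsOptions(raw: string | undefined): GcsStorageOptions {
  if (!raw) return {}; // fall back to Application Default Credentials
  try {
    const parsed = JSON.parse(raw);
    if (parsed && typeof parsed === "object") {
      return { credentials: parsed as Record<string, unknown> };
    }
  } catch {
    // not valid JSON: treat the value as a path to a key file
  }
  return { keyFilename: raw };
}
```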
The uploadJson convenience method serializes objects to JSON and uploads with application/json content type. The uploadFile method handles both string and Readable stream data.
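Conceptually, uploadJson is a thin serialization wrapper over uploadFile. The sketch below illustrates that delegation; the free-standing function and UploadFileFn type are illustrative assumptions, not the verbatim class method.

```typescript
// Simplified sketch: uploadJson serializes the body and delegates to
// uploadFile with the application/json content type. The delegation shown
// here is an assumption about the internal wiring, not the exact source.
type UploadFileFn = (params: {
  fileName: string;
  fileType: string;
  data: string;
}) => Promise<void>;

async function uploadJson(
  uploadFile: UploadFileFn,
  path: string,
  body: Record<string, unknown> | Record<string, unknown>[],
): Promise<void> {
  await uploadFile({
    fileName: path,
    fileType: "application/json",
    data: JSON.stringify(body),
  });
}
```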
Error handling wraps all operations to detect DNS lookup failures (EAI_AGAIN) and converts them to ServiceUnavailableError for appropriate HTTP 503 responses. Delete operations use exponential backoff retries (3 attempts).
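The error-wrapping and retry behavior described above can be sketched like this. It is a minimal illustration only: withRetries, ServiceUnavailableError, and the backoff schedule are assumed names and values, not the actual Langfuse implementation.

```typescript
// Hypothetical sketch of the retry/error-conversion pattern described above.
// ServiceUnavailableError, the attempt count, and the delay schedule are
// illustrative assumptions.
class ServiceUnavailableError extends Error {
  readonly httpStatus = 503;
}

async function withRetries<T>(
  operation: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err;
      // DNS lookup failures are not worth retrying here: surface them
      // immediately as a 503-style error.
      if (err instanceof Error && err.message.includes("EAI_AGAIN")) {
        throw new ServiceUnavailableError("Storage backend DNS lookup failed");
      }
      // Exponential backoff between attempts: 100 ms, 200 ms, 400 ms, ...
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}
```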
Usage
Use StorageServiceFactory.getInstance to obtain a storage client, then call uploadFile or uploadJson to persist data. The factory is typically called once and the instance is cached as a module-level singleton.
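The module-level singleton caching mentioned above typically looks like the following sketch. The names (getStorageClient, the simplified config and client shapes) are assumptions for illustration, not Langfuse's actual module.

```typescript
// Hypothetical sketch of caching the factory result as a module-level
// singleton. The config and client shapes are simplified stand-ins for
// StorageServiceFactory.getInstance and StorageService.
interface StorageConfig {
  bucketName: string;
  region?: string;
}

interface StorageClient {
  bucket: string;
}

let cachedClient: StorageClient | null = null;

function getStorageClient(config: StorageConfig): StorageClient {
  // Construct the client once on first use; every later call reuses it,
  // so the config of the first caller wins.
  if (!cachedClient) {
    cachedClient = { bucket: config.bucketName };
  }
  return cachedClient;
}
```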
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/services/StorageService.ts
- Lines: 57-88 (interface), 90-143 (factory), 427-548 (S3 uploadFile and uploadJson)
Signature
// Interface
export interface StorageService {
  uploadFile(params: {
    fileName: string;
    fileType: string;
    data: Readable | string;
    partSize?: number;
    queueSize?: number;
  }): Promise<void>;
  uploadJson(
    path: string,
    body: Record<string, unknown>[] | Record<string, unknown>,
  ): Promise<void>;
  download(path: string): Promise<string>;
  listFiles(prefix: string): Promise<{ file: string; createdAt: Date }[]>;
  getSignedUrl(
    fileName: string,
    ttlSeconds: number,
    asAttachment?: boolean,
  ): Promise<string>;
  getSignedUploadUrl(params: {
    path: string;
    ttlSeconds: number;
    sha256Hash: string;
    contentType: string;
    contentLength: number;
  }): Promise<string>;
  deleteFiles(paths: string[]): Promise<void>;
}
// Factory
export class StorageServiceFactory {
  public static getInstance(params: {
    accessKeyId: string | undefined;
    secretAccessKey: string | undefined;
    bucketName: string;
    endpoint: string | undefined;
    externalEndpoint?: string | undefined;
    region: string | undefined;
    forcePathStyle: boolean;
    useAzureBlob?: boolean;
    useGoogleCloudStorage?: boolean;
    googleCloudCredentials?: string;
    awsSse: string | undefined;
    awsSseKmsKeyId: string | undefined;
  }): StorageService;
}
Import
import {
StorageService,
StorageServiceFactory,
} from "@langfuse/shared/src/server/services/StorageService";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| fileName | string | Yes | The object key / path within the bucket where the file will be stored. |
| fileType | string | Yes | The MIME content type of the file (e.g., "application/json"). |
| data | Readable \| string | Yes | The file content. Use a string for small payloads or a Readable stream for large payloads. |
| partSize | number | No | Part size in bytes for S3 multipart uploads. Default is 5 MB. Set to 100 MB for files up to ~1 TB. |
| queueSize | number | No | Number of concurrent part uploads for S3 multipart. Defaults to the AWS SDK default. |
| path (uploadJson) | string | Yes | The object key for the JSON upload. |
| body (uploadJson) | Record<string, unknown> \| Record<string, unknown>[] | Yes | The JavaScript object or array to serialize as JSON. |
Outputs
| Name | Type | Description |
|---|---|---|
| (uploadFile return) | Promise<void> | Resolves when the upload completes successfully. Throws on failure. |
| (uploadJson return) | Promise<void> | Resolves when the JSON upload completes. Throws on failure. |
| (download return) | Promise<string> | The file content as a UTF-8 string. |
| (getSignedUrl return) | Promise<string> | A presigned URL for downloading the file, valid for the specified TTL. |
Usage Examples
Uploading Ingestion Events to S3
import { StorageServiceFactory } from "@langfuse/shared/src/server/services/StorageService";
import { env } from "@langfuse/shared/src/env";
// Create a singleton storage client
const storageClient = StorageServiceFactory.getInstance({
  bucketName: env.LANGFUSE_S3_EVENT_UPLOAD_BUCKET,
  accessKeyId: env.LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID,
  secretAccessKey: env.LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY,
  endpoint: env.LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT,
  region: env.LANGFUSE_S3_EVENT_UPLOAD_REGION,
  forcePathStyle: env.LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE === "true",
  awsSse: env.LANGFUSE_S3_EVENT_UPLOAD_SSE,
  awsSseKmsKeyId: env.LANGFUSE_S3_EVENT_UPLOAD_SSE_KMS_KEY_ID,
});
// Upload a batch of events as JSON
const events = [
  { id: "evt_001", type: "trace-create", body: { id: "trace_abc", name: "My Trace" } },
  { id: "evt_002", type: "generation-create", body: { id: "gen_001", traceId: "trace_abc" } },
];
const bucketPath = "ingestion-events/proj_123/trace/trace_abc/evt_001.json";
await storageClient.uploadJson(bucketPath, events);
Downloading Events in the Worker
import { StorageServiceFactory } from "@langfuse/shared/src/server/services/StorageService";
const storageClient = StorageServiceFactory.getInstance({ /* ... */ });
// Worker downloads the event data using the file key from the queue job
const fileKey = "ingestion-events/proj_123/observation/obs_001/evt_003.json";
const jsonString = await storageClient.download(fileKey);
const events = JSON.parse(jsonString);
Uploading a Large File with Custom Part Size
import { Readable } from "stream";
import { StorageServiceFactory } from "@langfuse/shared/src/server/services/StorageService";
const storageClient = StorageServiceFactory.getInstance({ /* ... */ });
const largeDataStream = Readable.from(generateLargeDataset());
await storageClient.uploadFile({
  fileName: "exports/proj_123/export_2024-01-15.jsonl",
  fileType: "application/jsonl",
  data: largeDataStream,
  partSize: 100 * 1024 * 1024, // 100 MB parts for files up to ~1 TB
  queueSize: 4, // 4 concurrent part uploads
});