Implementation: Langfuse StorageService uploadWithSignedUrl
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Blob Storage, S3, Cloud Infrastructure |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for uploading export files to blob storage via multipart streaming upload and generating time-limited signed download URLs, provided by Langfuse.
Description
The StorageService is defined as a TypeScript interface with multiple implementations behind a StorageServiceFactory:
StorageServiceFactory.getInstance(params) -- A static factory method that examines the configuration to select the appropriate backend:
- If `useAzureBlob` is true (or `LANGFUSE_USE_AZURE_BLOB === "true"`), returns an `AzureBlobStorageService`.
- If `useGoogleCloudStorage` is true (or `LANGFUSE_USE_GOOGLE_CLOUD_STORAGE === "true"`), returns a `GoogleCloudStorageService`.
- Otherwise, returns an `S3StorageService` (the default; works with AWS S3, MinIO, and other S3-compatible services).
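The selection order above can be sketched as follows. This is an illustrative reconstruction, not the actual Langfuse source: the three classes are empty stand-ins, and the environment fallback is modeled as an `env` parameter rather than a direct `process.env` read.

```typescript
// Stand-in classes for the real storage service implementations.
class S3StorageService {}
class AzureBlobStorageService {}
class GoogleCloudStorageService {}

interface FactoryParams {
  useAzureBlob?: boolean;
  useGoogleCloudStorage?: boolean;
}

// Sketch of the backend-selection logic: explicit flags (or their
// corresponding environment variables) win; S3 is the default.
function getInstance(
  params: FactoryParams,
  env: Record<string, string | undefined> = {},
) {
  if (params.useAzureBlob || env.LANGFUSE_USE_AZURE_BLOB === "true") {
    return new AzureBlobStorageService();
  }
  if (
    params.useGoogleCloudStorage ||
    env.LANGFUSE_USE_GOOGLE_CLOUD_STORAGE === "true"
  ) {
    return new GoogleCloudStorageService();
  }
  // Default: any S3-compatible backend (AWS S3, MinIO, etc.).
  return new S3StorageService();
}
```

Note that Azure is checked before GCS, so if both flags were somehow enabled, the Azure backend would win.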
uploadWithSignedUrl(params) -- The key method for batch exports. It:
- Calls `uploadFile` internally to perform the actual upload.
  - For S3: uses `@aws-sdk/lib-storage`'s `Upload` for multipart upload, with configurable `partSize` and `queueSize` (concurrent parts). Supports SSE (including KMS) via `addSSEToParams`.
  - For Azure: uses `BlockBlobClient.uploadStream` with configurable `bufferSize` (default 8 MB per block) and `maxConcurrency` (5).
  - For GCS: pipes the readable stream into `File.createWriteStream`.
- After the upload completes, generates a signed download URL using the backend's presigning API.
  - For S3, a separate `signedUrlClient` is used when an `externalEndpoint` is configured, ensuring the signed URL points to the publicly accessible endpoint rather than the internal one.
Error handling wraps all operations with `handleStorageError`, which detects DNS failures (`EAI_AGAIN`) and throws a `ServiceUnavailableError` for retry-friendly behavior.
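The error-translation pattern can be sketched roughly as below. `handleStorageError` and `ServiceUnavailableError` are real Langfuse names, but this body is a minimal illustrative reconstruction of the behavior described above, not the actual implementation.

```typescript
// Illustrative stand-in for Langfuse's error class.
class ServiceUnavailableError extends Error {}

// Sketch: translate transient DNS lookup failures (EAI_AGAIN) into a
// retry-friendly error; rethrow everything else unchanged.
function handleStorageError(error: unknown): never {
  const code = (error as { code?: string } | null)?.code;
  if (code === "EAI_AGAIN") {
    throw new ServiceUnavailableError(
      "Storage endpoint DNS lookup failed; please retry",
    );
  }
  throw error;
}
```

Because `EAI_AGAIN` indicates a temporary name-resolution failure rather than a permanent misconfiguration, surfacing it as a 503-style error lets the job queue retry the export instead of failing it outright.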
Usage
Used by handleBatchExportJob to upload the formatted export stream and obtain a download URL. The factory is instantiated with batch-export-specific environment variables (LANGFUSE_S3_BATCH_EXPORT_*), which may differ from the general media storage configuration.
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/services/StorageService.ts
- Lines: 530-548 (S3 `uploadWithSignedUrl`), 90-143 (factory), 235-252 (Azure), 773-790 (GCS)
Signature
```typescript
// Factory
export class StorageServiceFactory {
  public static getInstance(params: {
    accessKeyId: string | undefined;
    secretAccessKey: string | undefined;
    bucketName: string;
    endpoint: string | undefined;
    externalEndpoint?: string | undefined;
    region: string | undefined;
    forcePathStyle: boolean;
    useAzureBlob?: boolean;
    useGoogleCloudStorage?: boolean;
    googleCloudCredentials?: string;
    awsSse: string | undefined;
    awsSseKmsKeyId: string | undefined;
  }): StorageService;
}

// Interface method
interface StorageService {
  uploadWithSignedUrl(params: {
    fileName: string;
    fileType: string;
    data: Readable | string;
    expiresInSeconds: number;
    partSize?: number;
    queueSize?: number;
  }): Promise<{ signedUrl: string }>;
}
```
Import
```typescript
import { StorageServiceFactory } from "@langfuse/shared/src/server";
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| fileName | string | Yes | The object key (path) in the bucket. For batch exports, follows the pattern {prefix}{timestamp}-lf-{tableName}-export-{projectId}.{extension}. |
| fileType | string | Yes | The MIME type of the file, e.g., "text/csv; charset=utf-8", "application/json; charset=utf-8", or "application/x-ndjson; charset=utf-8". |
| data | Readable \| string | Yes | The data to upload. For batch exports, this is the piped Transform stream from the format transformation stage. |
| expiresInSeconds | number | Yes | The TTL for the signed download URL. Computed as BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS * 3600. |
| partSize | number | No | The size of each multipart upload part in bytes. Defaults to 5 MB for S3, 8 MB for Azure. Configurable via BATCH_EXPORT_S3_PART_SIZE_MIB. |
| queueSize | number | No | The number of concurrent part uploads (S3 only). Typically set to 4 for batch exports. |
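To make the fileName pattern and TTL computation concrete, here is a small sketch. The `buildExportFileName` helper is hypothetical (not a Langfuse function), and the 24-hour expiration value is an assumed placeholder for `BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS`.

```typescript
// Hypothetical helper assembling an object key following the documented
// pattern: {prefix}{timestamp}-lf-{tableName}-export-{projectId}.{extension}
function buildExportFileName(
  prefix: string,
  tableName: string,
  projectId: string,
  extension: string,
  now: Date = new Date(),
): string {
  return `${prefix}${now.getTime()}-lf-${tableName}-export-${projectId}.${extension}`;
}

// TTL for the signed URL, per the expiresInSeconds row above.
// 24 is an assumed value for the expiration-hours setting.
const BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS = 24;
const expiresInSeconds = BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS * 3600;
```

For example, `buildExportFileName("batch-exports/", "traces", "proj123", "csv")` yields a key like the one used in the S3 usage example below the tables.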
Factory constructor inputs:
| Name | Type | Required | Description |
|---|---|---|---|
| bucketName | string | Yes | The storage bucket name. Set via LANGFUSE_S3_BATCH_EXPORT_BUCKET. |
| accessKeyId | string | No | Storage access key. Falls back to default credentials if not provided. |
| secretAccessKey | string | No | Storage secret key. Falls back to default credentials if not provided. |
| endpoint | string | No | Internal storage endpoint URL. |
| externalEndpoint | string | No | Public-facing endpoint for signed URLs. Used when internal and external endpoints differ (e.g., MinIO behind a reverse proxy). |
| region | string | No | Storage region (e.g., us-east-1). |
| forcePathStyle | boolean | Yes | Use path-style URLs instead of virtual-hosted-style. Required for MinIO and some S3-compatible services. |
| awsSse | string | No | Server-side encryption method (e.g., "aws:kms"). |
| awsSseKmsKeyId | string | No | KMS key ID for SSE-KMS encryption. |
Outputs
| Name | Type | Description |
|---|---|---|
| signedUrl | string | A time-limited presigned URL for downloading the uploaded file. The URL expires after expiresInSeconds. For S3, the URL is generated using @aws-sdk/s3-request-presigner. For Azure, it uses a SAS token. For GCS, it uses a v4 signed URL. |
Usage Examples
Uploading a batch export file to S3
```typescript
import { StorageServiceFactory } from "@langfuse/shared/src/server";
import { env } from "../../env";

const { signedUrl } = await StorageServiceFactory.getInstance({
  bucketName: env.LANGFUSE_S3_BATCH_EXPORT_BUCKET,
  accessKeyId: env.LANGFUSE_S3_BATCH_EXPORT_ACCESS_KEY_ID,
  secretAccessKey: env.LANGFUSE_S3_BATCH_EXPORT_SECRET_ACCESS_KEY,
  endpoint: env.LANGFUSE_S3_BATCH_EXPORT_ENDPOINT,
  externalEndpoint: env.LANGFUSE_S3_BATCH_EXPORT_EXTERNAL_ENDPOINT,
  region: env.LANGFUSE_S3_BATCH_EXPORT_REGION,
  forcePathStyle: env.LANGFUSE_S3_BATCH_EXPORT_FORCE_PATH_STYLE === "true",
  awsSse: env.LANGFUSE_S3_BATCH_EXPORT_SSE,
  awsSseKmsKeyId: env.LANGFUSE_S3_BATCH_EXPORT_SSE_KMS_KEY_ID,
}).uploadWithSignedUrl({
  fileName: "batch-exports/1707868800000-lf-traces-export-proj123.csv",
  fileType: "text/csv; charset=utf-8",
  data: fileStream, // piped Transform stream
  expiresInSeconds: 3600, // 1 hour
  partSize: 10 * 1024 * 1024, // 10 MB parts
  queueSize: 4,
});

console.log("Download URL:", signedUrl);
// https://s3.us-east-1.amazonaws.com/bucket/batch-exports/...?X-Amz-Signature=...
```
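The `fileStream` in the example above is whatever readable the export pipeline produces. For local experimentation, an equivalent stream can be built from an in-memory source piped through a `Transform`; the sketch below is illustrative and is not the Langfuse format-transformation stage (the uppercasing transform merely stands in for a real formatting step).

```typescript
import { Readable, Transform } from "node:stream";

// An in-memory Readable standing in for the database result stream.
const source = Readable.from(["id,name\n", "1,alpha\n", "2,beta\n"]);

// A trivial Transform standing in for the format-transformation stage.
const upperCase = new Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// The piped result is the kind of stream passed as `data` above.
const fileStream = source.pipe(upperCase);
```

Because `uploadWithSignedUrl` consumes the stream incrementally via multipart upload, exports never need to be fully materialized in memory.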
Using with Azure Blob Storage
```typescript
// When LANGFUSE_USE_AZURE_BLOB=true, the factory returns AzureBlobStorageService
const { signedUrl } = await StorageServiceFactory.getInstance({
  bucketName: "langfuse-exports", // Azure container name
  accessKeyId: "accountName",
  secretAccessKey: "accountKey",
  endpoint: "https://accountName.blob.core.windows.net",
  region: undefined,
  forcePathStyle: false,
  useAzureBlob: true,
  awsSse: undefined,
  awsSseKmsKeyId: undefined,
}).uploadWithSignedUrl({
  fileName: "exports/my-export.json",
  fileType: "application/json; charset=utf-8",
  data: fileStream,
  expiresInSeconds: 7200,
});
```