
Implementation:Langfuse StorageService UploadWithSignedUrl

From Leeroopedia
Knowledge Sources
Domains Batch Export, Blob Storage, S3, Cloud Infrastructure
Last Updated 2026-02-14 00:00 GMT

Overview

A concrete Langfuse tool that uploads export files to blob storage via multipart streaming upload and generates time-limited signed download URLs.

Description

The StorageService is defined as a TypeScript interface with multiple implementations behind a StorageServiceFactory:

StorageServiceFactory.getInstance(params) -- A static factory method that examines the configuration to select the appropriate backend:

  • If useAzureBlob is true (or LANGFUSE_USE_AZURE_BLOB === "true"), returns an AzureBlobStorageService.
  • If useGoogleCloudStorage is true (or LANGFUSE_USE_GOOGLE_CLOUD_STORAGE === "true"), returns a GoogleCloudStorageService.
  • Otherwise, returns an S3StorageService (default, works with AWS S3, MinIO, and other S3-compatible services).
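The selection order above can be sketched as a small helper. This is a hypothetical simplification for illustration: the real factory constructs full service instances and takes many more parameters, and `selectBackend` is not a name from the Langfuse codebase.

```typescript
// Hypothetical sketch of the backend-selection order described above.
// The real StorageServiceFactory returns service instances; this reduces
// the decision to a label for clarity.
type Backend = "azure" | "gcs" | "s3";

function selectBackend(params: {
  useAzureBlob?: boolean;
  useGoogleCloudStorage?: boolean;
}): Backend {
  // Explicit flag wins; otherwise fall back to the environment variable.
  if (params.useAzureBlob === true || process.env.LANGFUSE_USE_AZURE_BLOB === "true") {
    return "azure";
  }
  if (
    params.useGoogleCloudStorage === true ||
    process.env.LANGFUSE_USE_GOOGLE_CLOUD_STORAGE === "true"
  ) {
    return "gcs";
  }
  // Default: AWS S3, MinIO, or any other S3-compatible service.
  return "s3";
}
```

Note that Azure takes precedence over GCS when both flags are set, mirroring the check order in the list above.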

uploadWithSignedUrl(params) -- The key method for batch exports. It:

  1. Calls uploadFile internally to perform the actual upload.
  2. For S3: Uses @aws-sdk/lib-storage Upload for multipart upload, with configurable partSize and queueSize (concurrent parts). Supports SSE (including KMS) via addSSEToParams.
  3. For Azure: Uses BlockBlobClient.uploadStream with configurable bufferSize (default 8 MB per block) and maxConcurrency (5).
  4. For GCS: Pipes the readable stream into File.createWriteStream.
  5. After upload completes, generates a signed download URL using the backend's presigning API.
  6. For S3, a separate signedUrlClient is used when an externalEndpoint is configured, ensuring the signed URL points to the publicly accessible endpoint rather than the internal one.

Error handling wraps all operations with handleStorageError, which detects DNS failures (EAI_AGAIN) and throws ServiceUnavailableError for retry-friendly behavior.
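The retry-friendly error translation might look like the following sketch. The names match those in the text, but the body is an assumption: the real `handleStorageError` may inspect other error codes and attach more context.

```typescript
// Hypothetical sketch of the handleStorageError behavior described above:
// transient DNS resolution failures (EAI_AGAIN) are rethrown as a
// ServiceUnavailableError so callers (e.g. a worker queue) can retry.
class ServiceUnavailableError extends Error {}

function handleStorageError(err: unknown): never {
  const code = (err as { code?: string })?.code;
  if (code === "EAI_AGAIN") {
    throw new ServiceUnavailableError("Storage endpoint DNS lookup failed");
  }
  // Anything else is rethrown unchanged for the caller to handle.
  throw err;
}
```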

Usage

Used by handleBatchExportJob to upload the formatted export stream and obtain a download URL. The factory is instantiated with batch-export-specific environment variables (LANGFUSE_S3_BATCH_EXPORT_*), which may differ from the general media storage configuration.

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/services/StorageService.ts
  • Lines: 530-548 (S3 uploadWithSignedUrl), 90-143 (factory), 235-252 (Azure), 773-790 (GCS)

Signature

// Factory
export class StorageServiceFactory {
  public static getInstance(params: {
    accessKeyId: string | undefined;
    secretAccessKey: string | undefined;
    bucketName: string;
    endpoint: string | undefined;
    externalEndpoint?: string | undefined;
    region: string | undefined;
    forcePathStyle: boolean;
    useAzureBlob?: boolean;
    useGoogleCloudStorage?: boolean;
    googleCloudCredentials?: string;
    awsSse: string | undefined;
    awsSseKmsKeyId: string | undefined;
  }): StorageService;
}

// Interface method
interface StorageService {
  uploadWithSignedUrl(params: {
    fileName: string;
    fileType: string;
    data: Readable | string;
    expiresInSeconds: number;
    partSize?: number;
    queueSize?: number;
  }): Promise<{ signedUrl: string }>;
}

Import

import { StorageServiceFactory } from "@langfuse/shared/src/server";

I/O Contract

Inputs

Name Type Required Description
fileName string Yes The object key (path) in the bucket. For batch exports, follows the pattern: {prefix}{timestamp}-lf-{tableName}-export-{projectId}.{extension}.
fileType string Yes The MIME type of the file, e.g., "text/csv; charset=utf-8", "application/json; charset=utf-8", or "application/x-ndjson; charset=utf-8".
data Readable | string Yes The data to upload. For batch exports, this is the piped Transform stream from the format transformation stage.
expiresInSeconds number Yes The TTL for the signed download URL. Computed as BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS * 3600.
partSize number No The size of each multipart upload part in bytes. Defaults to 5 MB for S3, 8 MB for Azure. Configurable via BATCH_EXPORT_S3_PART_SIZE_MIB.
queueSize number No The number of concurrent part uploads (S3 only). Typically set to 4 for batch exports.
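The `fileName` pattern and `expiresInSeconds` computation from the table above can be assembled as follows. The helper names are illustrative, not part of the Langfuse API; the pattern and the 3600 multiplier come directly from the table.

```typescript
// Builds an object key following the documented batch-export pattern:
// {prefix}{timestamp}-lf-{tableName}-export-{projectId}.{extension}
function buildExportFileName(opts: {
  prefix: string;
  timestamp: number;
  tableName: string;
  projectId: string;
  extension: string;
}): string {
  const { prefix, timestamp, tableName, projectId, extension } = opts;
  return `${prefix}${timestamp}-lf-${tableName}-export-${projectId}.${extension}`;
}

// expiresInSeconds = BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS * 3600
function expiresInSeconds(expirationHours: number): number {
  return expirationHours * 3600;
}
```

For example, `buildExportFileName({ prefix: "batch-exports/", timestamp: 1707868800000, tableName: "traces", projectId: "proj123", extension: "csv" })` yields the key used in the S3 usage example below.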

Factory constructor inputs:

Name Type Required Description
bucketName string Yes The storage bucket name. Set via LANGFUSE_S3_BATCH_EXPORT_BUCKET.
accessKeyId string No Storage access key. Falls back to default credentials if not provided.
secretAccessKey string No Storage secret key. Falls back to default credentials if not provided.
endpoint string No Internal storage endpoint URL.
externalEndpoint string No Public-facing endpoint for signed URLs. Used when internal and external endpoints differ (e.g., MinIO behind a reverse proxy).
region string No Storage region (e.g., us-east-1).
forcePathStyle boolean Yes Use path-style URLs instead of virtual-hosted-style. Required for MinIO and some S3-compatible services.
awsSse string No Server-side encryption method (e.g., "aws:kms").
awsSseKmsKeyId string No KMS key ID for SSE-KMS encryption.

Outputs

Name Type Description
signedUrl string A time-limited presigned URL for downloading the uploaded file. The URL expires after expiresInSeconds. For S3, the URL is generated using @aws-sdk/s3-request-presigner. For Azure, it uses a SAS token. For GCS, it uses a v4 signed URL.

Usage Examples

Uploading a batch export file to S3

import { StorageServiceFactory } from "@langfuse/shared/src/server";
import { env } from "../../env";

const { signedUrl } = await StorageServiceFactory.getInstance({
  bucketName: env.LANGFUSE_S3_BATCH_EXPORT_BUCKET,
  accessKeyId: env.LANGFUSE_S3_BATCH_EXPORT_ACCESS_KEY_ID,
  secretAccessKey: env.LANGFUSE_S3_BATCH_EXPORT_SECRET_ACCESS_KEY,
  endpoint: env.LANGFUSE_S3_BATCH_EXPORT_ENDPOINT,
  externalEndpoint: env.LANGFUSE_S3_BATCH_EXPORT_EXTERNAL_ENDPOINT,
  region: env.LANGFUSE_S3_BATCH_EXPORT_REGION,
  forcePathStyle: env.LANGFUSE_S3_BATCH_EXPORT_FORCE_PATH_STYLE === "true",
  awsSse: env.LANGFUSE_S3_BATCH_EXPORT_SSE,
  awsSseKmsKeyId: env.LANGFUSE_S3_BATCH_EXPORT_SSE_KMS_KEY_ID,
}).uploadWithSignedUrl({
  fileName: "batch-exports/1707868800000-lf-traces-export-proj123.csv",
  fileType: "text/csv; charset=utf-8",
  data: fileStream,        // piped Transform stream
  expiresInSeconds: 3600,  // 1 hour
  partSize: 10 * 1024 * 1024,  // 10 MB parts
  queueSize: 4,
});

console.log("Download URL:", signedUrl);
// https://s3.us-east-1.amazonaws.com/bucket/batch-exports/...?X-Amz-Signature=...

Using with Azure Blob Storage

// When LANGFUSE_USE_AZURE_BLOB=true, the factory returns AzureBlobStorageService
const { signedUrl } = await StorageServiceFactory.getInstance({
  bucketName: "langfuse-exports",  // Azure container name
  accessKeyId: "accountName",
  secretAccessKey: "accountKey",
  endpoint: "https://accountName.blob.core.windows.net",
  region: undefined,
  forcePathStyle: false,
  useAzureBlob: true,
  awsSse: undefined,
  awsSseKmsKeyId: undefined,
}).uploadWithSignedUrl({
  fileName: "exports/my-export.json",
  fileType: "application/json; charset=utf-8",
  data: fileStream,
  expiresInSeconds: 7200,
});

Related Pages

Implements Principle
