Implementation:Datajuicer Data juicer S3UploadFileMapper

From Leeroopedia
Knowledge Sources
Domains: Cloud Storage, Data I/O, S3 Integration
Last Updated: 2026-02-14 16:00 GMT

Overview

Uploads local files to S3 storage and updates dataset file paths to S3 URLs, enabling processed data to be persisted in cloud object storage.

Description

S3UploadFileMapper is the companion operator to S3DownloadFileMapper, completing the bidirectional cloud storage integration for Data-Juicer pipelines. It processes nested lists of local file paths, uploading them to a configured S3 bucket while maintaining the original structure. Key features include:

  • Concurrent Uploads -- Uses asyncio with semaphore-based concurrency control (max_concurrent parameter)
  • Skip Existing -- Optionally skips files that already exist in S3 (based on S3 key existence check via HEAD object)
  • Local Cleanup -- Optionally deletes local files after successful upload (remove_local parameter)
  • S3 Key Construction -- Constructs S3 keys from a configurable prefix and the local file basename
  • S3-Compatible Services -- Supports custom endpoint URLs for MinIO and other S3-compatible services
  • Credential Resolution -- Resolves AWS credentials with priority: environment variables > operator parameters
  • Lazy S3 Client -- Initializes the boto3 S3 client lazily to avoid serialization issues with Ray
  • Already-S3 Detection -- Skips paths that are already S3 URLs (s3://...)
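
As an illustration of the Skip Existing feature, the sketch below shows one way an existence check via HEAD object could look. The helper name object_exists and the FakeS3Client stand-in are hypothetical and not taken from the operator's source; the only real API assumed is the boto3 S3 client method head_object(Bucket=..., Key=...), which raises an error whose response carries the code "404" when the key is missing.

```python
def object_exists(client, bucket: str, key: str) -> bool:
    """Return True if `key` exists in `bucket` (hypothetical helper)."""
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except Exception as exc:
        # botocore's ClientError exposes a `response` dict; read it
        # defensively so this sketch does not need botocore installed.
        error = getattr(exc, "response", {}).get("Error", {})
        if error.get("Code") in ("404", "NoSuchKey", "NotFound"):
            return False
        # Anything else (permissions, network) is a real failure.
        raise


class FakeS3Client:
    """In-memory stand-in for a boto3 S3 client, for demonstration only."""

    def __init__(self, keys):
        self.keys = set(keys)

    def head_object(self, Bucket, Key):
        if Key not in self.keys:
            err = Exception("Not Found")
            err.response = {"Error": {"Code": "404"}}
            raise err
        return {}


client = FakeS3Client({"output/cat.png"})
already_there = object_exists(client, "images-bucket", "output/cat.png")
missing = object_exists(client, "images-bucket", "output/dog.png")
```

Re-raising errors other than "not found" keeps a permissions problem from being mistaken for a missing object, which would otherwise trigger a needless upload attempt.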

The operator handles the full upload lifecycle:

  1. Flattens nested paths while preserving structure information
  2. Uploads files concurrently via asyncio with run_in_executor
  3. Reconstructs the nested structure with S3 URLs
  4. Logs upload summary (success, exists, skipped, failed counts)
  5. Reports failed uploads via logger
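
The steps above can be sketched roughly as follows. This is a minimal illustration, not the operator's implementation: it assumes one level of nesting (a list of per-sample path lists), and the names flatten, unflatten, upload_all, and fake_upload are hypothetical. The fake_upload function stands in for the blocking S3 call so the example runs without network access.

```python
import asyncio


def flatten(nested):
    """Flatten a list of lists and remember each sublist's length."""
    flat, lengths = [], []
    for group in nested:
        flat.extend(group)
        lengths.append(len(group))
    return flat, lengths


def unflatten(flat, lengths):
    """Rebuild the original nesting from the recorded lengths."""
    out, pos = [], 0
    for n in lengths:
        out.append(flat[pos:pos + n])
        pos += n
    return out


async def upload_all(paths, upload_one, max_concurrent=10):
    """Run the blocking `upload_one` for every path, at most
    `max_concurrent` at a time, returning results in input order."""
    sem = asyncio.Semaphore(max_concurrent)
    loop = asyncio.get_running_loop()

    async def guarded(path):
        async with sem:
            # Offload the blocking call to the default thread pool.
            return await loop.run_in_executor(None, upload_one, path)

    return await asyncio.gather(*(guarded(p) for p in paths))


def fake_upload(path):
    """Stand-in for a real upload: returns the S3 URL for the path."""
    if path.startswith("s3://"):
        return path  # already an S3 URL, nothing to upload
    return "s3://demo-bucket/" + path.rsplit("/", 1)[-1]


nested = [["/tmp/a.mp4", "/tmp/b.mp4"], [], ["s3://demo-bucket/c.mp4"]]
flat, lengths = flatten(nested)
urls = asyncio.run(upload_all(flat, fake_upload, max_concurrent=2))
result = unflatten(urls, lengths)
```

Because asyncio.gather returns results in the order of its inputs, the flat list of URLs lines up with the flat list of paths, which is what makes the reconstruction step a simple slice by length.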

Both upload_field and s3_bucket must be specified, even though they default to None in the signature. The operator raises ValueError if AWS credentials are not available.
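
The credential behavior described on this page (environment variables take priority over operator parameters, and ValueError is raised when nothing is available) might look like the sketch below. The function name resolve_credentials and its env argument are hypothetical. The variable names AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN are the standard AWS ones; that the operator reads exactly these is an assumption.

```python
import os


def resolve_credentials(access_key=None, secret_key=None,
                        session_token=None, env=None):
    """Pick credentials, preferring the environment over parameters.

    Hypothetical helper; `env` defaults to os.environ and exists only
    so the lookup can be demonstrated with a plain dict.
    """
    env = os.environ if env is None else env
    key = env.get("AWS_ACCESS_KEY_ID") or access_key
    secret = env.get("AWS_SECRET_ACCESS_KEY") or secret_key
    token = env.get("AWS_SESSION_TOKEN") or session_token
    if not key or not secret:
        raise ValueError(
            "AWS credentials not found in environment or operator parameters"
        )
    return {
        "aws_access_key_id": key,
        "aws_secret_access_key": secret,
        "aws_session_token": token,  # may be None; it is optional
    }


# Environment values win over the explicitly passed parameters.
from_env = resolve_credentials(
    access_key="param-key",
    secret_key="param-secret",
    env={"AWS_ACCESS_KEY_ID": "env-key", "AWS_SECRET_ACCESS_KEY": "env-secret"},
)

# With an empty environment, the parameters are used instead.
from_params = resolve_credentials(
    access_key="param-key", secret_key="param-secret", env={}
)
```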

Usage

Use this operator to persist processed output files to S3 cloud storage. It is typically placed at the end of a pipeline, after data processing is complete.

Code Reference

Source Location

Signature

class S3UploadFileMapper(Mapper):
    _batched_op = True

    def __init__(
        self,
        upload_field: str = None,
        s3_bucket: str = None,
        s3_prefix: str = "",
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
        aws_session_token: str = None,
        aws_region: str = None,
        endpoint_url: str = None,
        remove_local: bool = False,
        skip_existing: bool = True,
        max_concurrent: int = 10,
        *args, **kwargs,
    ):

Import

from data_juicer.ops.mapper.s3_upload_file_mapper import S3UploadFileMapper

I/O Contract

Inputs

Name | Type | Required | Description
upload_field | str | Yes | Field name containing file paths to upload
s3_bucket | str | Yes | S3 bucket name to upload files to
s3_prefix | str | No | Prefix (folder path) in S3 bucket, e.g. "videos/" or "data/videos/". Default: ""
aws_access_key_id | str | No | AWS access key ID
aws_secret_access_key | str | No | AWS secret access key
aws_session_token | str | No | AWS session token
aws_region | str | No | AWS region
endpoint_url | str | No | Custom S3 endpoint URL for S3-compatible services
remove_local | bool | No | Delete local files after successful upload. Default: False
skip_existing | bool | No | Skip uploading if file already exists in S3. Default: True
max_concurrent | int | No | Maximum concurrent uploads. Default: 10

Outputs

Name | Type | Description
samples[upload_field] | list | Updated with S3 URLs (s3://bucket/prefix/filename) replacing local paths
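
To make the s3://bucket/prefix/filename output format concrete, here is a small sketch of how a local path could map to its S3 URL from the prefix and the file basename. The helper build_s3_url is hypothetical, and the way it normalizes slashes in the prefix is an assumption; the operator may treat prefixes differently.

```python
import os


def build_s3_url(local_path: str, bucket: str, prefix: str = "") -> str:
    """Map a local path to an s3:// URL (hypothetical helper)."""
    if local_path.startswith("s3://"):
        # Already an S3 URL: leave it untouched.
        return local_path
    name = os.path.basename(local_path)
    # Assumption: tolerate "videos", "videos/" and "/videos/" alike.
    clean = prefix.strip("/")
    key = f"{clean}/{name}" if clean else name
    return f"s3://{bucket}/{key}"


url = build_s3_url("/tmp/out/clip.mp4", "my-data-bucket", "processed/videos/")
no_prefix = build_s3_url("frame.png", "images-bucket")
unchanged = build_s3_url("s3://other/key.bin", "my-data-bucket", "x/")
```

One consequence of building keys from the basename alone is that two local files with the same name in different directories would map to the same key, so distinct prefixes or unique filenames are worth considering.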

Usage Examples

from data_juicer.ops.mapper.s3_upload_file_mapper import S3UploadFileMapper

# Upload processed videos to S3
mapper = S3UploadFileMapper(
    upload_field="videos",
    s3_bucket="my-data-bucket",
    s3_prefix="processed/videos/",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret...",
    aws_region="us-east-1",
    remove_local=True,
    max_concurrent=20,
)

# Upload to MinIO
mapper = S3UploadFileMapper(
    upload_field="images",
    s3_bucket="images-bucket",
    s3_prefix="output/",
    endpoint_url="http://minio:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    skip_existing=True,
)
