Implementation:Datajuicer Data juicer S3UploadFileMapper
| Knowledge Sources | |
|---|---|
| Domains | Cloud Storage, Data I/O, S3 Integration |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
Uploads local files to S3 storage and updates dataset file paths to S3 URLs, enabling processed data to be persisted in cloud object storage.
Description
S3UploadFileMapper is the companion operator to S3DownloadFileMapper, completing the bidirectional cloud storage integration for Data-Juicer pipelines. It processes nested lists of local file paths, uploading them to a configured S3 bucket while maintaining the original structure. Key features include:
- Concurrent Uploads -- Uses asyncio with semaphore-based concurrency control (max_concurrent parameter)
- Skip Existing -- Optionally skips files that already exist in S3 (based on S3 key existence check via HEAD object)
- Local Cleanup -- Optionally deletes local files after successful upload (remove_local parameter)
- S3 Key Construction -- Constructs S3 keys from a configurable prefix and the local file basename
- S3-Compatible Services -- Supports custom endpoint URLs for MinIO and other S3-compatible services
- Credential Resolution -- Resolves AWS credentials with priority: environment variables > operator parameters
- Lazy S3 Client -- Initializes the boto3 S3 client lazily to avoid serialization issues with Ray
- Already-S3 Detection -- Skips paths that are already S3 URLs (s3://...)
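The concurrency control described above can be illustrated with a minimal sketch. It is not the operator's actual code: `upload_one` and `upload_all` are hypothetical names, the bucket name is a placeholder, and the blocking upload is simulated with a short sleep in place of a real S3 call.

```python
import asyncio
import time


def upload_one(path: str) -> str:
    """Hypothetical blocking upload; a real one would call the S3 client."""
    time.sleep(0.01)  # simulate network latency
    filename = path.rsplit("/", 1)[-1]
    return f"s3://example-bucket/{filename}"


async def upload_all(paths, max_concurrent=10):
    # The semaphore caps how many uploads are in flight at once.
    semaphore = asyncio.Semaphore(max_concurrent)
    loop = asyncio.get_running_loop()

    async def bounded_upload(path):
        async with semaphore:
            # Run the blocking call in the default thread pool so the
            # event loop stays free to schedule other uploads.
            return await loop.run_in_executor(None, upload_one, path)

    # gather returns results in the same order as the input paths.
    return await asyncio.gather(*(bounded_upload(p) for p in paths))


urls = asyncio.run(upload_all(["/tmp/a.mp4", "/tmp/b.mp4"], max_concurrent=2))
```

Because `asyncio.gather` preserves input order, each result can be matched back to the path it came from, regardless of which upload finishes first.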
The operator handles the full upload lifecycle:
- Flattens nested paths while preserving structure information
- Uploads files concurrently via asyncio with run_in_executor
- Reconstructs the nested structure with S3 URLs
- Logs upload summary (success, exists, skipped, failed counts)
- Reports failed uploads via logger
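The flatten and reconstruct steps can be sketched as follows. This is one possible approach under stated assumptions, not the operator's implementation: `flatten` and `rebuild` are hypothetical helpers, and the "structure information" is modeled as a nested list of indices into the flat list.

```python
def flatten(nested):
    """Return (flat_paths, shape), where shape mirrors the nesting
    but holds each leaf's index in flat_paths instead of the path."""
    flat = []

    def walk(item):
        if isinstance(item, list):
            return [walk(child) for child in item]
        flat.append(item)
        return len(flat) - 1  # index of this leaf in the flat list

    shape = walk(nested)
    return flat, shape


def rebuild(shape, results):
    """Rebuild the original nesting, substituting results for indices."""
    if isinstance(shape, list):
        return [rebuild(child, results) for child in shape]
    return results[shape]


nested_paths = [["a.jpg", "b.jpg"], ["c.jpg"]]
flat_paths, shape = flatten(nested_paths)

# Stand-in for the upload step: map each local path to an S3 URL.
s3_urls = [f"s3://example-bucket/{p}" for p in flat_paths]
nested_urls = rebuild(shape, s3_urls)
```

Keeping the shape separate from the flat list lets the upload step work on a simple sequence while still returning values in the layout the dataset expects.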
The s3_bucket parameter is required. The operator raises ValueError if AWS credentials cannot be resolved from either environment variables or operator parameters.
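A minimal sketch of the credential priority (environment variables first, then operator parameters) is shown here. The function name `resolve_credentials` and its `env` argument are assumptions made for illustration. The environment variable names are the standard AWS ones, and the returned keys match the keyword arguments accepted by `boto3.client`.

```python
import os


def resolve_credentials(access_key_id=None, secret_access_key=None,
                        session_token=None, env=None):
    """Hypothetical helper: environment variables win over parameters."""
    env = os.environ if env is None else env

    key_id = env.get("AWS_ACCESS_KEY_ID") or access_key_id
    secret = env.get("AWS_SECRET_ACCESS_KEY") or secret_access_key
    token = env.get("AWS_SESSION_TOKEN") or session_token

    if not (key_id and secret):
        # Mirrors the documented behavior: fail early without credentials.
        raise ValueError("AWS credentials are not available")

    creds = {"aws_access_key_id": key_id, "aws_secret_access_key": secret}
    if token:
        creds["aws_session_token"] = token
    return creds


# With an empty environment, the operator parameters are used.
from_params = resolve_credentials("param-id", "param-secret", env={})

# When both are set, the environment values take priority.
from_env = resolve_credentials(
    "param-id",
    "param-secret",
    env={"AWS_ACCESS_KEY_ID": "env-id", "AWS_SECRET_ACCESS_KEY": "env-secret"},
)
```

Passing `env` explicitly keeps the sketch testable; a real operator would most likely read `os.environ` directly.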
Usage
Use this operator to persist processed output files to S3 cloud storage. It is typically placed at the end of a pipeline, after data processing is complete.
Code Reference
Source Location
- Repository: Datajuicer_Data_juicer
- File: data_juicer/ops/mapper/s3_upload_file_mapper.py
- Lines: 1-323
Signature
class S3UploadFileMapper(Mapper):
    _batched_op = True

    def __init__(
        self,
        upload_field: str = None,
        s3_bucket: str = None,
        s3_prefix: str = "",
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
        aws_session_token: str = None,
        aws_region: str = None,
        endpoint_url: str = None,
        remove_local: bool = False,
        skip_existing: bool = True,
        max_concurrent: int = 10,
        *args, **kwargs,
    ):
Import
from data_juicer.ops.mapper.s3_upload_file_mapper import S3UploadFileMapper
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| upload_field | str | Yes | Field name containing file paths to upload |
| s3_bucket | str | Yes | S3 bucket name to upload files to |
| s3_prefix | str | No | Prefix (folder path) in S3 bucket. E.g., "videos/" or "data/videos/". Default: "" |
| aws_access_key_id | str | No | AWS access key ID |
| aws_secret_access_key | str | No | AWS secret access key |
| aws_session_token | str | No | AWS session token, used with temporary credentials |
| aws_region | str | No | AWS region |
| endpoint_url | str | No | Custom S3 endpoint URL for S3-compatible services |
| remove_local | bool | No | Delete local files after successful upload. Default: False |
| skip_existing | bool | No | Skip uploading if file already exists in S3. Default: True |
| max_concurrent | int | No | Maximum concurrent uploads. Default: 10 |
Outputs
| Name | Type | Description |
|---|---|---|
| samples[upload_field] | list | Updated with S3 URLs (s3://bucket/prefix/filename) replacing local paths |
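The URL shape in the output table can be illustrated with a short sketch that builds the key from the prefix and the local file's basename. `build_s3_url` is a hypothetical helper, and normalizing a prefix that lacks a trailing slash is an assumption, not confirmed behavior of the operator.

```python
import os


def build_s3_url(local_path: str, bucket: str, prefix: str = "") -> str:
    """Hypothetical helper: map a local path to s3://bucket/prefix/filename."""
    # Paths that are already S3 URLs pass through unchanged.
    if local_path.startswith("s3://"):
        return local_path

    # Assumption: a non-empty prefix is normalized to end with "/".
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    key = prefix + os.path.basename(local_path)
    return f"s3://{bucket}/{key}"


url = build_s3_url("/data/out/clip.mp4", "my-data-bucket", "processed/videos/")
```

One consequence of using only the basename is that two local files with the same name in different directories would map to the same key, so distinct filenames or distinct prefixes are advisable.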
Usage Examples
# Upload processed videos to S3
mapper = S3UploadFileMapper(
    upload_field="videos",
    s3_bucket="my-data-bucket",
    s3_prefix="processed/videos/",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret...",
    aws_region="us-east-1",
    remove_local=True,
    max_concurrent=20,
)

# Upload to MinIO
mapper = S3UploadFileMapper(
    upload_field="images",
    s3_bucket="images-bucket",
    s3_prefix="output/",
    endpoint_url="http://minio:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    skip_existing=True,
)