Implementation:Datajuicer Data juicer S3DownloadFileMapper
| Knowledge Sources | |
|---|---|
| Domains | Cloud Storage, Data I/O, S3 Integration |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
Downloads files from S3 URLs to local storage or loads them into memory, enabling the Data-Juicer pipeline to work with data stored in Amazon S3 or S3-compatible services such as MinIO.
Description
S3DownloadFileMapper is the download-side I/O operator for cloud-based data pipelines. It processes nested lists of S3 URLs (s3://...) or local file paths, downloading each file while preserving the original nested structure in the output. Key features include:
- Concurrent Downloads -- Uses asyncio with configurable semaphore-based concurrency control (max_concurrent parameter)
- Flexible Output Modes -- Can save files to a local directory (save_dir) or load content directly into a sample field (save_field)
- Resume Download -- Supports resuming previously interrupted downloads by skipping existing files
- S3-Compatible Services -- Supports custom endpoint URLs for MinIO and other S3-compatible services
- Credential Resolution -- Resolves AWS credentials with priority: environment variables > operator parameters, using the get_aws_credentials utility
- Lazy S3 Client -- Initializes the boto3 S3 client lazily to avoid serialization issues with Ray
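The credential priority and the lazy client creation described above can be sketched as follows. This is a minimal illustration, not the operator's actual code: `resolve_credentials`, `LazyClientHolder`, and the environment variable names are assumptions made for this sketch, and the real get_aws_credentials utility may use a different signature.

```python
import os

# Assumed mapping from operator parameters to environment variables.
# These are the standard AWS SDK names; the real utility may read others.
_ENV_NAMES = {
    "aws_access_key_id": "AWS_ACCESS_KEY_ID",
    "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
    "aws_session_token": "AWS_SESSION_TOKEN",
    "aws_region": "AWS_DEFAULT_REGION",
}


def resolve_credentials(params, environ=None):
    """Hypothetical helper: environment variables win over operator parameters."""
    environ = os.environ if environ is None else environ
    resolved = {}
    for name, env_name in _ENV_NAMES.items():
        # Fall back to the operator parameter only when the variable is unset or empty.
        resolved[name] = environ.get(env_name) or params.get(name)
    return resolved


class LazyClientHolder:
    """Hypothetical holder: no client object exists until it is first requested."""

    def __init__(self, factory):
        self._factory = factory  # e.g. a function that calls boto3.client("s3", ...)
        self._client = None

    @property
    def client(self):
        if self._client is None:
            self._client = self._factory()
        return self._client
```

Deferring client creation means the operator instance carries only plain configuration values when it is shipped to a worker, and each worker builds its own client on first use.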
The operator handles the full download lifecycle:
- Flattens nested URLs while preserving structure information
- Downloads files concurrently via asyncio with run_in_executor for S3 operations
- Reconstructs the nested structure with downloaded paths or content
- Reports failed downloads via logger
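The four lifecycle steps can be sketched with the standard library alone. The helper names (`flatten`, `rebuild`, `download_all`, `fake_fetch`) are hypothetical, the blocking S3 call is replaced by a stand-in function, and returning `None` for a failed download is an assumption of this sketch rather than documented behaviour.

```python
import asyncio


def flatten(nested):
    """Return (flat leaves, template); the template mirrors the nesting with leaf indices."""
    flat = []

    def walk(node):
        if isinstance(node, list):
            return [walk(child) for child in node]
        flat.append(node)
        return len(flat) - 1  # position of this leaf in the flat list

    return flat, walk(nested)


def rebuild(template, results):
    """Put per-leaf results back into the original nested shape."""
    if isinstance(template, list):
        return [rebuild(child, results) for child in template]
    return results[template]


async def download_all(urls, fetch, max_concurrent=10):
    """Run the blocking fetch in the default executor, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)
    loop = asyncio.get_running_loop()
    failures = []

    async def download_one(url):
        async with semaphore:
            try:
                return await loop.run_in_executor(None, fetch, url)
            except Exception as exc:  # collect failures instead of aborting the batch
                failures.append((url, exc))
                return None

    results = await asyncio.gather(*(download_one(url) for url in urls))
    return list(results), failures


def fake_fetch(url):
    """Stand-in for a blocking S3 download; returns a pretend local path."""
    if "missing" in url:
        raise FileNotFoundError(url)
    return "/tmp/" + url.rsplit("/", 1)[-1]


nested = [["s3://bucket/a.mp4", "s3://bucket/b.mp4"], ["s3://bucket/missing.mp4"]]
flat, template = flatten(nested)
results, failures = asyncio.run(download_all(flat, fake_fetch, max_concurrent=2))
output = rebuild(template, results)
for url, exc in failures:
    print(f"failed: {url} ({exc!r})")
```

Because `asyncio.gather` returns results in the order the coroutines were passed in, the index-based template is enough to restore the nesting regardless of which download finishes first.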
The operator does not support HTTP/HTTPS URLs; it accepts only S3 URLs and local file paths.
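One way to tell the accepted input kinds apart is shown below. `classify_source` is a hypothetical helper written for this page; raising an error for HTTP/HTTPS input is this sketch's choice, and the operator itself may handle such values differently (for example by logging a failed download).

```python
def classify_source(value):
    """Return ("s3", bucket, key) or ("local", None, path); reject HTTP/HTTPS URLs."""
    if value.startswith("s3://"):
        # Everything up to the first "/" is the bucket, the rest is the object key.
        bucket, _, key = value[len("s3://"):].partition("/")
        if not bucket or not key:
            raise ValueError(f"S3 URL needs both a bucket and a key: {value!r}")
        return "s3", bucket, key
    if value.startswith(("http://", "https://")):
        raise ValueError(f"HTTP/HTTPS URLs are not supported: {value!r}")
    # Anything else is treated as a local file path.
    return "local", None, value
```

Plain string operations are used instead of `urllib.parse.urlparse` so that object keys containing `?` or `#` are not split into query or fragment parts.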
Usage
Use this operator to integrate S3-stored data into Data-Juicer processing pipelines. It is the counterpart of S3UploadFileMapper for bidirectional cloud storage integration.
Code Reference
Source Location
- Repository: Datajuicer_Data_juicer
- File: data_juicer/ops/mapper/s3_download_file_mapper.py
- Lines: 1-413
Signature
class S3DownloadFileMapper(Mapper):
    _batched_op = True

    def __init__(
        self,
        download_field: str = None,
        save_dir: str = None,
        save_field: str = None,
        resume_download: bool = False,
        timeout: int = 30,
        max_concurrent: int = 10,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
        aws_session_token: str = None,
        aws_region: str = None,
        endpoint_url: str = None,
        *args, **kwargs,
    ):
Import
from data_juicer.ops.mapper.s3_download_file_mapper import S3DownloadFileMapper
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| download_field | str | Yes | Field name containing the URL/path to download |
| save_dir | str | No | Directory to save downloaded files locally |
| save_field | str | No | Field name to store downloaded file content in memory |
| resume_download | bool | No | Skip already downloaded files. Default: False |
| timeout | int | No | Deprecated; kept for backward compatibility. Default: 30 |
| max_concurrent | int | No | Maximum concurrent downloads. Default: 10 |
| aws_access_key_id | str | No | AWS access key ID |
| aws_secret_access_key | str | No | AWS secret access key |
| aws_session_token | str | No | AWS session token (optional) |
| aws_region | str | No | AWS region |
| endpoint_url | str | No | Custom S3 endpoint URL for S3-compatible services |
Outputs
| Name | Type | Description |
|---|---|---|
| samples[download_field] | list | Updated with local file paths (if save_dir is set) |
| samples[save_field] | list | File contents loaded into memory (if save_field is set) |
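The two output rows can be illustrated on a batch represented as a dict of column lists. `apply_outputs` is a hypothetical stand-in that only shows which field each mode writes to; the paths and byte strings are made-up placeholders, and nothing here calls the operator itself.

```python
def apply_outputs(samples, download_field, paths=None, save_field=None, contents=None):
    """Hypothetical stand-in showing which field each output mode writes to."""
    updated = dict(samples)
    if paths is not None:
        # save_dir mode: the download field now holds local paths, same nesting.
        updated[download_field] = paths
    if save_field is not None and contents is not None:
        # save_field mode: file contents go into a separate field, same nesting.
        updated[save_field] = contents
    return updated


batch = {"images": [["s3://bucket/a.png"], ["s3://bucket/b.png", "s3://bucket/c.png"]]}

to_disk = apply_outputs(
    batch, "images", paths=[["/data/a.png"], ["/data/b.png", "/data/c.png"]]
)
to_memory = apply_outputs(
    batch, "images", save_field="image_bytes", contents=[[b"A"], [b"B", b"C"]]
)
```

In both modes the nesting of the result matches the nesting of the input URLs, one inner list per sample.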
Usage Examples
from data_juicer.ops.mapper.s3_download_file_mapper import S3DownloadFileMapper

# Download S3 files to a local directory
mapper = S3DownloadFileMapper(
    download_field="videos",
    save_dir="/data/local_videos/",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret...",
    aws_region="us-east-1",
    max_concurrent=20,
)

# Download from MinIO to memory
mapper = S3DownloadFileMapper(
    download_field="images",
    save_field="image_bytes",
    endpoint_url="http://minio:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    resume_download=True,
)