Implementation:NVIDIA NeMo Curator Base Downloader
| Knowledge Sources | |
|---|---|
| Domains | Data Acquisition, Abstract Base Class, Pipeline Infrastructure |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
DocumentDownloader is the abstract base class for all document downloaders in NeMo Curator. It provides atomic downloads through temporary files, detection of already-downloaded files, and an s5cmd availability check. A companion DocumentDownloadStage wraps any downloader as a pipeline processing stage.
Description
The DocumentDownloader ABC defines the interface and common behavior for downloading files from URLs to local storage. It provides:
- Atomic downloads: Files are first downloaded to a .tmp temporary path, then atomically renamed via os.rename() to their final location once the download succeeds. This prevents partially downloaded files from being treated as complete.
- Existing file detection: If the output file already exists and is non-empty, the download is skipped to avoid redundant transfers.
- s5cmd check: A utility method, _check_s5cmd_installed(), for subclasses that require the s5cmd S3 transfer tool.
- Configurable parallelism: The num_workers_per_node() method can be overridden to limit concurrent downloads per node and avoid overloading the network.
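A minimal sketch of the s5cmd availability check follows. It is a hypothetical stand-in for _check_s5cmd_installed() and assumes that finding an executable named s5cmd on PATH is sufficient. The real method may probe the tool differently. require_s5cmd is a hypothetical helper that shows how a subclass might fail fast.

```python
import shutil


def check_s5cmd_installed() -> bool:
    # Hypothetical stand-in for DocumentDownloader._check_s5cmd_installed():
    # treat s5cmd as available if an executable with that name is on PATH.
    return shutil.which("s5cmd") is not None


def require_s5cmd() -> None:
    # Hypothetical helper: refuse to start S3 transfers without s5cmd.
    if not check_s5cmd_installed():
        raise RuntimeError("s5cmd is required but was not found on PATH")
```

An S3-backed subclass could call a check like this in its constructor, so that a missing tool surfaces once at setup and not as a failure on every URL.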
The companion DocumentDownloadStage is a dataclass extending ProcessingStage[FileGroupTask, FileGroupTask]. It takes a FileGroupTask containing URLs, calls downloader.download() for each URL, collects successfully downloaded file paths, and returns a new FileGroupTask with those local paths. It also records downloaded file metadata for deterministic naming in downstream write stages.
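The sketch below approximates the per-task logic of DocumentDownloadStage.process() without NeMo Curator dependencies. SimpleFileGroupTask and the "source_files" metadata key are hypothetical stand-ins, not the real FileGroupTask fields. Only the control flow follows the description above: download each URL, keep the paths that are not None, and return a new task.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class SimpleFileGroupTask:
    # Hypothetical stand-in for FileGroupTask: only a `data` list of URLs or
    # local paths and a free-form metadata dict are modeled here.
    data: list[str]
    metadata: dict = field(default_factory=dict)


def process(task: SimpleFileGroupTask,
            download: Callable[[str], Optional[str]]) -> SimpleFileGroupTask:
    """Download every URL in task.data and return a task of local paths."""
    local_paths = []
    for url in task.data:
        path = download(url)  # a local path on success, None on failure
        if path is not None:
            local_paths.append(path)
    # Hypothetical metadata key recording the produced files, so a downstream
    # writer could derive deterministic output names from them.
    metadata = {**task.metadata, "source_files": local_paths}
    return SimpleFileGroupTask(data=local_paths, metadata=metadata)
```

Failed URLs are silently dropped from the output task in this sketch, which matches "collects successfully downloaded file paths". The failure itself is expected to be logged by download().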
Usage
Subclass DocumentDownloader to implement download logic for specific data sources (e.g., ArXiv, Common Crawl, Wikipedia). Use DocumentDownloadStage to wrap any downloader into a pipeline-compatible processing stage.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/download/base/download.py
- Lines: 1-177
Signature
```python
class DocumentDownloader(ABC):
    """Abstract base class for document downloaders."""

    def __init__(self, download_dir: str, verbose: bool = False):
        ...

    def _check_s5cmd_installed(self) -> bool:
        ...

    @abstractmethod
    def _get_output_filename(self, url: str) -> str:
        ...

    @abstractmethod
    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        ...

    def download(self, url: str) -> str | None:
        ...

    def num_workers_per_node(self) -> int | None:
        ...


@dataclass
class DocumentDownloadStage(ProcessingStage[FileGroupTask, FileGroupTask]):
    """Stage that downloads files from URLs to local storage."""

    resources = Resources(cpus=0.5)
    downloader: DocumentDownloader
    batch_size = None

    def inputs(self) -> tuple[list[str], list[str]]:
        ...

    def outputs(self) -> tuple[list[str], list[str]]:
        ...

    def process(self, task: FileGroupTask) -> FileGroupTask:
        ...

    def xenna_stage_spec(self) -> dict[str, Any]:
        ...
```
Import
```python
from nemo_curator.stages.text.download.base.download import DocumentDownloader, DocumentDownloadStage
# Or via the package shortcut:
from nemo_curator.stages.text.download import DocumentDownloader
```
I/O Contract
DocumentDownloader Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| download_dir | str | Yes | Directory to store downloaded files (created automatically if it does not exist) |
| verbose | bool | No | If True, logs detailed download information (default: False) |
DocumentDownloader Outputs
| Name | Type | Description |
|---|---|---|
| return value | str \| None | Path to the downloaded file on success, or None on failure (returned by download()) |
DocumentDownloadStage I/O
| Direction | Type | Description |
|---|---|---|
| Input | FileGroupTask | Task containing a list of URLs in task.data |
| Output | FileGroupTask | Task containing a list of local file paths in task.data |
Key Methods
download
The main public method implementing the download workflow:
1. Generates the output filename via _get_output_filename(url).
2. Checks whether the final file already exists and is non-empty; if so, returns its path immediately.
3. Downloads to a temporary .tmp path via _download_to_path(url, temp_file).
4. On success, atomically renames the temp file to the final path.
5. On failure, logs the error and returns None.
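The five steps can be sketched in plain Python. MiniDownloader is a hypothetical, dependency-free class that mirrors the documented workflow. It is not the library source: it omits the verbose flag and uses the standard logging module, while the real class may use a different logger. The temp file sits in the same directory as the final file, so both are on the same filesystem. On POSIX systems this makes os.rename() an atomic replace.

```python
import logging
import os
from abc import ABC, abstractmethod
from typing import Optional

logger = logging.getLogger(__name__)


class MiniDownloader(ABC):
    """Hypothetical mirror of DocumentDownloader.download(), for illustration."""

    def __init__(self, download_dir: str):
        self._download_dir = download_dir
        os.makedirs(download_dir, exist_ok=True)  # create the directory if missing

    @abstractmethod
    def _get_output_filename(self, url: str) -> str: ...

    @abstractmethod
    def _download_to_path(self, url: str, path: str) -> tuple[bool, Optional[str]]: ...

    def download(self, url: str) -> Optional[str]:
        # Step 1: final location = download_dir + filename derived from the URL
        output_file = os.path.join(self._download_dir, self._get_output_filename(url))
        # Step 2: reuse a non-empty file left by an earlier run
        if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
            return output_file
        # Step 3: download to a sibling .tmp path first
        temp_file = output_file + ".tmp"
        success, error = self._download_to_path(url, temp_file)
        if success:
            # Step 4: rename within one directory (atomic on POSIX)
            os.rename(temp_file, output_file)
            return output_file
        # Step 5: report the failure and signal it with None
        logger.error("Failed to download %s: %s", url, error)
        return None
```

Because the final path only ever appears through the rename, a crash during step 3 leaves at most a .tmp file behind. The existence check in step 2 never mistakes that file for a finished download.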
Abstract Methods (for subclasses)
| Method | Description |
|---|---|
| _get_output_filename(url) | Generate the output filename from the URL (without directory path) |
| _download_to_path(url, path) | Perform the actual download to a local path; return (success, error_message) |
Usage Examples
Implementing a Custom Downloader
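```python
import os
import urllib.parse
import urllib.request

from nemo_curator.stages.text.download import DocumentDownloader


class HttpDownloader(DocumentDownloader):
    def _get_output_filename(self, url: str) -> str:
        # Use the last path component, ignoring any query string or fragment
        return os.path.basename(urllib.parse.urlparse(url).path)

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        # Write to the temporary path supplied by download() and report
        # (success, error_message) instead of raising
        try:
            urllib.request.urlretrieve(url, path)
            return True, None
        except Exception as e:
            return False, str(e)
```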
Using DocumentDownloadStage
```python
from nemo_curator.stages.text.download.base.download import DocumentDownloadStage

downloader = HttpDownloader(download_dir="/data/downloads")
stage = DocumentDownloadStage(downloader=downloader)
# The stage processes FileGroupTask objects containing URLs
# and returns FileGroupTask objects containing local file paths
```
Known Implementations
- ArxivDownloader -- Downloads ArXiv source tar files from S3
- Common Crawl Downloader -- Downloads Common Crawl WARC files