Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:NVIDIA NeMo Curator Base Downloader

From Leeroopedia
Knowledge Sources
Domains Data Acquisition, Abstract Base Class, Pipeline Infrastructure
Last Updated 2026-02-14 00:00 GMT

Overview

DocumentDownloader is the abstract base class for all document downloaders in NeMo Curator, providing atomic download with temporary file handling, existing file detection, and an s5cmd availability check, alongside a companion DocumentDownloadStage that wraps any downloader as a pipeline processing stage.

Description

The DocumentDownloader ABC defines the interface and common behavior for downloading files from URLs to local storage. It provides:

  • Atomic downloads: Files are first downloaded to a .tmp temporary path, then atomically renamed via os.rename() to their final location upon success. This prevents partially downloaded files from being treated as complete.
  • Existing file detection: If the output file already exists and is non-empty, the download is skipped to avoid redundant transfers.
  • s5cmd check: A utility method _check_s5cmd_installed() for subclasses that require the s5cmd S3 transfer tool.
  • Configurable parallelism: The num_workers_per_node() method can be overridden to limit concurrent downloads per node to avoid network overload.

The companion DocumentDownloadStage is a dataclass extending ProcessingStage[FileGroupTask, FileGroupTask]. It takes a FileGroupTask containing URLs, calls downloader.download() for each URL, collects successfully downloaded file paths, and returns a new FileGroupTask with those local paths. It also records downloaded file metadata for deterministic naming in downstream write stages.

Usage

Subclass DocumentDownloader to implement download logic for specific data sources (e.g., ArXiv, Common Crawl, Wikipedia). Use DocumentDownloadStage to wrap any downloader into a pipeline-compatible processing stage.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/download/base/download.py
  • Lines: 1-177

Signature

class DocumentDownloader(ABC):
    """Abstract base class for document downloaders."""

    def __init__(self, download_dir: str, verbose: bool = False):
        ...

    def _check_s5cmd_installed(self) -> bool:
        ...

    @abstractmethod
    def _get_output_filename(self, url: str) -> str:
        ...

    @abstractmethod
    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        ...

    def download(self, url: str) -> str | None:
        ...

    def num_workers_per_node(self) -> int | None:
        ...


@dataclass
class DocumentDownloadStage(ProcessingStage[FileGroupTask, FileGroupTask]):
    """Stage that downloads files from URLs to local storage."""

    resources = Resources(cpus=0.5)
    downloader: DocumentDownloader
    batch_size = None

    def inputs(self) -> tuple[list[str], list[str]]:
        ...

    def outputs(self) -> tuple[list[str], list[str]]:
        ...

    def process(self, task: FileGroupTask) -> FileGroupTask:
        ...

    def xenna_stage_spec(self) -> dict[str, Any]:
        ...

Import

from nemo_curator.stages.text.download.base.download import DocumentDownloader, DocumentDownloadStage
# Or via the package shortcut:
from nemo_curator.stages.text.download import DocumentDownloader

I/O Contract

DocumentDownloader Inputs

Name Type Required Description
download_dir str Yes Directory to store downloaded files (created automatically if it does not exist)
verbose bool No If True, logs detailed download information (default: False)

DocumentDownloader Outputs

Name Type Description
return value None Path to the downloaded file on success, or None on failure (from download())

DocumentDownloadStage I/O

Direction Type Description
Input FileGroupTask Task containing a list of URLs in task.data
Output FileGroupTask Task containing a list of local file paths in task.data

Key Methods

download

The main public method implementing the download workflow:

  1. Generates the output filename via _get_output_filename(url).
  2. Checks if the final file already exists and is non-empty; if so, returns its path immediately.
  3. Downloads to a temporary .tmp path via _download_to_path(url, temp_file).
  4. On success, atomically renames the temp file to the final path.
  5. On failure, logs the error and returns None.

Abstract Methods (for subclasses)

Method Description
_get_output_filename(url) Generate the output filename from the URL (without directory path)
_download_to_path(url, path) Perform the actual download to a local path; return (success, error_message)

Usage Examples

Implementing a Custom Downloader

import urllib.request
from nemo_curator.stages.text.download import DocumentDownloader


class HttpDownloader(DocumentDownloader):
    def _get_output_filename(self, url: str) -> str:
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        try:
            urllib.request.urlretrieve(url, path)
            return True, None
        except Exception as e:
            return False, str(e)

Using DocumentDownloadStage

from nemo_curator.stages.text.download.base.download import DocumentDownloadStage

downloader = HttpDownloader(download_dir="/data/downloads")
stage = DocumentDownloadStage(downloader=downloader)

# The stage processes FileGroupTask objects containing URLs
# and returns FileGroupTask objects containing local file paths

Known Implementations

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment