Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:NVIDIA NeMo Curator Base DownloadStage

From Leeroopedia
Knowledge Sources
Domains Data Acquisition, Composite Stage, Pipeline Infrastructure
Last Updated 2026-02-14 00:00 GMT

Overview

DocumentDownloadExtractStage is a composite pipeline stage that orchestrates the full 3-step data acquisition pattern: URL generation, file download, and document iteration with optional extraction.

Description

The DocumentDownloadExtractStage is a dataclass extending CompositeStage[_EmptyTask, DocumentBatch]. It provides a high-level abstraction that assembles three sub-stages into a single pipeline component:

  1. URLGenerationStage: Generates download URLs from a URLGenerator with an optional url_limit parameter.
  2. DocumentDownloadStage: Downloads files from URLs using a DocumentDownloader.
  3. DocumentIterateExtractStage: Iterates through downloaded files using a DocumentIterator and optionally applies a DocumentExtractor.

The composite stage takes an _EmptyTask as input (since URL generation starts from nothing) and produces a DocumentBatch containing the extracted records. The decompose() method returns the three sub-stages for pipeline expansion by the executor.

The stage name is automatically generated as document_download_extract_{url_generator_class}_{downloader_class}_composite.

Usage

Use DocumentDownloadExtractStage (or subclass it) to define complete data acquisition pipelines for specific sources. All concrete download implementations (ArXiv, Common Crawl, Wikipedia) follow this pattern, providing their specific URL generator, downloader, iterator, and optional extractor.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/download/base/stage.py
  • Lines: 1-81

Signature

@dataclass
class DocumentDownloadExtractStage(CompositeStage[_EmptyTask, DocumentBatch]):
    """Composite stage that combines URL generation, download, and iterate-extract stages."""

    url_generator: URLGenerator
    downloader: DocumentDownloader
    iterator: DocumentIterator
    extractor: DocumentExtractor | None = None
    url_limit: int | None = None
    record_limit: int | None = None
    add_filename_column: bool | str = True

    def decompose(self) -> list[ProcessingStage]:
        ...

    def get_description(self) -> str:
        ...

Import

from nemo_curator.stages.text.download.base.stage import DocumentDownloadExtractStage

I/O Contract

Inputs

Name Type Required Description
url_generator URLGenerator Yes Generates the list of URLs to download
downloader DocumentDownloader Yes Handles downloading files from URLs
iterator DocumentIterator Yes Iterates through downloaded files to produce records
extractor None No Optionally transforms raw records into structured output (default: None)
url_limit None No Maximum number of URLs to process (default: None, all URLs)
record_limit None No Maximum number of records to extract per file (default: None, unlimited)
add_filename_column str No Whether to add a filename column to output records (default: True)

Outputs

Name Type Description
result DocumentBatch Batch containing a pandas DataFrame of extracted records

Pipeline Architecture

The stage decomposes into three sequential sub-stages:

_EmptyTask --> [URLGenerationStage] --> FileGroupTask (URLs)
                                            |
                                            v
FileGroupTask (URLs) --> [DocumentDownloadStage] --> FileGroupTask (local paths)
                                                          |
                                                          v
FileGroupTask (local paths) --> [DocumentIterateExtractStage] --> DocumentBatch

Each sub-stage can scale independently. The URLGenerationStage fans out to create one task per URL, enabling parallel downloads across the cluster.

Usage Examples

ArXiv Pipeline

from nemo_curator.stages.text.download.base.stage import DocumentDownloadExtractStage
from nemo_curator.stages.text.download.arxiv.url_generation import ArxivUrlGenerator
from nemo_curator.stages.text.download.arxiv.download import ArxivDownloader
from nemo_curator.stages.text.download.arxiv.iterator import ArxivIterator
from nemo_curator.stages.text.download.arxiv.extract import ArxivExtractor

pipeline = DocumentDownloadExtractStage(
    url_generator=ArxivUrlGenerator(),
    downloader=ArxivDownloader(download_dir="/data/arxiv/raw"),
    iterator=ArxivIterator(log_frequency=1000),
    extractor=ArxivExtractor(),
    url_limit=10,       # Process only 10 tar files
    record_limit=100,   # Extract up to 100 papers per tar
)

# Decompose into sub-stages for pipeline execution
stages = pipeline.decompose()
# stages = [URLGenerationStage, DocumentDownloadStage, DocumentIterateExtractStage]

Custom Pipeline Without Extractor

from nemo_curator.stages.text.download.base.stage import DocumentDownloadExtractStage

pipeline = DocumentDownloadExtractStage(
    url_generator=my_url_generator,
    downloader=my_downloader,
    iterator=my_iterator,
    # No extractor - iterator produces final format directly
)

description = pipeline.get_description()
# "URL-Download-Iterate-Extract pipeline using MyUrlGenerator and MyDownloader"

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment