Implementation:NVIDIA NeMo Curator Base DownloadStage
| Knowledge Sources | |
|---|---|
| Domains | Data Acquisition, Composite Stage, Pipeline Infrastructure |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
DocumentDownloadExtractStage is a composite pipeline stage that orchestrates the full three-step data acquisition pattern: URL generation, file download, and document iteration with optional extraction.
Description
The DocumentDownloadExtractStage is a dataclass extending CompositeStage[_EmptyTask, DocumentBatch]. It provides a high-level abstraction that assembles three sub-stages into a single pipeline component:
- URLGenerationStage: Generates download URLs from a URLGenerator with an optional url_limit parameter.
- DocumentDownloadStage: Downloads files from URLs using a DocumentDownloader.
- DocumentIterateExtractStage: Iterates through downloaded files using a DocumentIterator and optionally applies a DocumentExtractor.
The composite stage takes an _EmptyTask as input (since URL generation starts from nothing) and produces a DocumentBatch containing the extracted records. The decompose() method returns the three sub-stages for pipeline expansion by the executor.
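The decomposition described above can be sketched with plain Python stand-ins. Everything in this block is hypothetical: the `SketchStage` and `SketchComposite` classes, their fields, and the `run_sequentially` helper are illustrative names, not NeMo Curator APIs. The sketch only shows how a composite that holds the acquisition components might expose three sequential sub-stages through a `decompose()` method.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Optional


@dataclass
class SketchStage:
    """Hypothetical stand-in for a processing stage: a name plus a function."""

    name: str
    fn: Callable[[Any], Any]

    def run(self, task: Any) -> Any:
        return self.fn(task)


@dataclass
class SketchComposite:
    """Hypothetical stand-in for the composite stage (not the real class)."""

    generate_urls: Callable[[], list]
    download: Callable[[str], str]
    iterate: Callable[[str], Iterable[dict]]
    extract: Optional[Callable[[dict], Optional[dict]]] = None
    url_limit: Optional[int] = None

    def decompose(self) -> list:
        def url_stage(_empty: None) -> list:
            # Starts from an empty task and applies the optional URL limit.
            urls = self.generate_urls()
            return urls if self.url_limit is None else urls[: self.url_limit]

        def download_stage(urls: list) -> list:
            # Maps each URL to the local path of the downloaded file.
            return [self.download(url) for url in urls]

        def iterate_extract_stage(paths: list) -> list:
            # Iterates each file and applies the extractor only if one is set.
            records = []
            for path in paths:
                for record in self.iterate(path):
                    if self.extract is not None:
                        record = self.extract(record)
                    if record is not None:
                        records.append(record)
            return records

        return [
            SketchStage("url_generation", url_stage),
            SketchStage("download", download_stage),
            SketchStage("iterate_extract", iterate_extract_stage),
        ]


def run_sequentially(stages: list, task: Any = None) -> Any:
    """Feed the output of each sub-stage into the next one."""
    for stage in stages:
        task = stage.run(task)
    return task


composite = SketchComposite(
    generate_urls=lambda: [
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c",
    ],
    download=lambda url: "/tmp/" + url.rsplit("/", 1)[-1],
    iterate=lambda path: [{"text": f"record from {path}"}],
    url_limit=2,
)
stages = composite.decompose()
result = run_sequentially(stages)
```

In this sketch the extractor is left unset, so the records produced by the iterator pass through unchanged, which mirrors the optional-extractor behavior described above.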
The stage name is automatically generated as document_download_extract_{url_generator_class}_{downloader_class}_composite.
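As a rough illustration of that naming pattern, the sketch below builds the name from the class names of the two components. The helper `composite_stage_name` and the two placeholder classes are hypothetical, and lowercasing the class names is an assumption that the text above does not confirm.

```python
def composite_stage_name(url_generator: object, downloader: object) -> str:
    """Build a name following the documented pattern (hypothetical helper)."""
    # Assumption: class names are lowercased before being interpolated.
    url_generator_class = type(url_generator).__name__.lower()
    downloader_class = type(downloader).__name__.lower()
    return (
        f"document_download_extract_{url_generator_class}"
        f"_{downloader_class}_composite"
    )


class MyUrlGenerator:  # hypothetical placeholder component
    pass


class MyDownloader:  # hypothetical placeholder component
    pass


name = composite_stage_name(MyUrlGenerator(), MyDownloader())
print(name)  # document_download_extract_myurlgenerator_mydownloader_composite
```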
Usage
Use DocumentDownloadExtractStage (or subclass it) to define complete data acquisition pipelines for specific sources. All concrete download implementations (ArXiv, Common Crawl, Wikipedia) follow this pattern, providing their specific URL generator, downloader, iterator, and optional extractor.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/download/base/stage.py
- Lines: 1-81
Signature
@dataclass
class DocumentDownloadExtractStage(CompositeStage[_EmptyTask, DocumentBatch]):
    """Composite stage that combines URL generation, download, and iterate-extract stages."""

    url_generator: URLGenerator
    downloader: DocumentDownloader
    iterator: DocumentIterator
    extractor: DocumentExtractor | None = None
    url_limit: int | None = None
    record_limit: int | None = None
    add_filename_column: bool | str = True

    def decompose(self) -> list[ProcessingStage]:
        ...

    def get_description(self) -> str:
        ...
Import
from nemo_curator.stages.text.download.base.stage import DocumentDownloadExtractStage
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| url_generator | URLGenerator | Yes | Generates the list of URLs to download |
| downloader | DocumentDownloader | Yes | Handles downloading files from URLs |
| iterator | DocumentIterator | Yes | Iterates through downloaded files to produce records |
| extractor | DocumentExtractor or None | No | Optionally transforms raw records into structured output (default: None) |
| url_limit | int or None | No | Maximum number of URLs to process (default: None, all URLs) |
| record_limit | int or None | No | Maximum number of records to extract per file (default: None, unlimited) |
| add_filename_column | bool or str | No | Whether to add a filename column to output records (default: True) |
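To make the interplay of the optional parameters more concrete, here is a minimal sketch of an iterate-and-extract step in plain Python. It is not the library's implementation. The function `iterate_extract`, the default column name `file_name`, the reading of a string value as a custom column name, and the choice to count only emitted records against `record_limit` are all assumptions made for illustration.

```python
from typing import Callable, Iterable, Optional, Union


def iterate_extract(
    file_path: str,
    iterate: Callable[[str], Iterable[dict]],
    extract: Optional[Callable[[dict], Optional[dict]]] = None,
    record_limit: Optional[int] = None,
    add_filename_column: Union[bool, str] = True,
) -> list:
    """Hypothetical per-file step: iterate, optionally extract, tag, limit."""
    # Assumption: True selects a default column name, a string overrides it.
    if add_filename_column is True:
        filename_column = "file_name"  # hypothetical default name
    elif isinstance(add_filename_column, str):
        filename_column = add_filename_column
    else:
        filename_column = None

    records = []
    for raw in iterate(file_path):
        # Assumption: the limit applies per file and counts emitted records.
        if record_limit is not None and len(records) >= record_limit:
            break
        record = extract(raw) if extract is not None else raw
        if record is None:  # the extractor chose to drop this record
            continue
        if filename_column is not None:
            record = {**record, filename_column: file_path}
        records.append(record)
    return records


def fake_iterator(path: str):
    """Hypothetical iterator yielding five raw records for any path."""
    for i in range(5):
        yield {"id": i, "raw": f"<p>doc {i}</p>"}


def fake_extractor(record: dict):
    """Hypothetical extractor that drops record 1 and strips the <p> tags."""
    if record["id"] == 1:
        return None
    return {"id": record["id"], "text": record["raw"][3:-4]}


rows = iterate_extract(
    "/data/raw/part-0.tar",
    fake_iterator,
    fake_extractor,
    record_limit=3,
    add_filename_column="source_file",
)
```

With these stand-ins, `rows` holds the records with ids 0, 2, and 3, each tagged with a `source_file` entry. Omitting the extractor returns the iterator's raw records unchanged apart from the added filename column.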
Outputs
| Name | Type | Description |
|---|---|---|
| result | DocumentBatch | Batch containing a pandas DataFrame of extracted records |
Pipeline Architecture
The stage decomposes into three sequential sub-stages:
_EmptyTask --> [URLGenerationStage] --> FileGroupTask (URLs)
|
v
FileGroupTask (URLs) --> [DocumentDownloadStage] --> FileGroupTask (local paths)
|
v
FileGroupTask (local paths) --> [DocumentIterateExtractStage] --> DocumentBatch
Each sub-stage can scale independently. The URLGenerationStage fans out to create one task per URL, enabling parallel downloads across the cluster.
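The fan-out idea can be illustrated locally with a thread pool standing in for the cluster executor. This is a sketch under stated assumptions: `generate_url_tasks` and `download_task` are hypothetical helpers, tasks are modeled as plain lists rather than FileGroupTask objects, and no network I/O takes place.

```python
from concurrent.futures import ThreadPoolExecutor


def generate_url_tasks(urls, url_limit=None):
    """Fan out: emit one single-URL task per URL (tasks are plain lists here)."""
    selected = urls if url_limit is None else urls[:url_limit]
    return [[url] for url in selected]


def download_task(task):
    """Hypothetical download step: map each URL to a local path, no I/O."""
    return ["/tmp/downloads/" + url.rsplit("/", 1)[-1] for url in task]


urls = [f"https://example.com/shard-{i}.tar" for i in range(4)]
tasks = generate_url_tasks(urls, url_limit=3)

# Each task is independent, so a worker pool can process them concurrently.
# pool.map returns results in the same order as the input tasks.
with ThreadPoolExecutor(max_workers=3) as pool:
    local_path_tasks = list(pool.map(download_task, tasks))
```

Because each task carries a single URL, the number of downloads that can run at once is bounded by the worker count rather than by how the URLs were grouped.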
Usage Examples
ArXiv Pipeline
from nemo_curator.stages.text.download.base.stage import DocumentDownloadExtractStage
from nemo_curator.stages.text.download.arxiv.url_generation import ArxivUrlGenerator
from nemo_curator.stages.text.download.arxiv.download import ArxivDownloader
from nemo_curator.stages.text.download.arxiv.iterator import ArxivIterator
from nemo_curator.stages.text.download.arxiv.extract import ArxivExtractor
pipeline = DocumentDownloadExtractStage(
    url_generator=ArxivUrlGenerator(),
    downloader=ArxivDownloader(download_dir="/data/arxiv/raw"),
    iterator=ArxivIterator(log_frequency=1000),
    extractor=ArxivExtractor(),
    url_limit=10,      # Process only 10 tar files
    record_limit=100,  # Extract up to 100 papers per tar
)
# Decompose into sub-stages for pipeline execution
stages = pipeline.decompose()
# stages = [URLGenerationStage, DocumentDownloadStage, DocumentIterateExtractStage]
Custom Pipeline Without Extractor
from nemo_curator.stages.text.download.base.stage import DocumentDownloadExtractStage
pipeline = DocumentDownloadExtractStage(
    url_generator=my_url_generator,
    downloader=my_downloader,
    iterator=my_iterator,
    # No extractor: the iterator produces the final format directly
)
description = pipeline.get_description()
# "URL-Download-Iterate-Extract pipeline using MyUrlGenerator and MyDownloader"