Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:NVIDIA NeMo Curator Dedup IO Utils

From Leeroopedia
Knowledge Sources
Domains Deduplication, GPU Computing, Data Curation
Last Updated 2026-02-14 00:00 GMT

Overview

Provides DeduplicationIO, a mixin class with I/O utilities for GPU-based deduplication stages using cuDF (GPU DataFrames) with optional automatic document ID assignment.

Description

DeduplicationIO is a shared I/O layer for GPU-based deduplication stages. It wraps cuDF read/write operations with optional automatic ID assignment via the Ray IdGenerator actor. Key methods:

  • read_jsonl(filepath, columns, assign_id) -- Reads JSONL files using cudf.read_json() with lines=True. Optionally selects specific columns and assigns dedup IDs.
  • read_parquet(filepath, assign_id) -- Reads Parquet files using cudf.read_parquet() with allow_mismatched_pq_schemas=True. Optionally assigns dedup IDs.
  • write_parquet(df, filepath) -- Writes a cuDF DataFrame to Parquet. Creates parent directories via fsspec before writing.
  • custom_read(filepath, read_func, assign_id) -- Reads files using a user-supplied callable, optionally assigning dedup IDs.
  • assign_id(filepath, df) -- The core ID assignment method. Checks if the _curator_dedup_id column already exists. If not, it contacts the Ray IdGenerator actor via register_batch.remote() to obtain a starting ID, then assigns sequential integer IDs using np.arange().

Usage

Use DeduplicationIO as a mixin (via multiple inheritance) in deduplication stage classes that need GPU-accelerated file I/O with consistent document ID assignment. It abstracts the complexity of cuDF read/write operations and ID generation behind a simple interface.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/deduplication/io_utils.py
  • Lines: 1-72

Signature

class DeduplicationIO:
    def __init__(self, id_generator: "IdGenerator | None", **kwargs): ...

    def read_jsonl(
        self, filepath: str | list[str], columns: list[str] | None = None,
        assign_id: bool = False, **kwargs
    ) -> "cudf.DataFrame": ...

    def read_parquet(
        self, filepath: str | list[str], assign_id: bool = False, **kwargs
    ) -> "cudf.DataFrame": ...

    def write_parquet(
        self, df: "cudf.DataFrame", filepath: str, **kwargs
    ) -> None: ...

    def custom_read(
        self, filepath: str | list[str], read_func: Callable,
        assign_id: bool = False, **kwargs
    ) -> "cudf.DataFrame": ...

    def assign_id(
        self, filepath: str | list[str], df: "cudf.DataFrame"
    ) -> "cudf.DataFrame": ...

Import

from nemo_curator.stages.deduplication.io_utils import DeduplicationIO

I/O Contract

Constructor Inputs

Name Type Required Description
id_generator IdGenerator or None Yes Ray actor handle for the ID generator. Required when assign_id=True and data lacks the _curator_dedup_id column

read_jsonl Inputs

Name Type Required Description
filepath str or list[str] Yes Path(s) to JSONL file(s) to read
columns list[str] No Specific columns to select (default: None, reads all)
assign_id bool No Whether to assign dedup IDs (default: False)

read_parquet Inputs

Name Type Required Description
filepath str or list[str] Yes Path(s) to Parquet file(s) to read
assign_id bool No Whether to assign dedup IDs (default: False)

Outputs

Name Type Description
df cudf.DataFrame GPU DataFrame with file data, optionally including a _curator_dedup_id column with sequential integer IDs

Usage Examples

Basic Usage as Mixin

from nemo_curator.stages.deduplication.io_utils import DeduplicationIO

class MyDedupStage(DeduplicationIO, SomeBaseClass):
    def __init__(self, id_generator):
        super().__init__(id_generator=id_generator)

    def run(self, filepath):
        # Read Parquet with automatic ID assignment
        df = self.read_parquet(filepath, assign_id=True)
        # Process...
        self.write_parquet(df, "/output/result.parquet")

Reading JSONL with Column Selection

from nemo_curator.stages.deduplication.io_utils import DeduplicationIO

io = DeduplicationIO(id_generator=my_id_gen_actor)
df = io.read_jsonl(
    "/data/input.jsonl",
    columns=["text", "url"],
    assign_id=True,
)
# df now has columns: text, url, _curator_dedup_id

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment