Overview
Provides DeduplicationIO, a mixin class with I/O utilities for GPU-based deduplication stages using cuDF (GPU DataFrames) with optional automatic document ID assignment.
Description
DeduplicationIO is a shared I/O layer for GPU-based deduplication stages. It wraps cuDF read/write operations with optional automatic ID assignment via the Ray IdGenerator actor. Key methods:
- read_jsonl(filepath, columns, assign_id) -- Reads JSONL files using cudf.read_json() with lines=True. Optionally selects specific columns and assigns dedup IDs.
- read_parquet(filepath, assign_id) -- Reads Parquet files using cudf.read_parquet() with allow_mismatched_pq_schemas=True. Optionally assigns dedup IDs.
- write_parquet(df, filepath) -- Writes a cuDF DataFrame to Parquet. Creates parent directories via fsspec before writing.
- custom_read(filepath, read_func, assign_id) -- Reads files using a user-supplied callable, optionally assigning dedup IDs.
- assign_id(filepath, df) -- The core ID assignment method. Checks whether the _curator_dedup_id column already exists. If it does not, the method calls register_batch.remote() on the Ray IdGenerator actor to obtain a starting ID, then assigns sequential integer IDs using np.arange().
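The ID assignment flow described for assign_id can be illustrated without a GPU or Ray. The following is a minimal sketch, not the library's implementation: FakeIdGenerator is a hypothetical in-process stand-in for the Ray IdGenerator actor, the (files, row count) arguments of its register_batch method are an assumption, a dict of lists stands in for a cuDF DataFrame, and range() stands in for np.arange().

```python
ID_COLUMN = "_curator_dedup_id"


class FakeIdGenerator:
    """Hypothetical stand-in for the Ray IdGenerator actor (assumption)."""

    def __init__(self):
        self._next_id = 0

    def register_batch(self, files, num_rows):
        # Reserve a contiguous block of IDs and return its first value.
        start = self._next_id
        self._next_id += num_rows
        return start


def assign_id(id_generator, filepath, table):
    """Add sequential IDs unless the ID column is already present.

    `table` is a dict of column name -> list of values, used here in
    place of a cudf.DataFrame.
    """
    if ID_COLUMN in table:
        # IDs were assigned earlier; leave them untouched.
        return table
    num_rows = len(next(iter(table.values()))) if table else 0
    start = id_generator.register_batch(filepath, num_rows)
    table[ID_COLUMN] = list(range(start, start + num_rows))
    return table


gen = FakeIdGenerator()
first = assign_id(gen, "a.jsonl", {"text": ["x", "y", "z"]})
second = assign_id(gen, "b.jsonl", {"text": ["p", "q"]})
already = assign_id(gen, "c.jsonl", {"text": ["r"], ID_COLUMN: [99]})
```

Because each batch reserves its own block of IDs from a single generator, IDs do not collide across files, and data that already carries the ID column passes through unchanged.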
Usage
Use DeduplicationIO as a mixin (via multiple inheritance) in deduplication stage classes that need GPU-accelerated file I/O with consistent document ID assignment. It abstracts the complexity of cuDF read/write operations and ID generation behind a simple interface.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/deduplication/io_utils.py
- Lines: 1-72
Signature
class DeduplicationIO:
    def __init__(self, id_generator: "IdGenerator | None", **kwargs): ...

    def read_jsonl(
        self, filepath: str | list[str], columns: list[str] | None = None,
        assign_id: bool = False, **kwargs
    ) -> "cudf.DataFrame": ...

    def read_parquet(
        self, filepath: str | list[str], assign_id: bool = False, **kwargs
    ) -> "cudf.DataFrame": ...

    def write_parquet(
        self, df: "cudf.DataFrame", filepath: str, **kwargs
    ) -> None: ...

    def custom_read(
        self, filepath: str | list[str], read_func: Callable,
        assign_id: bool = False, **kwargs
    ) -> "cudf.DataFrame": ...

    def assign_id(
        self, filepath: str | list[str], df: "cudf.DataFrame"
    ) -> "cudf.DataFrame": ...
Import
from nemo_curator.stages.deduplication.io_utils import DeduplicationIO
I/O Contract
Constructor Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| id_generator | IdGenerator or None | Yes | Ray actor handle for the ID generator. The argument must be passed but may be None; a handle is needed when assign_id=True and the data lacks the _curator_dedup_id column |
read_jsonl Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| filepath | str or list[str] | Yes | Path(s) to JSONL file(s) to read |
| columns | list[str] | No | Specific columns to select (default: None, reads all) |
| assign_id | bool | No | Whether to assign dedup IDs (default: False) |
read_parquet Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| filepath | str or list[str] | Yes | Path(s) to Parquet file(s) to read |
| assign_id | bool | No | Whether to assign dedup IDs (default: False) |
Outputs
| Name | Type | Description |
|---|---|---|
| df | cudf.DataFrame | GPU DataFrame with file data, optionally including a _curator_dedup_id column with sequential integer IDs |
Usage Examples
Basic Usage as Mixin
from nemo_curator.stages.deduplication.io_utils import DeduplicationIO

# SomeBaseClass is a placeholder for the stage's real base class.
class MyDedupStage(DeduplicationIO, SomeBaseClass):
    def __init__(self, id_generator):
        super().__init__(id_generator=id_generator)

    def run(self, filepath):
        # Read Parquet with automatic ID assignment
        df = self.read_parquet(filepath, assign_id=True)
        # Process...
        self.write_parquet(df, "/output/result.parquet")
Reading JSONL with Column Selection
from nemo_curator.stages.deduplication.io_utils import DeduplicationIO

# my_id_gen_actor is a placeholder for an existing Ray IdGenerator actor handle.
io = DeduplicationIO(id_generator=my_id_gen_actor)
df = io.read_jsonl(
    "/data/input.jsonl",
    columns=["text", "url"],
    assign_id=True,
)
# df now has columns: text, url, _curator_dedup_id
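Custom Reader Sketch
The custom_read method accepts a user-supplied callable, which the examples above do not cover. The sketch below shows only the general shape of that pattern and runs without cuDF or Ray. Everything in it is illustrative: SketchIO is a hypothetical stand-in for DeduplicationIO, read_csv_rows is a hypothetical reader, a list of dicts stands in for a cuDF DataFrame, and forwarding **kwargs to read_func is an assumption about how the real method behaves.

```python
import csv
import os
import tempfile

ID_COLUMN = "_curator_dedup_id"


class SketchIO:
    """Hypothetical stand-in that mirrors only the shape of custom_read."""

    def __init__(self):
        self._next_id = 0

    def custom_read(self, filepath, read_func, assign_id=False, **kwargs):
        # Delegate the actual parsing to the caller's function.
        # Forwarding **kwargs to read_func is an assumption.
        rows = read_func(filepath, **kwargs)
        if assign_id:
            for row in rows:
                if ID_COLUMN not in row:
                    row[ID_COLUMN] = self._next_id
                    self._next_id += 1
        return rows


def read_csv_rows(filepath, delimiter=","):
    """Hypothetical reader: parse a delimited text file into a list of dicts."""
    with open(filepath, newline="") as handle:
        return list(csv.DictReader(handle, delimiter=delimiter))


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "docs.tsv")
    with open(path, "w", newline="") as handle:
        handle.write("text\turl\nhello\ta.example\nworld\tb.example\n")
    rows = SketchIO().custom_read(
        path, read_csv_rows, assign_id=True, delimiter="\t"
    )
```

With the real class, read_func would need to return a cudf.DataFrame so that the optional ID assignment step can add the _curator_dedup_id column.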
Related Pages