Implementation:NVIDIA NeMo Curator IDGenerator
| Knowledge Sources | |
|---|---|
| Domains | Deduplication, Distributed Computing, Data Curation |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Manages globally unique document ID assignment for deduplication pipelines using a Ray actor, ensuring every document gets a unique integer ID across distributed workers.
Description
This module provides the core ID assignment infrastructure for deduplication:
- IdGeneratorBase -- Base class (non-Ray) that implements sequential ID allocation. Key methods:
  - register_batch(files, count) -- Registers a batch of documents and returns the starting ID. Uses a UUID5 hash of file paths as a batch key to enable idempotent re-registration.
  - hash_files(filepath) -- Computes a deterministic UUID5 hash from file paths using uuid.NAMESPACE_URL.
  - get_batch_range(files, key) -- Retrieves the (min_id, max_id) tuple for a previously registered batch.
  - to_disk(filepath) / from_disk(filepath) -- Serializes/deserializes state (next_id, batch_registry) to JSON via fsspec.
- IdGenerator -- A @ray.remote actor subclass of IdGeneratorBase with detached lifetime. Adds a wait() method used to confirm actor readiness.
- Helper functions:
  - create_id_generator_actor(filepath=None) -- Creates a detached Ray actor, optionally restoring state from a JSON file. Initializes Ray, creates the actor, waits for it to be ready, then shuts down the local Ray connection.
  - get_id_generator_actor() -- Retrieves the named actor handle.
  - kill_id_generator_actor() -- Terminates the actor.
  - write_id_generator_to_disk(filepath) -- Persists the actor state to disk.
- Constants:
  - CURATOR_DEDUP_ID_STR = "_curator_dedup_id" -- Column name for dedup IDs.
  - CURATOR_ID_GENERATOR_ACTOR_NAME = "curator_deduplication_id_generator" -- Ray actor name and namespace.
Usage
Use this module to create and manage the ID generator actor before running deduplication pipelines. The actor provides a globally consistent ID counter accessible from any Ray worker, ensuring no duplicate IDs are assigned across distributed processing.
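To illustrate why a single shared counter avoids duplicate IDs, the sketch below swaps the Ray actor for a lock-protected counter accessed from several threads. This is an analogy only: by default a Ray actor executes its method calls one at a time, which the lock imitates here. The names LockedCounter and reserve are hypothetical and do not appear in the module.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LockedCounter:
    """Hypothetical stand-in for the actor: one counter, one caller at a time."""

    def __init__(self) -> None:
        self._next_id = 0
        self._lock = threading.Lock()

    def reserve(self, count: int) -> int:
        # Reserve `count` consecutive IDs and return the first one.
        with self._lock:
            start = self._next_id
            self._next_id += count
            return start


counter = LockedCounter()
counts = [5, 3, 7, 2, 4, 6, 1, 8]  # documents per batch

# Several workers request ID ranges concurrently.
with ThreadPoolExecutor(max_workers=4) as pool:
    starts = list(pool.map(counter.reserve, counts))

# Expand every reserved range and check that no ID was handed out twice.
assigned = [i for s, c in zip(starts, counts) for i in range(s, s + c)]
print(len(assigned) == len(set(assigned)))  # True
```

Whatever order the workers arrive in, the reserved ranges never overlap, because each reservation reads and advances the counter as one step.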
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/deduplication/id_generator.py
- Lines: 1-138
Signature
CURATOR_DEDUP_ID_STR = "_curator_dedup_id"
CURATOR_ID_GENERATOR_ACTOR_NAME = "curator_deduplication_id_generator"

class IdGeneratorBase:
    def __init__(self, start_id: int = 0, batch_registry: dict[str, tuple[int, int]] | None = None): ...
    def register_batch(self, files: str | list[str], count: int) -> int: ...
    def hash_files(self, filepath: str | list[str]) -> str: ...
    def get_batch_range(self, files: str | list[str] | None, key: str | None) -> tuple[int, int]: ...
    def to_disk(self, filepath: str, storage_options: dict[str, Any] | None = None) -> None: ...
    @classmethod
    def from_disk(cls, filepath: str, storage_options: dict[str, Any] | None = None) -> "IdGeneratorBase": ...

@ray.remote
class IdGenerator(IdGeneratorBase):
    def wait(self) -> None: ...

def get_id_generator_actor() -> ActorHandle[IdGenerator]: ...
def kill_id_generator_actor() -> None: ...
def create_id_generator_actor(filepath: str | None = None, storage_options: dict[str, Any] | None = None) -> None: ...
def write_id_generator_to_disk(filepath: str, storage_options: dict[str, Any] | None = None) -> None: ...
Import
from nemo_curator.stages.deduplication.id_generator import (
    IdGeneratorBase,
    IdGenerator,
    create_id_generator_actor,
    get_id_generator_actor,
    kill_id_generator_actor,
    write_id_generator_to_disk,
    CURATOR_DEDUP_ID_STR,
)
I/O Contract
IdGeneratorBase Constructor
| Name | Type | Required | Description |
|---|---|---|---|
| start_id | int | No | Starting ID for sequential allocation (default: 0) |
| batch_registry | dict[str, tuple[int, int]] | No | Pre-existing batch registry mapping batch hashes to (min_id, max_id) tuples |
register_batch
| Name | Type | Required | Description |
|---|---|---|---|
| files | str or list[str] | Yes | File path(s) used to compute batch hash |
| count | int | Yes | Number of documents in the batch |
Outputs
| Name | Type | Description |
|---|---|---|
| start_id | int | The first ID assigned to this batch |
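Because register_batch returns only the first ID of the batch, the caller derives the remaining IDs itself. The sketch below assumes the IDs in a batch are consecutive, which is consistent with the (min_id, max_id) range and the count argument. The helper attach_ids is hypothetical, and documents are modeled as plain dicts for illustration.

```python
CURATOR_DEDUP_ID_STR = "_curator_dedup_id"  # column name defined by this module


def attach_ids(docs: list[dict], start_id: int) -> list[dict]:
    """Hypothetical helper: give each document the next consecutive ID."""
    return [{**doc, CURATOR_DEDUP_ID_STR: start_id + i} for i, doc in enumerate(docs)]


docs = [{"text": "a"}, {"text": "b"}, {"text": "c"}]
tagged = attach_ids(docs, start_id=500)
print([d[CURATOR_DEDUP_ID_STR] for d in tagged])  # [500, 501, 502]
```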
Serialization Format
The state is persisted as a JSON object with two fields:
{
  "next_id": 1000,
  "batch_registry": {
    "uuid-hash-1": [0, 499],
    "uuid-hash-2": [500, 999]
  }
}
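A round trip through this format can be sketched with the standard json module. The functions dump_state and load_state are hypothetical and work on strings rather than files, whereas the module itself writes through fsspec. JSON has no tuple type, so the (min_id, max_id) tuples are written as two-element arrays; converting them back to tuples on load is a choice made in this sketch, not confirmed library behavior.

```python
import json


def dump_state(next_id: int, batch_registry: dict[str, tuple[int, int]]) -> str:
    """Hypothetical helper: serialize generator state to a JSON string."""
    return json.dumps({"next_id": next_id, "batch_registry": batch_registry})


def load_state(text: str) -> tuple[int, dict[str, tuple[int, int]]]:
    """Hypothetical helper: parse the JSON and turn the arrays back into tuples."""
    data = json.loads(text)
    registry = {key: tuple(bounds) for key, bounds in data["batch_registry"].items()}
    return data["next_id"], registry


registry = {"uuid-hash-1": (0, 499), "uuid-hash-2": (500, 999)}
text = dump_state(1000, registry)
next_id, restored = load_state(text)
print(next_id, restored["uuid-hash-2"])  # 1000 (500, 999)
```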
Usage Examples
Creating and Using the Actor
from nemo_curator.stages.deduplication.id_generator import (
    create_id_generator_actor,
    write_id_generator_to_disk,
    kill_id_generator_actor,
)
# Create a new actor
create_id_generator_actor()
# After pipeline execution, persist state
write_id_generator_to_disk("/data/dedup/id_generator_state.json")
# Clean up
kill_id_generator_actor()
Restoring from Disk
from nemo_curator.stages.deduplication.id_generator import (
    create_id_generator_actor,
)
# Resume from a previously saved state
create_id_generator_actor(filepath="/data/dedup/id_generator_state.json")
Direct Usage (Non-Ray)
from nemo_curator.stages.deduplication.id_generator import IdGeneratorBase
gen = IdGeneratorBase(start_id=0)
start = gen.register_batch(["file1.jsonl", "file2.jsonl"], count=500)
print(start) # 0
print(gen.get_batch_range(files=["file1.jsonl", "file2.jsonl"], key=None)) # (0, 499)