Implementation: Huggingface Datatrove MinhashDedupSignature
| Knowledge Sources | |
|---|---|
| Domains | |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete pipeline step that computes MinHash signatures for each document in a DocumentsPipeline and writes sorted binary signature files to disk. This is the first of four stages in the Datatrove MinHash deduplication pipeline. Each document is tokenized into word n-grams, hashed through multiple permutation functions, and the resulting signatures are partitioned into LSH buckets and sorted for efficient downstream matching.
Description
MinhashDedupSignature extends PipelineStep and performs the following:
- Text normalization and tokenization: Each document's text is simplified (lowercased, whitespace-normalized, punctuation removed via `simplify_text`) and tokenized using a language-specific word tokenizer.
- Shingling: Consecutive word tokens are grouped into n-grams of size `n_grams` (default 5). Each n-gram is hashed into a `uint64` value using the configured hash function.
- Signature computation: All shingle hashes are transformed via 112 random permutation functions (parameterized by `a` and `b` coefficients drawn from a seeded RNG). For each function, the minimum hash across all shingles is taken. The 112 values are split into 14 bands of 8 hashes each.
- Binary output: Each band's signatures are written to `bucket_{bi:03d}/{rank:05d}.minhash.sig` files in packed binary format.
- Sorting: After all documents are processed, each bucket file is read into a NumPy structured array, sorted by signature fields, and written back. A Blake2b checksum verification ensures data integrity after writing.
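The normalization, shingling, and banded-minimum steps above can be sketched in a few lines. This is a simplified, self-contained illustration: it uses whitespace tokenization and SHA-1 in place of datatrove's language tokenizers and configurable hash functions, which is an assumption for clarity, not the library's actual code.

```python
import hashlib
import random

N_GRAMS, NUM_BUCKETS, HASHES_PER_BUCKET = 5, 14, 8
NUM_PERMUTATIONS = NUM_BUCKETS * HASHES_PER_BUCKET  # 112
MERSENNE_PRIME = (1 << 61) - 1

# seeded (a, b) permutation coefficients, mirroring the seed=1 reproducibility
rng = random.Random(1)
PERMS = [(rng.randint(1, MERSENNE_PRIME - 1), rng.randint(0, MERSENNE_PRIME - 1))
         for _ in range(NUM_PERMUTATIONS)]

def shingle_hashes(text: str) -> list[int]:
    # simplified normalization: lowercase + whitespace tokenization
    words = text.lower().split()
    ngrams = (" ".join(words[i:i + N_GRAMS]) for i in range(len(words) - N_GRAMS + 1))
    # hash each n-gram to a 64-bit integer (sha1 stands in for the real hash)
    return [int.from_bytes(hashlib.sha1(g.encode()).digest()[:8], "big") for g in ngrams]

def minhash_bands(text: str) -> list[tuple[int, ...]]:
    hashes = shingle_hashes(text)
    # for each permutation, keep the minimum permuted hash over all shingles
    sig = [min((a * h + b) % MERSENNE_PRIME for h in hashes) for a, b in PERMS]
    # split the 112 values into 14 bands of 8 hashes each
    return [tuple(sig[i:i + HASHES_PER_BUCKET])
            for i in range(0, NUM_PERMUTATIONS, HASHES_PER_BUCKET)]
```

Downstream stages treat two documents as duplicate candidates when any one of their 14 bands matches exactly.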
The class supports skip mode (skip_existing_sigs=True) which checks for pre-existing valid signature files and avoids recomputation.
Usage
from datatrove.pipeline.dedup import MinhashDedupSignature, MinhashConfig
# With default configuration (5-grams, 14 buckets, 8 hashes/bucket)
sig_step = MinhashDedupSignature(
output_folder="s3://my-bucket/minhash/sigs",
)
# With custom configuration
config = MinhashConfig(
n_grams=5,
num_buckets=14,
hashes_per_bucket=8,
seed=1,
)
sig_step = MinhashDedupSignature(
output_folder="s3://my-bucket/minhash/sigs",
config=config,
language="en",
skip_existing_sigs=False,
)
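The `num_buckets` and `hashes_per_bucket` values control how sensitive downstream matching is: a pair of documents becomes a duplicate candidate if any band of 8 minhashes is identical. A hypothetical helper (not part of datatrove) computes the standard LSH banding estimate of that probability as a function of Jaccard similarity `s`:

```python
def match_probability(s: float, buckets: int = 14, hashes_per_bucket: int = 8) -> float:
    # probability that at least one of `buckets` bands of `hashes_per_bucket`
    # minhashes is identical for a pair with Jaccard similarity s
    return 1.0 - (1.0 - s ** hashes_per_bucket) ** buckets

# the default 14x8 scheme catches most pairs above ~0.8 similarity
for s in (0.5, 0.7, 0.8, 0.9):
    print(f"s={s}: p={match_probability(s):.3f}")
```

Raising `hashes_per_bucket` sharpens the threshold; raising `num_buckets` shifts it toward lower similarity at the cost of more candidate pairs.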
Code Reference
Source Location
- Repository: huggingface/datatrove
- File: src/datatrove/pipeline/dedup/minhash.py (lines 124-322)
Signature
class MinhashDedupSignature(PipelineStep):
def __init__(
self,
output_folder: DataFolderLike,
config: MinhashConfig = None,
language: str = Languages.english,
skip_existing_sigs: bool = False,
):
Configuration dataclass:
@dataclass
class MinhashConfig:
n_grams: int = 5
num_buckets: int = 14
hashes_per_bucket: int = 8
seed: int = 1
norm_config: TextNormConfig = field(default_factory=TextNormConfig)
hash_config: HashConfig = field(default_factory=HashConfig)
Import
from datatrove.pipeline.dedup import MinhashDedupSignature, MinhashConfig
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| data | DocumentsPipeline | Iterator of Document objects. Each document must have a `.text` attribute containing the raw text to be shingled and hashed. |
| rank | int | Worker rank identifier, used to name output files (e.g., 00000.minhash.sig). |
| world_size | int | Total number of parallel workers. |
Outputs
| Name | Type | Description |
|---|---|---|
| Binary .minhash.sig files | Binary files | One file per bucket per rank, stored at `bucket_{bi:03d}/{rank:05d}.minhash.sig`. Each record contains `hashes_per_bucket` hash values followed by a 32-bit unsigned document index. Records are sorted by hash signature for efficient merge-based matching. |
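The sorted record layout can be illustrated with a NumPy structured array. The exact dtype depends on the configured HashConfig; the layout below (eight uint64 hashes plus a uint32 document index) is an assumption for illustration:

```python
import numpy as np

HASHES_PER_BUCKET = 8

# assumed record layout: hashes_per_bucket uint64 hashes + a uint32 doc index
record_dtype = np.dtype([("sig", np.uint64, HASHES_PER_BUCKET),
                         ("doc_id", np.uint32)])

records = np.zeros(3, dtype=record_dtype)
records["sig"] = [[5] * 8, [1] * 8, [3] * 8]
records["doc_id"] = [0, 1, 2]

# sort lexicographically by the signature columns (first hash is primary key);
# np.lexsort treats its *last* key as primary, hence the reversed transpose
order = np.lexsort(records["sig"].T[::-1])
records = records[order]

# records are now ordered by signature, ready for merge-based matching
print(records["doc_id"])
```

Sorting within each bucket file is what allows the next pipeline stage to find matching bands with a streaming merge rather than a quadratic comparison.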
Usage Examples
Example: Full Pipeline with Local Executor
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.dedup import MinhashDedupSignature, MinhashConfig
config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)
executor = LocalPipelineExecutor(
pipeline=[
JsonlReader("data/input/"),
MinhashDedupSignature(
output_folder="data/minhash/sigs",
config=config,
language="en",
),
],
tasks=8,
)
executor.run()
- Each of the 8 tasks processes a shard and produces 14 sorted `.minhash.sig` files (one per bucket).
- Output naming: `data/minhash/sigs/bucket_000/00000.minhash.sig` through `bucket_013/00007.minhash.sig`.
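Assuming the run above completes, the expected file layout can be enumerated from the task count and bucket count alone (a sanity-check sketch, not datatrove code):

```python
num_tasks, num_buckets = 8, 14

# one file per (bucket, rank) pair: 14 buckets x 8 ranks = 112 signature files
expected = [f"data/minhash/sigs/bucket_{b:03d}/{r:05d}.minhash.sig"
            for b in range(num_buckets) for r in range(num_tasks)]

print(len(expected))   # total number of signature files
print(expected[0])     # first file: bucket 0, rank 0
print(expected[-1])    # last file: bucket 13, rank 7
```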