Implementation: Huggingface Datatrove MinhashDedupFilter
Last Updated: 2026-02-14 00:00 GMT
Overview
Concrete pipeline step that filters out duplicate documents from the document stream using pre-computed removal lists. This is the fourth and final stage of the Datatrove MinHash deduplication pipeline. It reads `.remove` files produced by the clustering stage and yields only documents whose sequential index does not appear in the removal list, keeping exactly one representative per duplicate cluster.
Description
`MinhashDedupFilter` extends `PipelineStep` and implements a streaming filter:
- Load removal list: Opens `{rank:06d}.remove` from `input_folder` and reads document IDs one at a time using sequential `struct.unpack` calls.
- Stream and filter: Iterates through the input `DocumentsPipeline`. For each document, compares its sequential index against the next ID in the removal list. Matching documents are dropped (counted as `StatHints.dropped`); non-matching documents are yielded (counted as `StatHints.forwarded`). See the sketch after this list.
- Exclusion writing: If an `exclusion_writer` is provided, dropped documents are written to it instead of being silently discarded.
- Metadata loading: If `load_cluster_ids=True`, cluster IDs are loaded from `.clusters` files and attached to `doc.metadata["minhash_cluster_id"]`. Documents not in any cluster receive a cluster ID of `-1`. Similarly, `load_cluster_sizes=True` loads cluster sizes into `doc.metadata["minhash_cluster_size"]` (defaulting to `1` for unclustered documents).
- Missing file handling: If no `.remove` file exists for the current rank, all documents are passed through with a warning.
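A minimal sketch of the core filtering loop described above (illustrative only, not the library source; `filter_rank` and its arguments are hypothetical names):

```python
import struct

def filter_rank(docs, remove_file):
    """Yield documents whose sequential index is not listed in remove_file.

    docs: iterable of documents in the same order as during signature computation.
    remove_file: open binary file of packed little-endian 32-bit doc IDs ("<I").
    """
    def next_removal_id(f):
        data = f.read(struct.calcsize("<I"))
        return struct.unpack("<I", data)[0] if data else None

    to_remove = next_removal_id(remove_file)
    for index, doc in enumerate(docs):
        if to_remove is not None and index == to_remove:
            # Document is in the removal list: drop it and advance to the next ID.
            to_remove = next_removal_id(remove_file)
            continue
        yield doc  # not in the removal list: forward downstream
```

In the actual step, the file is opened from `input_folder` for the worker's rank, dropped documents can additionally be sent to `exclusion_writer`, and the dropped/forwarded stats are updated.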
Usage
```python
from datatrove.pipeline.dedup import MinhashDedupFilter

filter_step = MinhashDedupFilter(
    input_folder="s3://my-bucket/minhash/remove",
)
```

With exclusion writing and metadata:

```python
from datatrove.pipeline.dedup import MinhashDedupFilter
from datatrove.pipeline.writers import JsonlWriter

filter_step = MinhashDedupFilter(
    input_folder="s3://my-bucket/minhash/remove",
    exclusion_writer=JsonlWriter("s3://my-bucket/minhash/excluded"),
    load_cluster_ids=True,
    load_cluster_sizes=True,
)
```
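When the cluster metadata is loaded, downstream steps can act on it. A hedged sketch using datatrove's `LambdaFilter`; the size threshold and the surrounding placeholder steps are illustrative:

```python
from datatrove.pipeline.dedup import MinhashDedupFilter
from datatrove.pipeline.filters import LambdaFilter

pipeline_steps = [
    # ... reader ...
    MinhashDedupFilter(
        input_folder="s3://my-bucket/minhash/remove",
        load_cluster_sizes=True,
    ),
    # Keep only documents whose duplicate cluster had fewer than 100 members;
    # unclustered documents default to a cluster size of 1 and always pass.
    LambdaFilter(lambda doc: doc.metadata.get("minhash_cluster_size", 1) < 100),
    # ... writer ...
]
```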
Code Reference
Source Location
- Repository: `huggingface/datatrove`
- File: `src/datatrove/pipeline/dedup/minhash.py` (lines 599–688)
Signature
```python
class MinhashDedupFilter(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        exclusion_writer: DiskWriter = None,
        load_cluster_ids: bool = False,
        load_cluster_sizes: bool = False,
        lines_to_buffer: int = 5,
    ):
```
Import
```python
from datatrove.pipeline.dedup import MinhashDedupFilter
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| `data` | `DocumentsPipeline` | Iterator of `Document` objects in the same order as the original signature computation. Sequential document indices must match the IDs used during clustering. |
| Binary `.remove` files | Binary files | Read from `input_folder/{rank:06d}.remove`. Each record is a packed `<I` (32-bit unsigned) document ID to remove. |
| Binary `.clusters` files | Binary files (optional) | Read from `input_folder/{rank:06d}.clusters` when `load_cluster_ids=True`. Each record contains `(doc_id, cluster_id)`. |
| Binary `.sizes` files | Binary files (optional) | Read from `input_folder/{rank:06d}.sizes` when `load_cluster_sizes=True`. Each record contains `(doc_id, cluster_size)`. |
| `rank` | `int` | Worker rank, used to determine which `.remove` file to load. |
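For debugging, the contents of one rank's `.remove` file can be inspected directly. A small sketch, assuming the file is reachable on the local filesystem (the path and helper name are illustrative):

```python
import struct

def read_removal_ids(path):
    # One packed little-endian 32-bit unsigned doc ID per record.
    record = struct.Struct("<I")
    with open(path, "rb") as f:
        while chunk := f.read(record.size):
            yield record.unpack(chunk)[0]

# Print the first few document indices slated for removal on rank 0.
print(list(read_removal_ids("data/remove/000000.remove"))[:20])
```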
Outputs
| Name | Type | Description |
|---|---|---|
| Filtered DocumentsPipeline | `Generator[Document]` | Yields documents not found in the removal list. Each cluster retains exactly one document (the cluster root). Documents may have `minhash_cluster_id` and `minhash_cluster_size` metadata attached. |
| Excluded documents (optional) | Written via `DiskWriter` | When `exclusion_writer` is provided, removed documents are written to the exclusion output for inspection and analysis. |
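The excluded output can then be reviewed offline. A hedged sketch, assuming the `exclusion_writer` was a `JsonlWriter` producing gzipped JSONL in a local `excluded/` folder (path and file naming are illustrative):

```python
import glob
import gzip
import json

for path in glob.glob("excluded/*.jsonl.gz"):
    with gzip.open(path, "rt") as f:
        for line in f:
            doc = json.loads(line)
            # Each record holds the serialized fields of a dropped document.
            print(doc.get("id"), doc.get("text", "")[:80])
```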
Usage Examples
Example: Complete 4-Stage Pipeline
```python
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter
from datatrove.pipeline.dedup import (
    MinhashDedupSignature,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
    MinhashConfig,
)

config = MinhashConfig(n_grams=5, num_buckets=14, hashes_per_bucket=8)

# Stage 1: Compute signatures (parallel)
LocalPipelineExecutor(
    pipeline=[
        JsonlReader("data/input/"),
        MinhashDedupSignature(output_folder="data/sigs", config=config),
    ],
    tasks=8,
).run()

# Stage 2: Find duplicate pairs (parallel, 1+ worker per bucket)
LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder="data/sigs",
            output_folder="data/dups",
            config=config,
        ),
    ],
    tasks=14,
).run()

# Stage 3: Cluster duplicates (single worker)
LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder="data/dups",
            output_folder="data/remove",
            config=config,
        ),
    ],
    tasks=1,
).run()

# Stage 4: Filter documents (parallel)
LocalPipelineExecutor(
    pipeline=[
        JsonlReader("data/input/"),
        MinhashDedupFilter(input_folder="data/remove"),
        JsonlWriter("data/output/"),
    ],
    tasks=8,
).run()
```
- Stage 4 re-reads the original input and filters based on the `.remove` files.
- Documents are yielded in their original order, minus the duplicates.