Implementation:Datajuicer Data juicer Compress Utils
| Knowledge Sources | |
|---|---|
| Domains | Data Processing, Cache Management |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
A compression/decompression framework that supports multiple algorithms (Zstandard, LZ4, Gzip) for managing HuggingFace dataset cache files and reducing disk usage during data processing.
Description
The compress module provides a layered architecture for file compression:
Low-Level Components:
- FileLock -- Extends HuggingFace's FileLock to automatically remove lock files after release.
- Extractor -- Extends HuggingFace's Extractor to extract compressed files with file locking for safe parallel access.
- BaseCompressor -- Abstract base class defining the compress(input_path, output_path) interface.
- ZstdCompressor, Lz4Compressor, GzipCompressor -- Concrete compressors implementing Zstandard, LZ4, and Gzip algorithms respectively.
- Compressor -- Registry class mapping format names to compressor implementations, with thread-safe file locking.
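The relationship between the abstract interface, a concrete compressor, and the format registry can be sketched as follows. This is a minimal illustration rather than the module's actual code: it covers only Gzip (the one listed algorithm available in the standard library), omits file locking, and the names COMPRESSORS and compress_file are hypothetical.

```python
import gzip
import shutil
from abc import ABC, abstractmethod


class BaseCompressor(ABC):
    """Interface: read input_path and write a compressed copy to output_path."""

    @staticmethod
    @abstractmethod
    def compress(input_path, output_path):
        ...


class GzipCompressor(BaseCompressor):
    @staticmethod
    def compress(input_path, output_path):
        # Stream the file so large cache files are never fully loaded in memory.
        with open(input_path, "rb") as src, gzip.open(output_path, "wb") as dst:
            shutil.copyfileobj(src, dst)


# Hypothetical registry: format name -> compressor class.
COMPRESSORS = {"gzip": GzipCompressor}


def compress_file(fmt, input_path, output_path):
    """Look up the compressor registered for `fmt` and run it."""
    if fmt not in COMPRESSORS:
        raise ValueError(f"unsupported compression format: {fmt!r}")
    COMPRESSORS[fmt].compress(input_path, output_path)
```

Keeping compress() static lets the registry store classes directly, so callers need no compressor instances; supporting another format in this sketch would mean adding one class and one dictionary entry.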
Mid-Level Components:
- CompressManager -- Wraps compression and decompression operations for a given format, delegating to Compressor and Extractor.
High-Level Components:
- CacheCompressManager -- Manages compression/decompression of HuggingFace Arrow cache files. It identifies cache files by fingerprint patterns, compresses previous dataset cache files after processing (while preserving active cache files), and supports parallel compression/decompression via multiprocessing pools.
- CompressionOff -- Context manager that temporarily disables cache compression globally.
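The selection step described for CacheCompressManager (compress the previous dataset's cache files, keep the active ones) can be sketched on plain lists of file paths. The cache-<fingerprint>.arrow naming pattern and the helper names fingerprint_of and files_to_compress are assumptions made for illustration, not the module's actual implementation.

```python
import os
import re

# Assumed naming scheme for cache files: cache-<hex fingerprint>.arrow
CACHE_FILE_PATTERN = re.compile(r"^cache-([0-9a-f]+)\.arrow$")


def fingerprint_of(path):
    """Return the fingerprint embedded in a cache file name, or None."""
    match = CACHE_FILE_PATTERN.match(os.path.basename(path))
    return match.group(1) if match else None


def files_to_compress(prev_files, active_files=()):
    """Pick previous cache files that the active dataset does not use."""
    active = set(active_files)
    return [
        path
        for path in prev_files
        if fingerprint_of(path) is not None and path not in active
    ]
```

In this sketch, files that do not match the fingerprint pattern are left untouched, and passing no active files selects every matching cache file of the previous dataset.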
Module-Level Convenience Functions:
- compress(), decompress(), cleanup_compressed_cache_files() -- Thin wrappers that check the CACHE_COMPRESS configuration and delegate to CacheCompressManager.
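How a thin wrapper and CompressionOff might interact can be sketched as below. The module-level CACHE_COMPRESS variable here is a hypothetical stand-in for cache_utils.CACHE_COMPRESS (assumed to hold a format name when compression is on and None when it is off), and do_compress is a hypothetical callback standing in for the manager.

```python
# Hypothetical stand-in for cache_utils.CACHE_COMPRESS:
# a format name when compression is enabled, None when it is disabled.
CACHE_COMPRESS = "zstd"


class CompressionOff:
    """Temporarily disable cache compression and restore the old value on exit."""

    def __enter__(self):
        global CACHE_COMPRESS
        self._saved = CACHE_COMPRESS
        CACHE_COMPRESS = None
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        global CACHE_COMPRESS
        CACHE_COMPRESS = self._saved
        return False  # do not swallow exceptions raised inside the block


def compress(prev_files, do_compress):
    """Thin wrapper: delegate only while compression is enabled."""
    if not CACHE_COMPRESS:
        return 0
    return do_compress(CACHE_COMPRESS, prev_files)
```

Restoring the saved value in __exit__ means the previous setting comes back even if the body of the with block raises.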
Usage
Use this module when processing large datasets to reduce disk space consumption of intermediate Arrow cache files. The compression is typically configured at the framework level via cache_utils.CACHE_COMPRESS and automatically applied between operator pipeline stages.
Code Reference
Source Location
- Repository: Datajuicer_Data_juicer
- File: data_juicer/utils/compress.py
Signature
class BaseCompressor(ABC):
    @staticmethod
    @abstractmethod
    def compress(input_path, output_path): ...
class CompressManager:
    def __init__(self, compressor_format: str = "zstd"): ...
    def compress(self, input_path, output_path): ...
    def decompress(self, input_path, output_path): ...
class CacheCompressManager:
    def __init__(self, compressor_format: str = "zstd"): ...
    def compress(self, prev_ds: Dataset, this_ds: Dataset = None,
                 num_proc: int = 1): ...
    def decompress(self, ds: Dataset, fingerprints=None,
                   num_proc: int = 1): ...
    def cleanup_cache_files(self, ds): ...
class CompressionOff:
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
def compress(prev_ds, this_ds=None, num_proc=1): ...
def decompress(ds, fingerprints=None, num_proc=1): ...
def cleanup_compressed_cache_files(ds): ...
Import
from data_juicer.utils.compress import (
    CacheCompressManager, CompressManager, CompressionOff,
    compress, decompress, cleanup_compressed_cache_files
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| compressor_format | str | No | Compression algorithm: "zstd", "lz4", or "gzip". Default "zstd". |
| prev_ds | Dataset | Yes | Previous dataset whose cache files should be compressed. |
| this_ds | Dataset | No | Current dataset whose cache files are excluded from compression. None means all cache files of prev_ds are compressed. |
| num_proc | int | No | Number of parallel processes for compression/decompression. Default 1. |
| fingerprints | Union[str, List[str]] | No | Cache file fingerprints to decompress. None decompresses all matching files. |
Outputs
| Name | Type | Description |
|---|---|---|
| (side effect) | None | Compressed/decompressed files are written to disk alongside original cache files. |
| cleanup count | int | Number of compressed files cleaned up (from cleanup_cache_files). |
Usage Examples
from data_juicer.utils.compress import (
    CacheCompressManager, CompressionOff, compress
)
# Compress the previous dataset's cache files after an operator finishes
compress(prev_ds=previous_dataset, this_ds=current_dataset, num_proc=4)
# Temporarily disable compression
with CompressionOff():
    # Operations here won't compress cache files
    result = dataset.map(some_function)
# Direct compression with CacheCompressManager
manager = CacheCompressManager(compressor_format="zstd")
manager.compress(prev_ds=old_ds, this_ds=new_ds, num_proc=2)
Related Pages
- Datajuicer_Data_juicer_File_Utils -- File utility functions used alongside compression