Implementation:Apache Paimon DeltaVarintCompressor
| Knowledge Sources | |
|---|---|
| Domains | Data Compression, Encoding |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
DeltaVarintCompressor provides efficient compression for sorted integer lists using delta encoding combined with variable-length integer (varint) encoding with zigzag transformation.
Description
The DeltaVarintCompressor class implements a space-efficient compression algorithm particularly suited for sorted integer sequences. It combines three techniques: delta encoding (storing differences between consecutive values), zigzag encoding (mapping signed integers to unsigned for better compression of negative values), and varint encoding (using variable-length byte sequences where smaller values use fewer bytes).
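To make the zigzag and varint steps concrete, here is a minimal standalone sketch. The helper names `zigzag_encode`, `zigzag_decode`, and `varint_length` are hypothetical and not part of pypaimon, and the sketch assumes values fit in a signed 64-bit range; the actual class may differ in details.

```python
def zigzag_encode(n: int) -> int:
    """Map a signed 64-bit integer to an unsigned one: 0, -1, 1, -2 -> 0, 1, 2, 3."""
    # Assumption: n fits in a signed 64-bit range.
    return ((n << 1) ^ (n >> 63)) & 0xFFFFFFFFFFFFFFFF


def zigzag_decode(z: int) -> int:
    """Inverse of zigzag_encode."""
    return (z >> 1) ^ -(z & 1)


def varint_length(u: int) -> int:
    """Bytes a varint needs for an unsigned value (7 payload bits per byte)."""
    length = 1
    while u >= 0x80:
        u >>= 7
        length += 1
    return length


for value in (0, -1, 1, -64, 63, 64, 300):
    z = zigzag_encode(value)
    assert zigzag_decode(z) == value  # the mapping round-trips
    print(f"{value:>5} -> zigzag {z:>4} -> {varint_length(z)} byte(s)")
```

In this sketch, small magnitudes of either sign land on small unsigned codes, so they need only one varint byte, while 64 and 300 need two.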
The compression process writes the first value as-is, then only the differences (deltas) between consecutive values. Each value written is zigzag-encoded so that negative deltas stay small, then varint-encoded: each byte carries 7 bits of data plus a continuation bit. Because zigzag maps every value from -64 to 63 into a single 7-bit group, any delta in that range takes one byte, which substantially reduces storage for sorted sequences with small gaps between values.
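The compression steps described above can be sketched as follows. This is an illustrative reimplementation under stated assumptions (signed 64-bit values, first value passed through the same zigzag and varint path as the deltas), not the pypaimon source; `encode_varint` and `compress_sketch` are hypothetical names, and the bytes produced by the real class may differ.

```python
import io
from typing import List


def encode_varint(value: int, out: io.BytesIO) -> None:
    """Zigzag-encode a signed 64-bit value, then write it as a varint."""
    u = ((value << 1) ^ (value >> 63)) & 0xFFFFFFFFFFFFFFFF
    while u >= 0x80:
        out.write(bytes([(u & 0x7F) | 0x80]))  # low 7 bits, continuation bit set
        u >>= 7
    out.write(bytes([u]))  # final byte, continuation bit clear


def compress_sketch(data: List[int]) -> bytes:
    """Delta + zigzag + varint encoding of a list of integers."""
    if not data:
        return b""
    out = io.BytesIO()
    encode_varint(data[0], out)  # first value is written without a delta
    prev = data[0]
    for value in data[1:]:
        encode_varint(value - prev, out)  # later values are written as deltas
        prev = value
    return out.getvalue()


encoded = compress_sketch([100, 105, 108, 112, 120, 125])
print(len(encoded), encoded.hex())  # prints: 7 c8010a0608100a
```

In this sketch the first value, 100, takes two bytes (`c8 01`) and each of the five deltas (5, 3, 4, 8, 5) takes one, for 7 bytes in total.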
Decompression reverses the process: it reads varint-encoded deltas from the byte stream, applies zigzag decoding, and reconstructs the original values by accumulating the deltas. The implementation includes error handling for truncated data and varint overflow. The scheme is particularly effective for naturally ordered data such as sorted timestamps, IDs, or offsets.
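A decoding loop along these lines might look like the sketch below. It is a standalone illustration, not the pypaimon source: `decode_varint` and `decompress_sketch` are hypothetical names, raising `ValueError` is an assumption about how errors could be reported, and the 10-byte limit assumes 64-bit values.

```python
import io
from typing import List


def decode_varint(stream: io.BytesIO) -> int:
    """Read one varint from the stream and undo the zigzag mapping."""
    result = 0
    shift = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            raise ValueError("truncated varint")  # input ended mid-value
        byte = chunk[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # continuation bit clear: last byte of this value
            break
        shift += 7
        if shift >= 70:  # an 11th byte cannot belong to a 64-bit value
            raise ValueError("varint overflow")
    return (result >> 1) ^ -(result & 1)  # zigzag decode


def decompress_sketch(compressed: bytes) -> List[int]:
    """Rebuild the original values by accumulating decoded deltas."""
    stream = io.BytesIO(compressed)
    values: List[int] = []
    total = 0  # starting from 0 makes the first decoded value its own delta
    while stream.tell() < len(compressed):
        total += decode_varint(stream)
        values.append(total)
    return values


print(decompress_sketch(bytes.fromhex("c8010a0608100a")))
# prints: [100, 105, 108, 112, 120, 125]
```

Starting the running total at zero lets the first value be handled by the same loop as the deltas, which keeps the sketch short.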
Usage
Use DeltaVarintCompressor when compressing sorted integer sequences like bitmap indices, deletion vectors, or timestamp lists where values have small differences and efficient storage is critical.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/common/delta_varint_compressor.py
Signature
import io
from typing import List


class DeltaVarintCompressor:

    @staticmethod
    def compress(data: List[int]) -> bytes:
        pass

    @staticmethod
    def decompress(compressed: bytes) -> List[int]:
        pass

    @staticmethod
    def _encode_varint(value: int, out: io.BytesIO) -> None:
        pass

    @staticmethod
    def _decode_varint(in_stream: io.BytesIO) -> int:
        pass
Import
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | List[int] | Yes (for compress) | List of integers to compress (ideally sorted, so that deltas stay small) |
| compressed | bytes | Yes (for decompress) | Compressed byte data to decompress |
Outputs
| Name | Type | Description |
|---|---|---|
| compressed | bytes | Compressed byte representation returned by compress |
| data | List[int] | Decompressed list of integers returned by decompress |
Usage Examples
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
# Compress a sorted list of integers
data = [100, 105, 108, 112, 120, 125]
compressed = DeltaVarintCompressor.compress(data)
print(f"Original size: {len(data) * 8} bytes") # Assuming 8 bytes per int
print(f"Compressed size: {len(compressed)} bytes")
# Decompress back to original data
decompressed = DeltaVarintCompressor.decompress(compressed)
assert decompressed == data
print(f"Decompressed: {decompressed}")
# Works well with timestamps
timestamps = [1640000000, 1640000001, 1640000003, 1640000006, 1640000010]
compressed_ts = DeltaVarintCompressor.compress(timestamps)
print(f"Timestamp compression ratio: {(len(timestamps) * 8) / len(compressed_ts):.2f}x")
# Handle empty lists
empty = []
compressed_empty = DeltaVarintCompressor.compress(empty)
assert compressed_empty == b''
assert DeltaVarintCompressor.decompress(compressed_empty) == []
# Compression is most effective on sorted data with small deltas; compare with unsorted input
sorted_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
compressed_sorted = DeltaVarintCompressor.compress(sorted_data)
unsorted_data = [1, 100, 2, 200, 3, 300, 4, 400, 5, 500]
compressed_unsorted = DeltaVarintCompressor.compress(unsorted_data)
print(f"Sorted compression: {len(compressed_sorted)} bytes")
print(f"Unsorted compression: {len(compressed_unsorted)} bytes")
# Handle negative numbers (zigzag encoding)
mixed_data = [0, -5, 10, -15, 20, -25]
compressed_mixed = DeltaVarintCompressor.compress(mixed_data)
decompressed_mixed = DeltaVarintCompressor.decompress(compressed_mixed)
assert decompressed_mixed == mixed_data