Implementation:Apache Paimon ApplyDeletionVectorReader
| Knowledge Sources | |
|---|---|
| Domains | Data Reading, Deletion Vectors |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
ApplyDeletionVectorReader wraps record readers to automatically filter out deleted rows using deletion vectors, providing transparent handling of deleted records during data reads.
Description
The ApplyDeletionVectorReader class implements a decorator pattern around RecordReader, intercepting read operations to apply deletion vector filtering. It maintains references to both the underlying reader and a DeletionVector, using the deletion vector's is_deleted() method to skip rows marked for deletion during iteration.
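The wrapping described above can be pictured with a minimal, self-contained sketch. The classes here (`SetDeletionVector`, `ListReader`, `FilteringReader`) are hypothetical stand-ins and not pypaimon classes; only the method names `is_deleted()`, `read_batch()`, and `close()` are taken from the description, and row positions are assumed to start at 0.

```python
from typing import List, Optional, Set


class SetDeletionVector:
    """Hypothetical stand-in for a DeletionVector, backed by a Python set."""

    def __init__(self, deleted: Set[int]):
        self._deleted = deleted

    def is_deleted(self, position: int) -> bool:
        return position in self._deleted


class ListReader:
    """Hypothetical stand-in for a RecordReader that yields a single batch."""

    def __init__(self, rows: List[str]):
        self._rows = rows
        self._consumed = False
        self.closed = False

    def read_batch(self) -> Optional[List[str]]:
        if self._consumed:
            return None
        self._consumed = True
        return self._rows

    def close(self) -> None:
        self.closed = True


class FilteringReader:
    """Decorator: same interface as the wrapped reader, plus deletion filtering."""

    def __init__(self, reader: ListReader, deletion_vector: SetDeletionVector):
        self._reader = reader
        self._deletion_vector = deletion_vector

    def read_batch(self) -> Optional[List[str]]:
        batch = self._reader.read_batch()
        if batch is None:
            return None
        # Positions start at 0 because this sketch only ever has one batch.
        return [row for pos, row in enumerate(batch)
                if not self._deletion_vector.is_deleted(pos)]

    def close(self) -> None:
        # Delegate so the wrapped reader can release its resources.
        self._reader.close()


inner = ListReader(["a", "b", "c", "d"])
wrapped = FilteringReader(inner, SetDeletionVector({1, 3}))
first_batch = wrapped.read_batch()   # rows at positions 1 and 3 are dropped
second_batch = wrapped.read_batch()  # the inner reader is exhausted
wrapped.close()
```

Callers see the same `read_batch()` / `close()` surface whether or not a deletion vector is attached, which is what makes the filtering transparent.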
The class supports two read paths. read_batch() returns an ApplyDeletionRecordIterator for record-by-record reading. read_arrow_batch() filters a PyArrow RecordBatch using RoaringBitmap operations: it intersects the batch's valid row range with the inverse of the deletion bitmap, then passes the surviving row indices to PyArrow's take() method to extract only the non-deleted rows.
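The index arithmetic behind the Arrow path can be illustrated with a small sketch. To keep it runnable without extra dependencies, Python sets stand in for the RoaringBitmap and list indexing stands in for PyArrow's take(); the function names `surviving_offsets` and `take` are hypothetical and are not part of pypaimon.

```python
from typing import List, Set


def surviving_offsets(batch_start: int, num_rows: int, deleted: Set[int]) -> List[int]:
    """Return batch-local offsets of the rows that are not deleted.

    batch_start is the file-level position of the batch's first row.
    """
    row_range = set(range(batch_start, batch_start + num_rows))
    # Range minus deletions equals range intersected with the
    # complement of the deletion set.
    surviving = row_range - deleted
    # Convert file-level positions to offsets within this batch.
    return sorted(pos - batch_start for pos in surviving)


def take(rows: List[str], offsets: List[int]) -> List[str]:
    """Plain-list analogue of selecting rows by index."""
    return [rows[i] for i in offsets]


# Second batch of a file: it covers file positions 4..7.
batch_rows = ["r4", "r5", "r6", "r7"]
deleted_positions = {1, 5, 7, 42}

offsets = surviving_offsets(batch_start=4, num_rows=4, deleted=deleted_positions)
filtered_rows = take(batch_rows, offsets)
```

Deleted positions outside the batch's range (1 and 42 here) have no effect, because only the overlap with the current row range matters.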
ApplyDeletionRecordIterator provides the record-by-record filtering logic. Its next() method keeps pulling records from the underlying iterator, checks each record's position against the deletion vector, and returns only non-deleted records; it returns None once the underlying iterator is exhausted. Because the check happens one record at a time, the filter is evaluated lazily and does not need to materialize a filtered copy of the batch. The reader delegates close() to the wrapped reader so that underlying resources are released.
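The skip loop can be sketched as follows. `PositionedIterator` and `SkippingIterator` are hypothetical stand-ins written for this illustration; the sketch assumes that the wrapped iterator reports the position of the record it last returned through `returned_position()`, and it uses a plain set in place of the deletion vector.

```python
from typing import Iterator, Optional, Set


class PositionedIterator:
    """Hypothetical stand-in for a RecordIterator that tracks the last position."""

    def __init__(self, records: Iterator[str]):
        self._records = records
        self._position = -1

    def returned_position(self) -> int:
        return self._position

    def next(self) -> Optional[str]:
        record = next(self._records, None)
        if record is not None:
            self._position += 1
        return record


class SkippingIterator:
    """Sketch of the filtering loop: pull until a non-deleted record appears."""

    def __init__(self, iterator: PositionedIterator, deleted: Set[int]):
        self._iterator = iterator
        self._deleted = deleted  # a set stands in for DeletionVector.is_deleted()

    def next(self) -> Optional[str]:
        while True:
            record = self._iterator.next()
            if record is None:
                return None  # underlying iterator is exhausted
            if self._iterator.returned_position() not in self._deleted:
                return record


source = PositionedIterator(iter(["r0", "r1", "r2", "r3", "r4"]))
filtered = SkippingIterator(source, deleted={0, 3})

kept = []
while True:
    item = filtered.next()
    if item is None:
        break
    kept.append(item)
```

Each call to `next()` consumes only as many underlying records as it takes to reach the next surviving one, so no filtered copy of the data is built up front.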
Usage
Use ApplyDeletionVectorReader when implementing table scans that need to respect deletion vectors, wrapping data file readers to automatically filter deleted rows, or building merge-on-read systems with deletion tracking.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py
Signature
class ApplyDeletionVectorReader(RecordBatchReader):
    """
    A RecordReader which applies DeletionVector to filter records.
    """

    def __init__(self, reader: RecordReader, deletion_vector: DeletionVector):
        pass

    def reader(self) -> RecordReader:
        pass

    def deletion_vector(self) -> DeletionVector:
        pass

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        pass

    def read_batch(self) -> Optional[RecordIterator]:
        pass

    def close(self):
        pass


class ApplyDeletionRecordIterator(RecordIterator):
    """
    A RecordIterator that wraps another RecordIterator and applies a DeletionVector
    to filter out deleted records.
    """

    def __init__(self, iterator: RecordIterator, deletion_vector: DeletionVector):
        pass

    def iterator(self) -> RecordIterator:
        pass

    def deletion_vector(self) -> DeletionVector:
        pass

    def returned_position(self) -> int:
        pass

    def next(self) -> Optional[object]:
        pass
Import
from pypaimon.deletionvectors.apply_deletion_vector_reader import (
    ApplyDeletionVectorReader,
    ApplyDeletionRecordIterator,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| reader | RecordReader | Yes | Underlying record reader to wrap |
| deletion_vector | DeletionVector | Yes | Deletion vector for filtering |
| iterator | RecordIterator | Yes | Underlying record iterator to wrap |
Outputs
| Name | Type | Description |
|---|---|---|
| batch | RecordBatch | Arrow batch with deleted rows removed |
| iterator | RecordIterator | Iterator that skips deleted records |
| record | object | Next non-deleted record, or None if exhausted |
Usage Examples
from pypaimon.deletionvectors.apply_deletion_vector_reader import ApplyDeletionVectorReader
from pypaimon.deletionvectors.bitmap_deletion_vector import BitmapDeletionVector
from pypaimon.read.reader.iface.record_reader import RecordReader

# Create deletion vector marking some positions as deleted
dv = BitmapDeletionVector()
dv.delete(5)   # Mark position 5 as deleted
dv.delete(10)  # Mark position 10 as deleted
dv.delete(15)  # Mark position 15 as deleted

# Wrap a record reader with deletion vector filtering
base_reader: RecordReader = ...  # Your RecordReader implementation
filtered_reader = ApplyDeletionVectorReader(base_reader, dv)

# Read batches with automatic filtering
while True:
    batch = filtered_reader.read_batch()
    if batch is None:
        break
    # Iterate through non-deleted records
    while True:
        record = batch.next()
        if record is None:
            break
        # Process record (positions 5, 10, 15 automatically skipped);
        # process_record is a placeholder for your own handler
        process_record(record)

# Close when done
filtered_reader.close()
# Read Arrow batches with filtering
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader

base_reader: RecordBatchReader = ...  # Your RecordBatchReader implementation
filtered_reader = ApplyDeletionVectorReader(base_reader, dv)

while True:
    arrow_batch = filtered_reader.read_arrow_batch()
    if arrow_batch is None:
        break
    # Arrow batch has deleted rows removed
    print(f"Batch with {arrow_batch.num_rows} rows (deleted rows excluded)")
    # Process arrow_batch...
# Manually use iterator wrapper
from pypaimon.deletionvectors.apply_deletion_vector_reader import ApplyDeletionRecordIterator
from pypaimon.read.reader.iface.record_iterator import RecordIterator

base_iterator: RecordIterator = ...  # Your RecordIterator implementation
filtered_iterator = ApplyDeletionRecordIterator(base_iterator, dv)

# Iterate through records, automatically skipping deleted ones
record_count = 0
while True:
    record = filtered_iterator.next()
    if record is None:
        break
    record_count += 1
    print(f"Record {record_count}: {record}")
# Access underlying components
reader = filtered_reader.reader() # Get wrapped reader
dv = filtered_reader.deletion_vector() # Get deletion vector
print(f"Deletion vector has {dv.get_cardinality()} deleted positions")
# Example with complex deletion patterns
dv = BitmapDeletionVector()
# Delete every 10th row in a 100-row file
for i in range(0, 100, 10):
    dv.delete(i)

base_reader = ...  # A fresh reader over the 100-row file
reader = ApplyDeletionVectorReader(base_reader, dv)
filtered_batch = reader.read_batch()

# If the file is delivered as a single batch, 90 records are returned (10 deleted)
count = 0
while True:
    record = filtered_batch.next()
    if record is None:
        break
    count += 1
print(f"Read {count} non-deleted records")  # 90 under the single-batch assumption
# Combining with file reading
from pypaimon.common.file_io import FileIO
from pypaimon.deletionvectors.deletion_vector import DeletionVector

file_io = FileIO.get("/path/to/data")
deletion_file = ...  # DeletionFile metadata

# Load deletion vector from file
dv = DeletionVector.read(file_io, deletion_file)

# Create reader for data file
# (create_parquet_reader, read_all_batches, and process_batch are not defined
# in this snippet; supply your own helpers)
data_reader = create_parquet_reader("/path/to/data/file.parquet")

# Wrap with deletion vector filtering
filtered_reader = ApplyDeletionVectorReader(data_reader, dv)

# Read with deletions applied
for batch in read_all_batches(filtered_reader):
    process_batch(batch)
filtered_reader.close()