Implementation:Apache Paimon PushDownUtils
| Knowledge Sources | |
|---|---|
| Domains | Query Optimization, Predicate Pushdown |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
PushDownUtils provides utilities for predicate manipulation and transformation to support partition pruning and filter pushdown.
Description
PushDownUtils is a module containing utility functions for manipulating predicates in query optimization. It provides three key capabilities: extracting partition specifications from predicates, trimming predicates to only relevant fields, and transforming field indices in predicates.
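The extract_partition_spec_from_predicate function analyzes a predicate to determine whether it specifies an exact value for every partition key. If it does, the function returns a mapping from each partition key to its value as a string; otherwise it returns None. This enables partition pruning: a scan whose predicate pins every partition key can be restricted to that single partition. The function splits AND predicates into their individual conjuncts and looks for equality conditions on partition fields.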
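The extraction logic described above can be illustrated with a minimal sketch. It assumes a simplified, hypothetical `Predicate` dataclass rather than pypaimon's own class, and the helper names `split_and` and `extract_partition_spec_sketch` are illustrative only; the sketch shows the idea (flatten the AND tree, collect equality literals on partition fields, require full coverage), not the library's actual code.

```python
import dataclasses
from typing import Any, Dict, List, Optional


@dataclasses.dataclass
class Predicate:
    # Hypothetical stand-in for pypaimon's Predicate (not its real definition):
    # leaves carry a method such as "equal", a field name, and literals;
    # an "and" node keeps its child predicates in `literals`.
    method: str
    field: Optional[str] = None
    literals: List[Any] = dataclasses.field(default_factory=list)


def split_and(predicate: Predicate) -> List[Predicate]:
    """Flatten nested AND nodes into a flat list of conjuncts."""
    if predicate.method == "and":
        return [part for child in predicate.literals for part in split_and(child)]
    return [predicate]


def extract_partition_spec_sketch(
    predicate: Optional[Predicate], partition_keys: List[str]
) -> Optional[Dict[str, str]]:
    """Hypothetical re-implementation of the behavior described above."""
    if predicate is None or not partition_keys:
        return None
    spec: Dict[str, str] = {}
    for part in split_and(predicate):
        # Only an equality on a partition field pins that partition key.
        if (
            part.method == "equal"
            and part.field in partition_keys
            and part.literals
            and part.literals[0] is not None
        ):
            spec[part.field] = str(part.literals[0])
    # Return a spec only when every partition key received an exact value.
    return spec if set(spec) == set(partition_keys) else None
```

For `year = 2024 AND month = 12 AND value > 100` with partition keys `["year", "month"]`, the sketch returns `{'year': '2024', 'month': '12'}`; with only the `year` condition it returns None, because `month` is not pinned.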
The trim_predicate_by_fields function removes parts of a predicate that reference fields not in a specified list. This is useful when pushing predicates down to different levels of the query plan where only certain fields are available. The trim_and_transform_predicate function combines trimming with field index transformation.
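Trimming can be sketched the same way, again with a hypothetical `Predicate` dataclass and illustrative helper names (`referenced_fields`, `and_all`, `trim_predicate_sketch`) that are not part of pypaimon. The sketch assumes trimming works on whole AND conjuncts and returns None when nothing remains.

```python
import dataclasses
from typing import Any, List, Optional, Set


@dataclasses.dataclass
class Predicate:
    # Hypothetical minimal predicate node (not pypaimon's class): leaves carry a
    # field name; "and"/"or" nodes keep their child predicates in `literals`.
    method: str
    field: Optional[str] = None
    literals: List[Any] = dataclasses.field(default_factory=list)


def split_and(predicate: Predicate) -> List[Predicate]:
    """Flatten nested AND nodes into a flat list of conjuncts."""
    if predicate.method == "and":
        return [part for child in predicate.literals for part in split_and(child)]
    return [predicate]


def referenced_fields(predicate: Predicate) -> Set[str]:
    """Collect every field name referenced anywhere in the predicate tree."""
    if predicate.method in ("and", "or"):
        return set().union(*(referenced_fields(c) for c in predicate.literals))
    return {predicate.field} if predicate.field is not None else set()


def and_all(parts: List[Predicate]) -> Optional[Predicate]:
    """Re-join conjuncts; None means nothing is left to push down."""
    if not parts:
        return None
    return parts[0] if len(parts) == 1 else Predicate("and", literals=parts)


def trim_predicate_sketch(
    predicate: Optional[Predicate], trimmed_keys: List[str]
) -> Optional[Predicate]:
    """Keep only the conjuncts whose fields are all in trimmed_keys."""
    if predicate is None or not trimmed_keys:
        return None
    keep = set(trimmed_keys)
    kept = [part for part in split_and(predicate) if referenced_fields(part) <= keep]
    return and_all(kept)
```

In this sketch a conjunct is kept or dropped as a unit, and an OR is never split, so the trimmed predicate can only accept more rows than the original. That property is what makes it safe to use for pruning: no row matching the full predicate is excluded.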
Field index transformation is necessary when a predicate defined on table-level fields needs to be applied at the file level, where field ordering may differ due to schema evolution. The _change_index helper recursively walks the predicate tree and updates field indices according to a mapping.
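The index remapping can be sketched with a hypothetical `Predicate` dataclass whose leaves store a field position in `index`. The names `index_mapping` and `change_index` are illustrative; the real `_change_index` signature is not shown on this page. Following the description of trim_and_transform_predicate, the sketch assumes the predicate has already been trimmed, so every remaining leaf refers to a field present in the target list.

```python
import dataclasses
from typing import Any, Dict, List, Optional


@dataclasses.dataclass
class Predicate:
    # Hypothetical minimal node (not pypaimon's class): leaves carry the field's
    # position in `index`; "and"/"or" nodes keep their children in `literals`.
    method: str
    index: Optional[int] = None
    field: Optional[str] = None
    literals: List[Any] = dataclasses.field(default_factory=list)


def index_mapping(all_fields: List[str], trimmed_keys: List[str]) -> Dict[int, int]:
    """Map each table-level position to its position in trimmed_keys."""
    position = {name: i for i, name in enumerate(trimmed_keys)}
    # Fields missing from trimmed_keys get no entry, so an untrimmed leaf fails loudly.
    return {i: position[name] for i, name in enumerate(all_fields) if name in position}


def change_index(predicate: Predicate, mapping: Dict[int, int]) -> Predicate:
    """Return a copy of the tree with every leaf index remapped (input untouched)."""
    if predicate.method in ("and", "or"):
        children = [change_index(child, mapping) for child in predicate.literals]
        return dataclasses.replace(predicate, literals=children)
    return dataclasses.replace(predicate, index=mapping[predicate.index])
```

For table fields `["year", "month", "value", "name"]` and file fields `["value", "name"]`, the mapping is `{2: 0, 3: 1}`, so a `value > 100` leaf with index 2 comes back with index 0. Returning a copy rather than mutating keeps the table-level predicate reusable for other files.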
Usage
Use PushDownUtils when implementing query optimization in table scans. The functions are called by the scan planner to extract partition information, trim predicates for partition-level filtering, and transform predicates for file-level pushdown to format readers.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/push_down_utils.py
Signature
```python
def extract_partition_spec_from_predicate(
    predicate: Predicate, partition_keys: List[str]
) -> Optional[Dict[str, str]]:
    """Extract partition specification if predicate specifies all partition keys."""
    ...


def trim_and_transform_predicate(
    input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]
):
    """Trim predicate to specified fields and transform field indices."""
    ...


def trim_predicate_by_fields(
    input_predicate: Predicate, trimmed_keys: List[str]
):
    """Remove predicate parts referencing fields not in trimmed_keys."""
    ...
```
Import
```python
from pypaimon.read.push_down_utils import (
    extract_partition_spec_from_predicate,
    trim_and_transform_predicate,
    trim_predicate_by_fields,
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
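| predicate / input_predicate | Predicate | Yes | Predicate to process (named predicate in extract_partition_spec_from_predicate, input_predicate in the trim functions) |
| partition_keys | List[str] | Yes (extract) | Partition key field names |
| all_fields | List[str] | Yes (transform) | Complete, ordered list of table field names |
| trimmed_keys | List[str] | Yes (trim, transform) | Fields to keep in the predicate; for trim_and_transform_predicate, also the target field order |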
Outputs
| Name | Type | Description |
|---|---|---|
| partition_spec | Optional[Dict[str, str]] | Partition field name to value mapping, or None |
| trimmed_predicate | Predicate | Predicate with only relevant fields |
| transformed_predicate | Predicate | Predicate with updated field indices |
Usage Examples
```python
from pypaimon.read.push_down_utils import (
    extract_partition_spec_from_predicate,
    trim_and_transform_predicate,
    trim_predicate_by_fields,
)

# Assumes `table` is an existing Paimon table with fields year, month, value, name,
# partitioned by (year, month). The builder resolves field names to field indices.
predicate_builder = table.new_read_builder().new_predicate_builder()

# Build a predicate: year = 2024 AND month = 12 AND value > 100
predicate = predicate_builder.and_predicates([
    predicate_builder.equal("year", 2024),
    predicate_builder.equal("month", 12),
    predicate_builder.greater_than("value", 100),
])

# Extract the partition specification
partition_keys = ["year", "month"]
partition_spec = extract_partition_spec_from_predicate(predicate, partition_keys)
if partition_spec:
    print(f"Scanning partition: {partition_spec}")
    # Prints: Scanning partition: {'year': '2024', 'month': '12'}

# Trim the predicate to data fields only
data_fields = ["value", "name"]
trimmed = trim_predicate_by_fields(predicate, data_fields)
# Result: value > 100 (partition conditions removed)

# Trim and remap field indices for file-level pushdown
table_fields = ["year", "month", "value", "name"]
file_fields = ["value", "name"]  # The file doesn't contain the partition columns
file_predicate = trim_and_transform_predicate(predicate, table_fields, file_fields)
# Result: value > 100 with field index 2 (position in table) -> 0 (position in file)
```