Implementation: Apache Paimon TableRead To Ray
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Distributed_Computing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for converting Paimon table splits into distributed Ray Datasets.
Description
TableRead.to_ray() creates a RayDatasource that distributes Paimon splits across Ray workers, then calls ray.data.read_datasource() with that adapter. RayDatasource.get_read_tasks() partitions the splits into roughly equal chunks for balanced parallelism, and each read task yields Arrow RecordBatches that become Ray data blocks. Concurrency, Ray remote args, and the output block count are all configurable.
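To illustrate the balanced chunking described above, here is a minimal, self-contained sketch. The helper `distribute_splits` is hypothetical (it is not the actual `get_read_tasks` implementation); it only demonstrates the even-partitioning idea, where chunk sizes differ by at most one.

```python
from typing import List, Sequence


def distribute_splits(splits: Sequence, num_chunks: int) -> List[list]:
    """Hypothetical sketch: evenly partition splits into num_chunks chunks.

    Chunk sizes differ by at most one, mirroring the balanced-parallelism
    behavior the Description attributes to get_read_tasks().
    """
    num_chunks = min(num_chunks, len(splits)) or 1
    base, extra = divmod(len(splits), num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        size = base + (1 if i < extra else 0)  # first `extra` chunks get one more
        chunks.append(list(splits[start:start + size]))
        start += size
    return chunks


print(distribute_splits(list(range(10)), 4))
# → [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```

In the real datasource, each such chunk would back one Ray read task.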
Usage
After performing scan planning with optional filters and projections, call to_ray() on the table reader with the resulting splits. The returned Ray Dataset can then be used for distributed transformations.
Code Reference
Source Location
paimon-python/pypaimon/read/table_read.py:L131-176
paimon-python/pypaimon/read/datasource/ray_datasource.py:L43-227
Signature
```python
class TableRead:
    def to_ray(
        self,
        splits: List[Split],
        *,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
        override_num_blocks: Optional[int] = None,
        **read_args,
    ) -> "ray.data.dataset.Dataset":
```
Import
from pypaimon.read.table_read import TableRead
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| splits | List[Split] | Yes | From TableScan.plan().splits() |
| ray_remote_args | Optional[Dict[str, Any]] | No | Ray task kwargs (e.g., {"num_cpus": 2}) |
| concurrency | Optional[int] | No | Max concurrent Ray tasks |
| override_num_blocks | Optional[int] | No | Output block count override |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | ray.data.dataset.Dataset | Distributed Ray Dataset containing the table data |
Usage Examples
Basic Usage
```python
# After scan planning with filters
read_builder = table.new_read_builder()
scan = read_builder.new_scan()
plan = scan.plan()
splits = plan.splits()

# Convert to Ray Dataset
reader = read_builder.new_read()
ray_dataset = reader.to_ray(
    splits,
    ray_remote_args={"num_cpus": 2},
    concurrency=4,
)
print(f"Row count: {ray_dataset.count()}")
```