Implementation: Apache Paimon TableRead to Ray (Lance)
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Columnar_Storage |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for converting Lance-format Paimon table splits into distributed Ray Datasets for analytics.
Description
TableRead.to_ray() creates a RayDatasource for distributed reading of Lance-format splits. Each Ray read task processes its assigned splits using FormatLanceReader internally. The override_num_blocks parameter controls parallelism. The resulting Ray Dataset supports groupby().sum(), groupby().count(), and other aggregation operations for distributed analytics.
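The actual split-assignment logic lives in ray_datasource.py; as a rough illustration of how override_num_blocks bounds parallelism, the sketch below partitions N splits into at most that many read tasks via round-robin (the function name and strategy are hypothetical — the real datasource may balance by split size instead):

```python
from typing import Any, List

def partition_splits(splits: List[Any], override_num_blocks: int) -> List[List[Any]]:
    """Round-robin assignment of splits to read tasks (illustrative only)."""
    # Never create more blocks than splits, so no read task sits idle.
    num_blocks = min(override_num_blocks, len(splits))
    buckets: List[List[Any]] = [[] for _ in range(num_blocks)]
    for i, split in enumerate(splits):
        buckets[i % num_blocks].append(split)
    return buckets

# 10 splits spread over 4 read tasks -> task sizes [3, 3, 2, 2]
assignment = partition_splits([f"split-{i}" for i in range(10)], 4)
print([len(b) for b in assignment])  # -> [3, 3, 2, 2]
```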
The implementation wraps Paimon's read pipeline in Ray's Datasource interface, which provides:
- Automatic split distribution: Ray assigns splits to workers based on available resources
- Fault tolerance: Failed read tasks are automatically retried on other workers
- Backpressure: Ray manages memory by controlling how many blocks are read ahead
- Schema propagation: The table schema is passed to Ray for metadata-aware processing
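To make the wrapping pattern concrete, here is a stubbed sketch of the Datasource shape. Ray's real interface is ray.data.Datasource, whose get_read_tasks(parallelism) returns ReadTask objects that workers execute; the stub classes below stand in for Ray so the structure is visible without a Ray installation, and all class and helper names here are hypothetical rather than Paimon's actual ones:

```python
from typing import Any, Callable, List

class ReadTaskStub:
    """Stand-in for ray.data.ReadTask: a zero-argument callable that,
    when executed on a worker, yields the rows for its assigned splits."""
    def __init__(self, read_fn: Callable[[], List[dict]]):
        self._read_fn = read_fn
    def __call__(self) -> List[dict]:
        return self._read_fn()

class LanceSplitDatasourceSketch:
    """Illustrative shape of a Ray Datasource wrapping Paimon splits.
    The real class lives in pypaimon/read/datasource/ray_datasource.py."""
    def __init__(self, splits: List[Any], read_split: Callable[[Any], List[dict]]):
        self._splits = splits
        self._read_split = read_split  # Paimon uses FormatLanceReader here

    def get_read_tasks(self, parallelism: int) -> List[ReadTaskStub]:
        # Group splits so at most `parallelism` read tasks are produced.
        n = min(parallelism, len(self._splits))
        groups = [self._splits[i::n] for i in range(n)]
        def make_task(group: List[Any]) -> ReadTaskStub:
            return ReadTaskStub(
                lambda: [row for s in group for row in self._read_split(s)]
            )
        return [make_task(g) for g in groups]

# Demo with an in-memory "reader" in place of a real Lance reader
fake_read = lambda split: [{"split": split, "value": 1}]
ds = LanceSplitDatasourceSketch(["s0", "s1", "s2"], fake_read)
tasks = ds.get_read_tasks(parallelism=2)
print(len(tasks), sum(len(t()) for t in tasks))  # -> 2 3
```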
Usage
Use this implementation when you need distributed analytics over Lance-format Paimon tables. The Ray Dataset produced by this method supports the full range of Ray Data operations.
Code Reference
Source Location
- Repository: Apache Paimon
- File: paimon-python/pypaimon/read/table_read.py:L131-176
- File: paimon-python/pypaimon/read/datasource/ray_datasource.py:L43-227
Signature
class TableRead:
    def to_ray(
        self,
        splits: List[Split],
        *,
        override_num_blocks: Optional[int] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
        **read_args,
    ) -> "ray.data.dataset.Dataset": ...
Import
from pypaimon.read.table_read import TableRead
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| splits | List[Split] | Yes | Lance-format splits from scan planning via scan.plan().splits() |
| override_num_blocks | Optional[int] | No | Controls the number of output blocks and read parallelism for Ray |
| ray_remote_args | Optional[Dict[str, Any]] | No | Additional arguments passed to Ray remote functions (e.g., num_cpus, resources) |
| concurrency | Optional[int] | No | Number of concurrent read tasks for controlling read throughput |
Outputs
| Name | Type | Description |
|---|---|---|
| dataset | ray.data.dataset.Dataset | Ray Dataset for distributed operations including groupby, aggregation, map_batches, and more |
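For readers new to Ray Data, the aggregation semantics are standard: groupby('category').sum('value') produces one row per distinct key with the summed column. A plain-Python equivalent of that computation (helper name is illustrative, not part of any API):

```python
from collections import defaultdict
from typing import Dict, List

def groupby_sum(rows: List[dict], key: str, col: str) -> Dict[str, float]:
    """Plain-Python equivalent of Dataset.groupby(key).sum(col)."""
    totals: Dict[str, float] = defaultdict(float)
    for row in rows:
        totals[row[key]] += row[col]
    return dict(totals)

rows = [
    {"category": "a", "value": 1.0},
    {"category": "b", "value": 2.5},
    {"category": "a", "value": 4.0},
]
print(groupby_sum(rows, "category", "value"))  # -> {'a': 5.0, 'b': 2.5}
```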
Usage Examples
Basic Usage
# Assumes `table` is an open Paimon table whose data files are in Lance format
read_builder = table.new_read_builder()
read_builder = read_builder.with_projection(['category', 'value'])

# Plan the scan to obtain Lance-format splits
scan = read_builder.new_scan()
splits = scan.plan().splits()

# Convert the splits into a Ray Dataset with 8 output blocks
reader = read_builder.new_read()
ray_ds = reader.to_ray(splits, override_num_blocks=8)

# Distributed aggregation on the Ray Dataset
result = ray_ds.groupby('category').sum('value')
print(result.to_pandas())