

Implementation:Apache Paimon TableRead To Ray



Domains: Data_Lake, Distributed_Computing
Last Updated: 2026-02-07 00:00 GMT

Overview

Concrete tool for converting Paimon table splits into distributed Ray Datasets.

Description

TableRead.to_ray() wraps the given splits in a RayDatasource and passes it to ray.data.read_datasource(). RayDatasource.get_read_tasks() divides the splits into equal-sized chunks, one per read task, so that work is balanced across Ray workers. Each read task produces Arrow RecordBatches, which become the blocks of the resulting Ray Dataset. Callers can configure task concurrency, Ray remote arguments for the read tasks, and the number of output blocks.
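The balanced chunking done by get_read_tasks() can be illustrated with a small, dependency-free sketch. The helper chunk_splits is hypothetical and is not pypaimon code. It assumes contiguous chunking in which chunk sizes differ by at most one and no read task receives an empty chunk. Plain strings stand in for Split objects.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk_splits(splits: Sequence[T], parallelism: int) -> List[List[T]]:
    """Partition splits into at most `parallelism` contiguous, near-equal chunks.

    Hypothetical helper: it illustrates the idea of giving each read task a
    balanced share of splits. It is not RayDatasource's actual implementation.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be >= 1")
    n = len(splits)
    num_chunks = min(parallelism, n)  # never create empty read tasks
    chunks: List[List[T]] = []
    start = 0
    for i in range(num_chunks):
        # The first (n % num_chunks) chunks take one extra split,
        # so chunk sizes differ by at most one.
        size = n // num_chunks + (1 if i < n % num_chunks else 0)
        chunks.append(list(splits[start:start + size]))
        start += size
    return chunks


if __name__ == "__main__":
    # Ten stand-in "splits" spread over four read tasks
    chunks = chunk_splits([f"split-{i}" for i in range(10)], 4)
    print([len(c) for c in chunks])  # [3, 3, 2, 2]
```

Contiguous chunks keep split order within each task. This sketch balances only the number of splits per task, not their byte sizes.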

Usage

After planning a scan (optionally with filters and projections), call to_ray() on the table reader and pass it the resulting splits. The returned Ray Dataset can then be used for distributed transformations.

Code Reference

Source Location

  • paimon-python/pypaimon/read/table_read.py:L131-176
  • paimon-python/pypaimon/read/datasource/ray_datasource.py:L43-227

Signature

class TableRead:
    def to_ray(
        self,
        splits: List[Split],
        *,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
        override_num_blocks: Optional[int] = None,
        **read_args,
    ) -> "ray.data.dataset.Dataset":
        ...

Import

from pypaimon.read.table_read import TableRead

I/O Contract

Inputs

  • splits (List[Split], required): the splits to read, as returned by TableScan.plan().splits()
  • ray_remote_args (Optional[Dict[str, Any]], optional): keyword arguments for the Ray read tasks, e.g. {"num_cpus": 2}
  • concurrency (Optional[int], optional): maximum number of Ray read tasks to run concurrently
  • override_num_blocks (Optional[int], optional): overrides the number of output blocks
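The concurrency argument caps how many read tasks run at the same time. It does not change how many tasks exist. The following is a local analogy under that assumption. A standard-library thread pool, with max_workers set to the concurrency value, stands in for Ray's scheduler. Each hypothetical read task simply returns a row count. Nothing here calls Ray or pypaimon.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def run_read_tasks(task_rows: List[int], concurrency: int) -> Tuple[int, int]:
    """Run simulated read tasks with at most `concurrency` in flight.

    Local analogy only: a thread pool stands in for Ray's scheduler.
    Returns (total_rows, peak_number_of_tasks_in_flight).
    """
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def read_task(num_rows: int) -> int:
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        time.sleep(0.01)  # pretend to read Arrow batches from storage
        with lock:
            in_flight -= 1
        return num_rows

    # max_workers plays the role of the `concurrency` argument
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        total = sum(pool.map(read_task, task_rows))
    return total, peak


if __name__ == "__main__":
    total, peak = run_read_tasks([100] * 8, concurrency=4)
    print(f"rows={total}, peak concurrent tasks={peak}")
```

With eight tasks and a cap of four, the peak number of in-flight tasks never exceeds four. The total row count is the same whatever the cap.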

Outputs

  • (return) ray.data.dataset.Dataset: a distributed Ray Dataset containing the table data

Usage Examples

Basic Usage

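from pypaimon import CatalogFactory

# Hypothetical warehouse path and table identifier; replace with your own
catalog = CatalogFactory.create({"warehouse": "/path/to/warehouse"})
table = catalog.get_table("default.my_table")

# Plan the scan. Any filters or projections would be configured
# on read_builder before calling new_scan().
read_builder = table.new_read_builder()
scan = read_builder.new_scan()
plan = scan.plan()
splits = plan.splits()

# Convert the planned splits to a Ray Dataset:
# each read task requests 2 CPUs, and at most 4 tasks run at once
reader = read_builder.new_read()
ray_dataset = reader.to_ray(
    splits,
    ray_remote_args={"num_cpus": 2},
    concurrency=4,
)
print(f"Row count: {ray_dataset.count()}")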

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
