Implementation:Apache Paimon TableRead To Ray

From Leeroopedia
Revision as of 14:23, 16 February 2026 by Admin (auto-imported from implementations/Apache_Paimon_TableRead_To_Ray.md)


Knowledge Sources
Domains: Data_Lake, Distributed_Computing
Last Updated: 2026-02-07 00:00 GMT

Overview

Concrete tool for converting Paimon table splits into distributed Ray Datasets.

Description

TableRead.to_ray() wraps the given splits in a RayDatasource and passes it to ray.data.read_datasource(), which distributes the reads across Ray workers. RayDatasource.get_read_tasks() divides the splits into equal chunks for balanced parallelism, and each chunk is read by one Ray read task. Each read task produces Arrow RecordBatches, which form the Ray data blocks. The method also accepts optional settings for task concurrency, Ray remote args, and the number of output blocks.
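The equal-chunk step can be pictured with a plain-Python sketch. It assumes the splits arrive as an ordered list and that Ray requests `parallelism` read tasks (setting override_num_blocks is one way to fix that number). `chunk_splits` is a hypothetical helper written for illustration, not the code in ray_datasource.py, which may size or order its chunks differently.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk_splits(splits: Sequence[T], parallelism: int) -> List[List[T]]:
    """Hypothetical helper: divide splits into at most `parallelism`
    contiguous chunks whose sizes differ by at most one (one chunk per read task)."""
    if parallelism < 1:
        raise ValueError("parallelism must be >= 1")
    # Never create more chunks than there are splits; an empty task does no work.
    num_chunks = min(parallelism, len(splits))
    if num_chunks == 0:
        return []
    base, extra = divmod(len(splits), num_chunks)
    chunks: List[List[T]] = []
    start = 0
    for i in range(num_chunks):
        # The first `extra` chunks each take one additional split.
        size = base + (1 if i < extra else 0)
        chunks.append(list(splits[start:start + size]))
        start += size
    return chunks


# Seven splits requested as three read tasks -> chunk sizes 3, 2, 2.
print([len(c) for c in chunk_splits(list(range(7)), 3)])
```

Keeping chunks contiguous preserves split order, and capping the chunk count at the number of splits avoids scheduling read tasks that have nothing to read.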

Usage

After planning the scan (with optional filters and projections), call to_ray() on the TableRead returned by read_builder.new_read(), passing the resulting splits. The returned Ray Dataset can then be used for distributed transformations.

Code Reference

Source Location

  • paimon-python/pypaimon/read/table_read.py:L131-176
  • paimon-python/pypaimon/read/datasource/ray_datasource.py:L43-227

Signature

class TableRead:
    def to_ray(
        self,
        splits: List[Split],
        *,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
        override_num_blocks: Optional[int] = None,
        **read_args,
    ) -> "ray.data.dataset.Dataset":

Import

from pypaimon.read.table_read import TableRead

I/O Contract

Inputs

Name | Type | Required | Description
splits | List[Split] | Yes | Splits to read, obtained from TableScan.plan().splits()
ray_remote_args | Optional[Dict[str, Any]] | No | Keyword arguments for the Ray read tasks (e.g., {"num_cpus": 2})
concurrency | Optional[int] | No | Maximum number of Ray read tasks running at the same time
override_num_blocks | Optional[int] | No | Overrides the number of output blocks
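In Ray Data, concurrency caps how many read tasks run at once without changing the total number of tasks or output blocks, while override_num_blocks changes the number of output blocks. This assumes to_ray() forwards both arguments to ray.data.read_datasource() unchanged. The thread-pool sketch below is only an analogy: threads stand in for Ray tasks, and `run_read_tasks` is a hypothetical name, not part of pypaimon or Ray.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def run_read_tasks(num_tasks: int, concurrency: int) -> Tuple[int, int]:
    """Hypothetical analogy: run `num_tasks` read tasks with at most
    `concurrency` in flight. Returns (blocks produced, peak tasks in flight)."""
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def read_task(task_id: int) -> List[int]:
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        time.sleep(0.01)  # stand-in for reading one chunk of splits
        with lock:
            in_flight -= 1
        return [task_id]  # each task yields one "block"

    # max_workers plays the role of `concurrency`: it limits parallelism only.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        blocks = list(pool.map(read_task, range(num_tasks)))
    return len(blocks), peak


# Eight tasks still produce eight blocks whether two or four run at once.
for limit in (2, 4):
    num_blocks, peak = run_read_tasks(num_tasks=8, concurrency=limit)
    print(limit, num_blocks, peak <= limit)
```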

Outputs

Name | Type | Description
(return) | ray.data.dataset.Dataset | Distributed Ray Dataset containing the rows read from the given splits

Usage Examples

Basic Usage

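# Assumes `table` is a pypaimon table already loaded from a catalog.
# Any filter or projection is configured on read_builder before planning the scan.
read_builder = table.new_read_builder()
scan = read_builder.new_scan()
plan = scan.plan()
splits = plan.splits()

# Convert the planned splits to a Ray Dataset:
# each read task requests 2 CPUs, and at most 4 read tasks run at once.
reader = read_builder.new_read()
ray_dataset = reader.to_ray(
    splits,
    ray_remote_args={"num_cpus": 2},
    concurrency=4,
)
print(f"Row count: {ray_dataset.count()}")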

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
