Implementation:Apache Paimon TableRead To Ray Lance

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Columnar_Storage
Last Updated: 2026-02-07 00:00 GMT

Overview

Concrete tool for converting Lance-format Paimon table splits into distributed Ray Datasets for analytics.

Description

TableRead.to_ray() creates a RayDatasource for distributed reading of Lance-format splits. Each Ray read task processes its assigned splits using FormatLanceReader internally. The override_num_blocks parameter controls parallelism. The resulting Ray Dataset supports groupby().sum(), groupby().count(), and other aggregation operations for distributed analytics.

The implementation wraps Paimon's read pipeline in Ray's Datasource interface, which provides:

  • Automatic split distribution: Ray assigns splits to workers based on available resources
  • Fault tolerance: Ray automatically retries failed read tasks, possibly on a different worker
  • Backpressure: Ray manages memory by controlling how many blocks are read ahead
  • Schema propagation: The table schema is passed to Ray for metadata-aware processing
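
The split distribution described above can be pictured with a small, Ray-free sketch. It assumes splits are dealt round-robin into at most `parallelism` groups and that each group becomes one lazily executed read task. The names `plan_read_tasks` and `fake_read_split` are hypothetical, and the real RayDatasource may group splits differently (for example, by size).

```python
from typing import Callable, Iterator, List, Sequence


def plan_read_tasks(
    splits: Sequence[str],
    parallelism: int,
    read_split: Callable[[str], List[dict]],
) -> List[Callable[[], Iterator[List[dict]]]]:
    """Group splits into at most `parallelism` lazy read tasks (illustrative only)."""
    if parallelism < 1:
        raise ValueError("parallelism must be >= 1")
    num_tasks = min(parallelism, len(splits))
    # Deal splits round-robin so each task gets a similar number of splits.
    groups = [list(splits[i::num_tasks]) for i in range(num_tasks)]

    def make_task(group: List[str]) -> Callable[[], Iterator[List[dict]]]:
        def task() -> Iterator[List[dict]]:
            # Nothing is read until a worker actually calls the task.
            for split in group:
                yield read_split(split)
        return task

    return [make_task(group) for group in groups]


def fake_read_split(split: str) -> List[dict]:
    # Hypothetical stand-in for a per-split Lance reader.
    return [{"split": split, "value": len(split)}]


tasks = plan_read_tasks(
    ["s0", "s1", "s2", "s3", "s4"], parallelism=2, read_split=fake_read_split
)
blocks_per_task = [list(task()) for task in tasks]
```

Returning zero-argument callables instead of data keeps planning cheap: the driver only decides which splits go together, and the reading happens wherever the task is eventually run.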

Usage

Use this implementation to run distributed analytics on Lance-format Paimon tables. The method returns a standard Ray Dataset, so Ray Data operations such as groupby, aggregation, and map_batches can be applied to the result.

Code Reference

Source Location

  • Repository: Apache Paimon
  • File: paimon-python/pypaimon/read/table_read.py:L131-176
  • File: paimon-python/pypaimon/read/datasource/ray_datasource.py:L43-227

Signature

class TableRead:
    def to_ray(
        self,
        splits: List[Split],
        *,
        override_num_blocks: Optional[int] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        concurrency: Optional[int] = None,
        **read_args,
    ) -> "ray.data.dataset.Dataset":
        ...  # body omitted; see Source Location

Import

from pypaimon.read.table_read import TableRead

I/O Contract

Inputs

Name | Type | Required | Description
splits | List[Split] | Yes | Lance-format splits from scan planning via scan.plan().splits()
override_num_blocks | Optional[int] | No | Controls the number of output blocks and read parallelism for Ray
ray_remote_args | Optional[Dict[str, Any]] | No | Additional arguments passed to Ray remote functions (e.g., num_cpus, resources)
concurrency | Optional[int] | No | Number of concurrent read tasks for controlling read throughput
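
As a sketch of how the optional inputs combine, the hypothetical helper below collects only the options that were explicitly set, so anything left out falls back to the defaults in the signature. `build_to_ray_options` is not part of pypaimon, and the resource values are illustrative rather than recommendations.

```python
from typing import Any, Dict, Optional


def build_to_ray_options(
    override_num_blocks: Optional[int] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: keep only the options that were explicitly set."""
    candidates = {
        "override_num_blocks": override_num_blocks,
        "ray_remote_args": ray_remote_args,
        "concurrency": concurrency,
    }
    options = {name: value for name, value in candidates.items() if value is not None}
    # The helper's own sanity check; counts below 1 make no sense here.
    for name in ("override_num_blocks", "concurrency"):
        if name in options and options[name] < 1:
            raise ValueError(f"{name} must be a positive integer")
    return options


options = build_to_ray_options(
    override_num_blocks=8,
    ray_remote_args={"num_cpus": 1},  # illustrative value
)
# With a TableRead instance `reader` and planned `splits`, the call would be:
# ray_ds = reader.to_ray(splits, **options)
```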

Outputs

Name | Type | Description
dataset | ray.data.dataset.Dataset | Ray Dataset for distributed operations including groupby, aggregation, map_batches, and more

Usage Examples

Basic Usage

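# Assumes `table` is a Paimon table with Lance-format data files,
# already loaded (for example, from a catalog).
read_builder = table.new_read_builder()
# Read only the columns needed for the aggregation.
read_builder = read_builder.with_projection(['category', 'value'])

# Plan the scan to obtain the splits to read.
scan = read_builder.new_scan()
splits = scan.plan().splits()
reader = read_builder.new_read()

# Build a Ray Dataset with 8 output blocks, then aggregate per category.
ray_ds = reader.to_ray(splits, override_num_blocks=8)
result = ray_ds.groupby('category').sum('value')
print(result.to_pandas())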
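
To make the aggregation concrete without a Ray cluster, the following plain-Python sketch computes the same kind of per-category totals on a few made-up rows. The rows and the `group_sum` helper are illustrative assumptions; Ray performs the equivalent computation across blocks on multiple workers and chooses its own output column names.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping


def group_sum(
    rows: Iterable[Mapping[str, object]], key: str, value: str
) -> Dict[object, float]:
    """Sum `value` per distinct `key`, one row at a time (illustrative only)."""
    totals = defaultdict(float)
    for row in rows:
        totals[row[key]] += row[value]
    return dict(totals)


# Made-up rows with the two projected columns from the example above.
rows = [
    {"category": "a", "value": 1.0},
    {"category": "b", "value": 2.5},
    {"category": "a", "value": 3.0},
]
totals = group_sum(rows, key="category", value="value")
```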

Related Pages

Implements Principle

Requires Environment
