Implementation:Apache Paimon BatchTableWrite Write Ray

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Data_Ingestion
Last Updated 2026-02-07 00:00 GMT

Overview

Concrete tool for writing distributed Ray Datasets to Paimon tables via PaimonDatasink.

Description

TableWrite.write_ray() creates a PaimonDatasink and calls dataset.write_datasink() to distribute write tasks across Ray workers. The PaimonDatasink handles distributed write coordination:

  1. Each Ray write task creates a local FileStoreWrite instance.
  2. The write task receives assigned data blocks and writes them to the file store.
  3. Each task returns CommitMessage objects describing the files written.
  4. The on_write_complete() callback on the driver collects all commit messages.
  5. A single atomic commit is performed via FileStoreCommit to make all data visible.
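The coordination protocol above can be sketched in plain Python. The names below (`write_task`, `on_write_complete`, the simplified `CommitMessage`) are illustrative stand-ins for pypaimon's FileStoreWrite, CommitMessage, and FileStoreCommit machinery, not the real APIs:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class CommitMessage:
    """Simplified stand-in: records which files one write task produced."""
    files: List[str]


def write_task(task_id: int, rows: List[Dict]) -> CommitMessage:
    """One Ray write task: write its assigned blocks to the file store,
    then report the resulting files back to the driver."""
    path = f"bucket-{task_id}/data-{task_id}.parquet"
    # ... actual file I/O elided in this sketch ...
    return CommitMessage(files=[path])


def on_write_complete(messages: List[CommitMessage]) -> List[str]:
    """Driver-side callback: collect every task's commit message and
    perform a single atomic commit so all files become visible at once."""
    committed = [f for m in messages for f in m.files]
    # In Paimon this step would create one new table snapshot.
    return committed


# Simulate three parallel write tasks followed by one atomic commit.
messages = [write_task(i, rows=[{"k": i}]) for i in range(3)]
snapshot = on_write_complete(messages)
```

Because no data is visible until the final commit, a failed write task leaves the table unchanged rather than partially written.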

The method supports:

  • Overwrite mode: When overwrite=True, existing data in the table (or matching partitions) is replaced by the new data.
  • Configurable concurrency: The concurrency parameter controls how many parallel write tasks are dispatched.
  • Ray remote arguments: The ray_remote_args parameter allows passing resource requirements (e.g., CPU, memory) to Ray write tasks.
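The append-versus-overwrite distinction, including partition-scoped overwrite, can be illustrated with a plain-Python model of table state (this is a conceptual sketch, not the pypaimon implementation):

```python
from typing import Dict, List, Optional


def commit(
    existing: List[Dict],
    new_rows: List[Dict],
    overwrite: bool = False,
    partition_key: Optional[str] = None,
) -> List[Dict]:
    """Model one commit: append by default; with overwrite=True, replace
    the whole table, or only the partitions touched by the new data."""
    if not overwrite:
        return existing + list(new_rows)          # append mode (default)
    if partition_key is None:
        return list(new_rows)                     # full-table overwrite
    touched = {r[partition_key] for r in new_rows}
    kept = [r for r in existing if r[partition_key] not in touched]
    return kept + list(new_rows)                  # partition overwrite


table = [{"dt": "d1", "v": 1}, {"dt": "d2", "v": 2}]
appended = commit(table, [{"dt": "d1", "v": 3}])
full = commit(table, [{"dt": "d1", "v": 3}], overwrite=True)
partial = commit(table, [{"dt": "d1", "v": 3}], overwrite=True,
                 partition_key="dt")
```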

Usage

Use write_ray() as the final write step in a Ray-based Paimon ingestion pipeline, after data has been loaded and schema-aligned.
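The schema-alignment step mentioned above can be sketched with plain-Python records. `align_schema` is a hypothetical helper for illustration, not a pypaimon API; it projects each record onto the table's column order, filling missing columns and dropping extras:

```python
from typing import Any, Dict, List


def align_schema(
    records: List[Dict[str, Any]],
    target_columns: List[str],
    fill: Any = None,
) -> List[Dict[str, Any]]:
    """Project each record onto the target column order, filling missing
    columns with a default and dropping columns the table lacks."""
    return [{col: r.get(col, fill) for col in target_columns}
            for r in records]


rows = [{"id": 1, "name": "a", "extra": True}, {"id": 2}]
aligned = align_schema(rows, ["id", "name"])
```

In a real pipeline this transformation would run per-block on the Ray Dataset (e.g., via `map_batches`) before calling `write_ray()`.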

Code Reference

Source Location

  • Repository: Apache Paimon
  • Files:
    • paimon-python/pypaimon/write/table_write.py:L74-99
    • paimon-python/pypaimon/write/ray_datasink.py:L47-210

Signature

class TableWrite:
    def write_ray(
        self,
        dataset: "Dataset",
        overwrite: bool = False,
        concurrency: Optional[int] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ) -> None:

Import

from pypaimon.write.table_write import BatchTableWrite

I/O Contract

Inputs

  • dataset (ray.data.Dataset, required): The Ray Dataset to write to the Paimon table. Must have a schema compatible with the target table (use schema alignment if needed).
  • overwrite (bool, optional): When True, existing data in the table is replaced. Defaults to False (append mode).
  • concurrency (Optional[int], optional): Number of parallel write tasks to dispatch across Ray workers. Controls the degree of write parallelism.
  • ray_remote_args (Optional[Dict[str, Any]], optional): Additional keyword arguments passed to Ray remote tasks (e.g., {"num_cpus": 1, "memory": 1e9}). Controls resource allocation for write workers.

Outputs

  • Return value (None): The method returns nothing. Data is written to the file store and committed atomically; on success, a new snapshot is created in the Paimon table containing all written data.

Usage Examples

Basic Usage

# After schema alignment
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()

writer.write_ray(
    aligned_dataset,
    overwrite=False,
    concurrency=4,
    ray_remote_args={"num_cpus": 1},
)

Overwrite Mode

# Replace all existing data in the table
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()

writer.write_ray(
    aligned_dataset,
    overwrite=True,
    concurrency=8,
)

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
