Implementation:Apache Paimon BatchTableWrite Write Ray
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Ingestion |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for writing distributed Ray Datasets to Paimon tables via PaimonDatasink.
Description
TableWrite.write_ray() creates a PaimonDatasink and calls dataset.write_datasink() to distribute write tasks across Ray workers. The PaimonDatasink handles distributed write coordination:
- Each Ray write task creates a local FileStoreWrite instance.
- The write task receives assigned data blocks and writes them to the file store.
- Each task returns CommitMessage objects describing the files written.
- The on_write_complete() callback on the driver collects all commit messages.
- A single atomic commit is performed via FileStoreCommit to make all data visible.
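The coordination pattern above can be sketched in plain Python. This is an illustrative stand-in, not pypaimon code: `CommitMessage`, `write_task`, and `commit_all` are hypothetical names that mirror the roles of Paimon's CommitMessage objects, the per-worker FileStoreWrite step, and the driver-side FileStoreCommit, respectively.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class CommitMessage:
    """Describes the files one write task produced (stand-in for Paimon's CommitMessage)."""
    task_id: int
    files: List[str]


def write_task(task_id: int, rows: List[Dict]) -> CommitMessage:
    # Worker side: write the assigned data blocks to local files
    # and return only a description of what was written, not the data.
    files = [f"part-{task_id}-{i}.parquet" for i in range(len(rows))]
    return CommitMessage(task_id, files)


def commit_all(messages: List[CommitMessage]) -> List[str]:
    # Driver side: collect every task's commit message and publish
    # all files in one atomic step, so readers see all data or none.
    return [f for m in messages for f in m.files]


blocks = [[{"id": 1}], [{"id": 2}, {"id": 3}]]
messages = [write_task(i, b) for i, b in enumerate(blocks)]
snapshot = commit_all(messages)
print(snapshot)  # three files from two tasks, made visible by a single commit
```

The key design point the sketch preserves is that workers never commit individually; only the driver's single commit makes the new files visible.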
The method supports:
- Overwrite mode: When overwrite=True, existing data in the table (or matching partitions) is replaced by the new data.
- Configurable concurrency: The concurrency parameter controls how many parallel write tasks are dispatched.
- Ray remote arguments: The ray_remote_args parameter allows passing resource requirements (e.g., CPU, memory) to Ray write tasks.
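The difference between overwrite and the default append mode can be shown with a toy in-memory table; `TinyTable` below is a hypothetical stand-in for a Paimon table's visible snapshot, not part of the pypaimon API.

```python
from typing import List


class TinyTable:
    """Toy model of a table's visible data (illustrative only, not pypaimon)."""

    def __init__(self) -> None:
        self.rows: List[int] = []

    def write(self, new_rows: List[int], overwrite: bool = False) -> None:
        # overwrite=True replaces all existing data with the new rows;
        # overwrite=False (the default) appends the new rows.
        self.rows = list(new_rows) if overwrite else self.rows + list(new_rows)


t = TinyTable()
t.write([1, 2])                 # append (default)
t.write([3], overwrite=False)   # append again
print(t.rows)                   # [1, 2, 3]
t.write([9], overwrite=True)    # replace everything
print(t.rows)                   # [9]
```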
Usage
Use write_ray() as the final write step in a Ray-based Paimon ingestion pipeline, after data has been loaded and schema-aligned.
Code Reference
Source Location
- Repository: Apache Paimon
- Files:
- paimon-python/pypaimon/write/table_write.py:L74-99
- paimon-python/pypaimon/write/ray_datasink.py:L47-210
Signature
```python
class TableWrite:
    def write_ray(
        self,
        dataset: "Dataset",
        overwrite: bool = False,
        concurrency: Optional[int] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ) -> None: ...
```
Import
```python
from pypaimon.write.table_write import BatchTableWrite
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dataset | ray.data.Dataset | Yes | The Ray Dataset to write to the Paimon table. Must have a schema compatible with the target table (use schema alignment if needed). |
| overwrite | bool | No | When True, existing data in the table is replaced. Defaults to False (append mode). |
| concurrency | Optional[int] | No | Number of parallel write tasks to dispatch across Ray workers. Controls the degree of write parallelism. |
| ray_remote_args | Optional[Dict[str, Any]] | No | Additional keyword arguments passed to Ray remote tasks (e.g., {"num_cpus": 1, "memory": 1e9}). Controls resource allocation for write workers. |
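To make the effect of the concurrency parameter concrete, the sketch below shows one simple way data blocks could be divided among a fixed number of write tasks. The round-robin `assign_blocks` helper is hypothetical; Ray's actual scheduler assigns blocks to tasks itself, and concurrency only caps how many run.

```python
from typing import List


def assign_blocks(blocks: List[int], concurrency: int) -> List[List[int]]:
    # Round-robin assignment of data blocks to `concurrency` write tasks,
    # illustrating how a concurrency cap bounds the number of parallel writers.
    tasks: List[List[int]] = [[] for _ in range(concurrency)]
    for i, block in enumerate(blocks):
        tasks[i % concurrency].append(block)
    return tasks


print(assign_blocks(list(range(5)), 2))  # [[0, 2, 4], [1, 3]]
```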
Outputs
| Name | Type | Description |
|---|---|---|
| None | None | The method returns nothing. Data is written to the file store and committed atomically. On success, a new snapshot is created in the Paimon table containing all written data. |
Usage Examples
Basic Usage
```python
# After schema alignment
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_ray(
    aligned_dataset,
    overwrite=False,
    concurrency=4,
    ray_remote_args={"num_cpus": 1},
)
```
Overwrite Mode
```python
# Replace all existing data in the table
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_ray(
    aligned_dataset,
    overwrite=True,
    concurrency=8,
)
```