Workflow:Apache Paimon Data Ingestion With Ray Sink

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Data_Ingestion, Distributed_Computing
Last Updated: 2026-02-07 23:00 GMT

Overview

End-to-end process for ingesting data from external sources (JSON, CSV, databases) into Paimon tables using Ray Data's distributed write pipeline, with schema casting, parallel writes, and an automatic commit.

Description

This workflow uses Ray Data as a distributed ingestion engine to write data into Paimon tables. External data is read by Ray from sources like JSON Lines files, CSV files, or databases, then transformed and cast to match the Paimon table schema. The Ray Sink integration distributes write tasks across workers, with each worker writing to its assigned partitions and buckets. The commit is handled automatically by the sink, ensuring atomic visibility of all written data.

Usage

Use this workflow when you need to ingest large volumes of external data into Paimon tables with distributed parallelism. This is the recommended approach for ETL pipelines that load data from files, APIs, or databases into the data lake. The workflow covers schema alignment, type casting, and distributed writes, and the sink commits the written data for you.

Execution Steps

Step 1: Ray Cluster and Data Source Initialization

Initialize a Ray runtime and read the source data into a Ray Dataset. Ray Data supports reading from JSON Lines files, CSV files, Parquet files, databases, and custom data sources. Specify concurrency settings to control how many workers read source data in parallel.

Key considerations:

  • Ray Data reads sources lazily: no data is loaded until an operation triggers execution
  • JSON Lines format is common for streaming data ingestion
  • Concurrency controls the number of parallel read tasks
  • Source schema may not match the target Paimon table schema
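
The initialization described in this step could be sketched as follows. This is a minimal example, assuming a recent Ray release in which `ray.data.read_json`, `read_csv`, and `read_parquet` accept a `concurrency` argument. The helper names `reader_name_for` and `read_source` are hypothetical and are not part of Ray or Paimon.

```python
import os

# Map file extensions to the names of ray.data reader functions.
_READERS = {
    ".json": "read_json",
    ".jsonl": "read_json",  # JSON Lines goes through the same reader
    ".csv": "read_csv",
    ".parquet": "read_parquet",
}


def reader_name_for(path: str) -> str:
    """Return the ray.data reader name for a source path (hypothetical helper)."""
    ext = os.path.splitext(path)[1].lower()
    if ext not in _READERS:
        raise ValueError(f"unsupported source format: {ext!r}")
    return _READERS[ext]


def read_source(path: str, concurrency: int = 4):
    """Start Ray if needed and return a lazy Dataset for `path`."""
    import ray.data  # deferred so this module loads without Ray installed

    ray.init(ignore_reinit_error=True)
    reader = getattr(ray.data, reader_name_for(path))
    # Nothing is read yet; execution starts when the dataset is consumed.
    return reader(path, concurrency=concurrency)
```

A call such as `read_source("events.jsonl", concurrency=8)` (the file name is illustrative) would return a dataset whose read tasks run only once a later step consumes it.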

Step 2: Target Table Preparation

Create the Paimon catalog, database, and table with the desired schema. Define primary keys for upsert-capable tables, or leave them empty for append-only tables. Set the bucket count to control write parallelism and data distribution.

Key considerations:

  • Primary keys enable upserts, so rows that share a key are deduplicated when they are merged
  • Bucket count should align with expected write parallelism
  • Table options control file format, compaction, and merge behavior
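
As a rough sketch of table preparation for a primary-key table, assuming the `pypaimon` package exposes `CatalogFactory.create`, `Schema.from_pyarrow_schema`, `create_database`, `create_table`, and `get_table` with the keyword arguments shown (check these against your installed version). The helpers `table_options` and `create_target_table` are hypothetical names introduced for illustration.

```python
def table_options(bucket_count: int, file_format: str = "parquet") -> dict:
    """Build Paimon table options; option values are passed as strings."""
    if bucket_count < 1:
        raise ValueError("bucket_count must be positive")
    return {"bucket": str(bucket_count), "file.format": file_format}


def create_target_table(warehouse, identifier, pa_schema, primary_keys,
                        bucket_count=4):
    """Create the database and table if missing, then return the table.

    `identifier` is expected in "database.table" form and `pa_schema`
    is a pyarrow.Schema describing the target columns.
    """
    from pypaimon import CatalogFactory, Schema  # deferred import

    catalog = CatalogFactory.create({"warehouse": warehouse})
    database = identifier.split(".")[0]
    catalog.create_database(name=database, ignore_if_exists=True)
    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        primary_keys=primary_keys,
        options=table_options(bucket_count),
    )
    catalog.create_table(identifier=identifier, schema=schema,
                         ignore_if_exists=True)
    return catalog.get_table(identifier)
```

An append-only table would omit the primary keys; its bucket options may need different settings, so treat the option set above as specific to the primary-key case.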

Step 3: Schema Alignment and Type Casting

Obtain the target table's PyArrow schema and create a transformation function that casts the Ray Dataset's columns to match. This handles type mismatches between source data (often loosely typed JSON) and the strongly typed Paimon schema. The casting is applied as a map_batches operation on the Ray Dataset.

Key considerations:

  • JSON sources may produce int64 where the table expects int32
  • String-to-timestamp conversions need explicit format handling
  • Chunked arrays from Ray must be combined before casting
  • Schema alignment is critical to prevent write failures

Step 4: Distributed Write via Ray Sink

Create a batch write builder from the table and invoke the Ray write method. This distributes write tasks across Ray workers, with each worker receiving a subset of data blocks. Workers write data files to storage independently, tracking their outputs in commit messages.

Key considerations:

  • write_ray() handles both writing and committing automatically
  • Concurrency controls the number of parallel write tasks
  • Ray remote args specify resource requirements per task (CPUs, memory)
  • The overwrite flag selects between replacing existing data and appending to it
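
A minimal sketch of the write call, assuming `write_ray` takes the dataset plus `overwrite`, `concurrency`, and `ray_remote_args` keyword arguments, and that the write object has a `close()` method. Both are assumptions to confirm against your pypaimon version. `write_dataset` is a hypothetical wrapper.

```python
def write_dataset(table, dataset, *, overwrite=False, concurrency=4, num_cpus=1):
    """Write a Ray Dataset into a Paimon table through the Ray sink."""
    table_write = table.new_batch_write_builder().new_write()
    try:
        # Assumed signature; the sink writes and commits in this one call,
        # so no separate prepare/commit step appears here.
        table_write.write_ray(
            dataset,
            overwrite=overwrite,
            concurrency=concurrency,
            ray_remote_args={"num_cpus": num_cpus},
        )
    finally:
        # Release writer resources even if the write fails.
        table_write.close()
```

Passing `overwrite=True` would replace existing data instead of appending; `num_cpus` is forwarded to Ray as a per-task resource request.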

Step 5: Verification

Read the table back using the standard read pipeline to verify that all ingested data is present and correctly typed. Compare row counts between source and target, and spot-check column values and types.

Key considerations:

  • Row count should match the source data
  • Column types should match the Paimon schema
  • Primary key tables may have fewer rows if deduplication occurred
  • Verify partition structure if the table is partitioned
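
The checks in this step can be sketched as below. `read_back` assumes the pypaimon batch read pipeline (`new_read_builder`, `new_scan().plan().splits()`, `new_read().to_arrow`), and `check_ingest` is a hypothetical pure-Python helper that compares counts and type names. It further assumes the table was empty, or overwritten, before the ingestion run.

```python
def read_back(table):
    """Read the full table into a PyArrow table via the batch read pipeline."""
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    return read_builder.new_read().to_arrow(splits)


def check_ingest(source_rows, target_rows, expected_types, actual_types,
                 has_primary_keys=False):
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    if target_rows > source_rows:
        problems.append(
            f"target has {target_rows} rows, source only {source_rows}")
    elif target_rows < source_rows and not has_primary_keys:
        # Fewer rows are acceptable only when primary keys deduplicate.
        problems.append(f"target is missing {source_rows - target_rows} rows")
    for name, expected in expected_types.items():
        actual = actual_types.get(name)
        if actual != expected:
            problems.append(
                f"column {name!r}: expected {expected}, got {actual}")
    return problems
```

Given `result = read_back(table)`, the actual types can be collected with `{f.name: str(f.type) for f in result.schema}` and the target row count taken from `result.num_rows`.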
