Implementation:ArroyoSystems Arroyo Parquet Writer

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

Implements Parquet-specific writers for both multipart (remote object store) and local filesystem sinks, handling row group management, Iceberg schema integration, shared buffers, compression, and checkpoint trailing bytes.

Description

This module provides two Parquet writer implementations:

ParquetBatchBufferingWriter implements the BatchBufferingWriter trait for remote multipart uploads. It writes Arrow record batches into a SharedBuffer (a thread-safe, interior-mutable byte buffer) via the ArrowWriter from the parquet crate. Key behaviors:

  • Configures compression (Uncompressed, Snappy, Gzip, Zstd, LZ4, LZ4_RAW) and row group size from ParquetFormat settings, defaulting to 128 MB row groups.
  • Removes the Arroyo timestamp column before writing.
  • For Iceberg tables, remaps Arrow field IDs to match the catalog-assigned Iceberg schema via update_field_ids_to_iceberg and normalizes nested batch metadata via normalize_batch_to_schema before each write.
  • Flushes row groups when the in-progress size exceeds the configured threshold.
  • Supports split_to for extracting completed data to send as multipart upload parts.
  • Produces trailing bytes for checkpointing (the remaining buffer contents plus the Parquet footer that would close the file) and optional IcebergFileMetadata extracted from the file metadata.
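The buffering behaviors listed above can be illustrated with a toy model that uses only the standard library. This is a sketch, not the Arroyo implementation: `ToyBufferingWriter`, its two `Vec<u8>` fields, and the `FOOTER` placeholder are hypothetical, and real Parquet encoding is omitted. It assumes that unflushed bytes are the in-progress row group and that buffered bytes are the flushed data awaiting upload.

```rust
/// Toy stand-in for a buffering writer. Bytes are staged in an in-progress
/// "row group" and moved to the output buffer once the staged size exceeds
/// the threshold. All names here are hypothetical.
pub struct ToyBufferingWriter {
    in_progress: Vec<u8>,        // staged, not yet flushed
    buffer: Vec<u8>,             // flushed bytes awaiting upload
    row_group_size_bytes: usize, // flush threshold
}

impl ToyBufferingWriter {
    pub fn new(row_group_size_bytes: usize) -> Self {
        Self { in_progress: Vec::new(), buffer: Vec::new(), row_group_size_bytes }
    }

    /// Stage data, then flush the row group if it exceeds the threshold.
    pub fn add_batch_data(&mut self, data: &[u8]) {
        self.in_progress.extend_from_slice(data);
        if self.in_progress.len() > self.row_group_size_bytes {
            // `append` moves every staged byte and leaves `in_progress` empty.
            self.buffer.append(&mut self.in_progress);
        }
    }

    pub fn unflushed_bytes(&self) -> usize {
        self.in_progress.len()
    }

    pub fn buffered_bytes(&self) -> usize {
        self.buffer.len()
    }

    /// Remove and return the first `pos` flushed bytes (one upload part).
    /// Panics if `pos` exceeds `buffered_bytes()`.
    pub fn split_to(&mut self, pos: usize) -> Vec<u8> {
        let rest = self.buffer.split_off(pos);
        std::mem::replace(&mut self.buffer, rest)
    }

    /// Bytes that would complete the file if it were closed right now:
    /// everything not yet uploaded plus a placeholder footer.
    pub fn trailing_bytes_for_checkpoint(&self) -> Vec<u8> {
        let mut out = self.buffer.clone();
        out.extend_from_slice(&self.in_progress);
        out.extend_from_slice(b"FOOTER"); // placeholder, not a real Parquet footer
        out
    }
}
```

In this model a checkpoint never mutates the writer, so writing can continue after the trailing bytes are captured.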

ParquetLocalWriter implements the LocalWriter trait for local filesystem writes. It writes through the same SharedBuffer mechanism but periodically syncs buffered data to a local file. Checkpointing records the file position and trailing bytes needed to produce a valid Parquet file on recovery.
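One way a recorded file position and trailing bytes could be used on recovery is sketched below with the standard library only. This is an assumption about the recovery procedure, not code from Arroyo: `ToyFileRecovery` and `finish_from_checkpoint` are hypothetical names, and the sketch assumes the file is cut back to the checkpointed position before the trailing bytes are appended.

```rust
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;

/// Hypothetical checkpoint record: how many bytes of the file were durable
/// at checkpoint time, and the bytes that complete the file from there.
pub struct ToyFileRecovery {
    pub bytes_written: u64,
    pub trailing_bytes: Vec<u8>,
}

/// Discard anything written after the checkpoint, then append the
/// trailing bytes so the file ends with a complete footer.
pub fn finish_from_checkpoint(path: &Path, rec: &ToyFileRecovery) -> std::io::Result<()> {
    let mut file = OpenOptions::new().write(true).open(path)?;
    file.set_len(rec.bytes_written)?;               // truncate to the checkpoint
    file.seek(SeekFrom::Start(rec.bytes_written))?; // position at the new end
    file.write_all(&rec.trailing_bytes)?;           // append buffered data and footer
    file.sync_all()                                 // make the result durable
}
```

Truncating first matters because data written after the last checkpoint is not covered by the recorded trailing bytes.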

The SharedBuffer struct provides a cloneable, interior-mutable buffer (Arc<Mutex<Writer<BytesMut>>>) that implements the std::io::Write trait, so the ArrowWriter and the multipart upload system can share the same underlying byte buffer.
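The sharing pattern can be sketched with the standard library alone. The example below is a simplified stand-in, not the actual SharedBuffer: `ToySharedBuffer` is a hypothetical name, and a `Vec<u8>` replaces the BytesMut-backed writer. It shows one handle being written through `Write` while a clone of it drains the same bytes.

```rust
use std::io::{self, Write};
use std::sync::{Arc, Mutex};

/// Cloneable buffer: every clone points at the same bytes.
#[derive(Clone, Default)]
pub struct ToySharedBuffer {
    inner: Arc<Mutex<Vec<u8>>>,
}

impl ToySharedBuffer {
    /// Number of bytes currently held.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }

    /// Remove and return the first `pos` bytes.
    /// Panics if `pos` exceeds `len()`.
    pub fn split_to(&self, pos: usize) -> Vec<u8> {
        let mut guard = self.inner.lock().unwrap();
        let rest = guard.split_off(pos);
        std::mem::replace(&mut *guard, rest)
    }
}

impl Write for ToySharedBuffer {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Lock briefly, append, and report that all bytes were accepted.
        self.inner.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(()) // nothing to flush for an in-memory buffer
    }
}
```

Handing one clone to an encoder that takes ownership of its `Write` sink, and keeping another clone for draining, avoids having to reach back into the encoder to get at the bytes.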

The representitive_timestamp helper extracts the first timestamp value from a nanosecond timestamp column, used for rolling policies based on watermark expiration patterns.
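The conversion itself can be sketched without Arrow. In the example below the timestamp column is modeled as a plain `&[i64]` of nanoseconds since the Unix epoch, and `first_timestamp` is a hypothetical name. The real helper takes an Arrow array and returns a `Result`, so its handling of empty or invalid columns may differ.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Convert the first value of a nanosecond timestamp column to SystemTime.
/// Returns None for an empty column or an unrepresentable time.
pub fn first_timestamp(nanos_since_epoch: &[i64]) -> Option<SystemTime> {
    let first = *nanos_since_epoch.first()?;
    if first >= 0 {
        UNIX_EPOCH.checked_add(Duration::from_nanos(first as u64))
    } else {
        // Negative values are times before the epoch.
        UNIX_EPOCH.checked_sub(Duration::from_nanos(first.unsigned_abs()))
    }
}
```

Only the first row is inspected, so the result is representative only when the rows of a batch carry similar timestamps.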

Usage

These writers are instantiated by the filesystem sink when the output format is Parquet. ParquetBatchBufferingWriter is used for S3/GCS writes, while ParquetLocalWriter is used for file:// targets.

Code Reference

Source Location

Signature

pub struct ParquetBatchBufferingWriter {
    writer: Option<ArrowWriter<SharedBuffer>>,
    buffer: SharedBuffer,
    row_group_size_bytes: usize,
    schema: ArroyoSchemaRef,
    writer_schema: SchemaRef,
    iceberg_schema: Option<iceberg::spec::SchemaRef>,
    event_logger: FsEventLogger,
}

impl BatchBufferingWriter for ParquetBatchBufferingWriter {
    fn new(config: &config::FileSystemSink, format: Format, schema: ArroyoSchemaRef,
           iceberg_schema: Option<iceberg::spec::SchemaRef>, event_logger: FsEventLogger) -> Self;
    fn suffix() -> String;         // "parquet"
    fn add_batch_data(&mut self, data: &RecordBatch);
    fn unflushed_bytes(&self) -> usize;
    fn buffered_bytes(&self) -> usize;
    fn split_to(&mut self, pos: usize) -> Bytes;
    fn get_trailing_bytes_for_checkpoint(&mut self) -> (Vec<u8>, Option<IcebergFileMetadata>);
    fn close(&mut self) -> (Bytes, Option<IcebergFileMetadata>);
}

pub struct ParquetLocalWriter { ... }

impl LocalWriter for ParquetLocalWriter {
    fn new(tmp_path: String, final_path: String, ...) -> Self;
    fn file_suffix() -> &'static str;  // "parquet"
    fn write_batch(&mut self, batch: &RecordBatch) -> anyhow::Result<usize>;
    fn sync(&mut self) -> anyhow::Result<usize>;
    fn close(&mut self) -> anyhow::Result<FilePreCommit>;
    fn checkpoint(&mut self) -> anyhow::Result<Option<CurrentFileRecovery>>;
    fn stats(&self) -> MultiPartWriterStats;
}

pub(crate) fn representitive_timestamp(timestamp_column: &Arc<dyn Array>) -> Result<SystemTime>;

Import

use arroyo_connectors::filesystem::sink::parquet::{
    ParquetBatchBufferingWriter, ParquetLocalWriter, representitive_timestamp,
};

I/O Contract

Inputs

Name | Type | Required | Description
config | config::FileSystemSink | Yes | Sink settings (unused by Parquet writer directly)
format | Format::Parquet(ParquetFormat) | Yes | Compression type and row group byte size
schema | ArroyoSchemaRef | Yes | Arrow schema with timestamp column index
iceberg_schema | Option<iceberg::spec::SchemaRef> | No | Iceberg schema for field ID remapping
batch | RecordBatch | Yes | Arrow record batch (timestamp column removed before write)

Outputs

Name | Type | Description
Bytes | Bytes | Buffered Parquet data ready for multipart upload
IcebergFileMetadata | Option<IcebergFileMetadata> | File-level statistics for Iceberg manifests (when Iceberg schema is present)
FilePreCommit | FilePreCommit | Tmp and final paths for local file commit

Usage Examples

// The ParquetBatchBufferingWriter is created internally by the filesystem sink.
// It must be bound mutably: add_batch_data and split_to take &mut self.
let mut writer = ParquetBatchBufferingWriter::new(
    &config,
    // `..` stands for the remaining ParquetFormat fields, elided here
    Format::Parquet(ParquetFormat { compression: ParquetCompression::Zstd, .. }),
    schema,
    Some(iceberg_schema),
    event_logger,
);

// Write a batch (the timestamp column is removed automatically)
writer.add_batch_data(&record_batch);

// Once enough bytes are buffered, split off one multipart upload part
if writer.buffered_bytes() >= target_part_size {
    let part = writer.split_to(target_part_size);
    // upload part...
}
