Implementation:ArroyoSystems Arroyo Parquet Writer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements Parquet-specific writers for both multipart (remote object store) and local filesystem sinks, handling row group management, Iceberg schema integration, shared buffers, compression, and checkpoint trailing bytes.
Description
This module provides two Parquet writer implementations:
ParquetBatchBufferingWriter implements the BatchBufferingWriter trait for remote multipart uploads. It writes Arrow record batches into a SharedBuffer (a thread-safe, interior-mutable byte buffer) via the ArrowWriter from the parquet crate. Key behaviors:
- Configures compression (Uncompressed, Snappy, Gzip, Zstd, LZ4, LZ4_RAW) and row group size from ParquetFormat settings, defaulting to 128 MB row groups.
- Removes the Arroyo timestamp column before writing.
- For Iceberg tables, remaps Arrow field IDs to match the catalog-assigned Iceberg schema via update_field_ids_to_iceberg and normalizes nested batch metadata via normalize_batch_to_schema before each write.
- Flushes row groups when the in-progress size exceeds the configured threshold.
- Supports split_to for extracting completed data to send as multipart upload parts.
- Produces trailing bytes for checkpointing (the remaining buffer contents plus the Parquet footer that would close the file) and optional IcebergFileMetadata extracted from the file metadata.
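The per-batch steps above, dropping the timestamp column and closing a row group once the in-progress size passes the threshold, can be modeled without Arrow or the parquet crate. This is a toy model, not the Arroyo code. `ToyBatch`, `RowGroupTracker` and the 8-bytes-per-value size estimate are hypothetical stand-ins for `RecordBatch`, the `ArrowWriter`'s in-progress size, and real encoded sizes.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: equally long i64 columns.
struct ToyBatch {
    columns: Vec<Vec<i64>>,
}

/// 128 MB default row group size, matching the default described above.
const DEFAULT_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;

/// Hypothetical tracker for the size of the in-progress row group.
struct RowGroupTracker {
    timestamp_index: usize,
    row_group_size_bytes: usize,
    in_progress_bytes: usize,
    flushed_row_groups: usize,
}

impl RowGroupTracker {
    fn new(timestamp_index: usize, row_group_size_bytes: Option<usize>) -> Self {
        Self {
            timestamp_index,
            // Fall back to the default when the format does not set a size.
            row_group_size_bytes: row_group_size_bytes.unwrap_or(DEFAULT_ROW_GROUP_BYTES),
            in_progress_bytes: 0,
            flushed_row_groups: 0,
        }
    }

    fn add_batch(&mut self, mut batch: ToyBatch) {
        // Drop the timestamp column before "writing" (panics if the index is out of range).
        batch.columns.remove(self.timestamp_index);
        // Rough size estimate: 8 bytes per remaining i64 value.
        let bytes: usize = batch.columns.iter().map(|c| c.len() * 8).sum();
        self.in_progress_bytes += bytes;
        // Close the row group once it exceeds the configured threshold.
        if self.in_progress_bytes > self.row_group_size_bytes {
            self.flushed_row_groups += 1;
            self.in_progress_bytes = 0;
        }
    }
}
```

With a 100-byte threshold and the timestamp in column 0, a three-column batch of five rows contributes 80 bytes, so the second such batch closes a row group.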
ParquetLocalWriter implements the LocalWriter trait for local filesystem writes. It writes through the same SharedBuffer mechanism but periodically syncs buffered data to a local file. Checkpointing records the file position and trailing bytes needed to produce a valid Parquet file on recovery.
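A minimal sketch of the checkpoint-and-recover idea follows. It assumes that recovery truncates the file to the recorded position and then appends the trailing bytes. `LocalCheckpoint`, `trailing_bytes` and `recover` are hypothetical names, and `footer_metadata` stands in for the Thrift-encoded `FileMetaData` the parquet crate would produce. The footer layout itself (metadata, then its length as a 4-byte little-endian integer, then the `PAR1` magic) is the standard Parquet file layout.

```rust
/// Hypothetical checkpoint record: how many bytes of the file were synced,
/// plus the bytes that turn that prefix into a complete Parquet file.
#[derive(Debug, Clone)]
struct LocalCheckpoint {
    synced_len: u64,
    trailing_bytes: Vec<u8>,
}

/// Parquet files begin and end with this 4-byte magic.
const PARQUET_MAGIC: &[u8; 4] = b"PAR1";

/// Trailing bytes = data still buffered in memory, followed by the footer:
/// metadata, its length as u32 little-endian, then the magic.
fn trailing_bytes(unsynced: &[u8], footer_metadata: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(unsynced.len() + footer_metadata.len() + 8);
    out.extend_from_slice(unsynced);
    out.extend_from_slice(footer_metadata);
    out.extend_from_slice(&(footer_metadata.len() as u32).to_le_bytes());
    out.extend_from_slice(PARQUET_MAGIC);
    out
}

/// Discards anything written after the checkpoint and appends the trailing bytes.
fn recover(file_contents: &[u8], checkpoint: &LocalCheckpoint) -> Vec<u8> {
    let mut recovered = file_contents[..checkpoint.synced_len as usize].to_vec();
    recovered.extend_from_slice(&checkpoint.trailing_bytes);
    recovered
}
```

Recording a byte position rather than copying the whole file keeps checkpoints small: only the unsynced tail and the footer have to be stored.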
The SharedBuffer struct provides a cloneable, interior-mutable buffer (Arc<Mutex<Writer<BytesMut>>>, where Writer is bytes::buf::Writer) that implements std::io::Write, so the ArrowWriter and the multipart upload system can share the same underlying bytes.
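The sharing pattern can be sketched with the standard library alone. This version swaps `BytesMut` for `Vec<u8>` so it needs no external crates; the `len` and `split_to` helpers are illustrative and not necessarily the methods Arroyo's `SharedBuffer` exposes. One clone is handed to the writer as its `Write` sink, while another clone lets the upload side drain completed bytes.

```rust
use std::io::{self, Write};
use std::sync::{Arc, Mutex};

/// Std-only sketch of a cloneable, interior-mutable byte buffer.
/// Every clone points at the same Vec<u8> behind the mutex.
#[derive(Clone, Default)]
struct SharedBuffer {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl SharedBuffer {
    /// Number of bytes currently buffered.
    fn len(&self) -> usize {
        self.buffer.lock().unwrap().len()
    }

    /// Removes and returns the first `pos` bytes (panics if `pos > len`,
    /// like `BytesMut::split_to`).
    fn split_to(&self, pos: usize) -> Vec<u8> {
        let mut buf = self.buffer.lock().unwrap();
        let rest = buf.split_off(pos);
        std::mem::replace(&mut *buf, rest)
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        self.buffer.lock().unwrap().extend_from_slice(data);
        Ok(data.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing to flush: the data already lives in memory.
        Ok(())
    }
}
```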
The representitive_timestamp helper returns the first value of a nanosecond-precision timestamp column as a SystemTime; rolling policies that roll files based on watermark expiration use it as the file's representative time.
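A std-only sketch of the conversion follows. A slice of `Option<i64>` stands in for the Arrow nanosecond timestamp array, and a `String` error stands in for the real error type. Treating an empty column or a null first value as an error is an assumption about the real helper's behavior.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical sketch: converts the first nanosecond timestamp to SystemTime.
fn representative_timestamp_sketch(nanos: &[Option<i64>]) -> Result<SystemTime, String> {
    let first = nanos
        .first()
        .copied()
        .ok_or_else(|| "timestamp column is empty".to_string())?
        .ok_or_else(|| "first timestamp is null".to_string())?;
    // Nanoseconds since the Unix epoch; negative values lie before 1970.
    if first >= 0 {
        Ok(UNIX_EPOCH + Duration::from_nanos(first as u64))
    } else {
        Ok(UNIX_EPOCH - Duration::from_nanos(first.unsigned_abs()))
    }
}
```

Only the first value is read, so the result is cheap to compute even for large batches.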
Usage
These writers are instantiated by the filesystem sink when the output format is Parquet. ParquetBatchBufferingWriter is used for S3/GCS writes, while ParquetLocalWriter is used for file:// targets.
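The writer choice can be pictured as a dispatch on the destination's URL scheme. This is only an illustration: `ParquetWriterKind` and `writer_kind_for` are hypothetical, and the sink's actual selection logic may consider more than a `file://` prefix.

```rust
/// Hypothetical enum naming the two Parquet writers.
#[derive(Debug, PartialEq)]
enum ParquetWriterKind {
    /// file:// targets, handled by ParquetLocalWriter.
    Local,
    /// Remote object stores such as S3 or GCS, handled by ParquetBatchBufferingWriter.
    MultipartUpload,
}

/// Picks a writer kind from the destination path's scheme.
fn writer_kind_for(path: &str) -> ParquetWriterKind {
    if path.starts_with("file://") {
        ParquetWriterKind::Local
    } else {
        ParquetWriterKind::MultipartUpload
    }
}
```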
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/parquet.rs
- Lines: 1-368
Signature
pub struct ParquetBatchBufferingWriter {
    writer: Option<ArrowWriter<SharedBuffer>>,
    buffer: SharedBuffer,
    row_group_size_bytes: usize,
    schema: ArroyoSchemaRef,
    writer_schema: SchemaRef,
    iceberg_schema: Option<iceberg::spec::SchemaRef>,
    event_logger: FsEventLogger,
}
impl BatchBufferingWriter for ParquetBatchBufferingWriter {
    fn new(config: &config::FileSystemSink, format: Format, schema: ArroyoSchemaRef,
           iceberg_schema: Option<iceberg::spec::SchemaRef>, event_logger: FsEventLogger) -> Self;
    fn suffix() -> String; // "parquet"
    fn add_batch_data(&mut self, data: &RecordBatch);
    fn unflushed_bytes(&self) -> usize;
    fn buffered_bytes(&self) -> usize;
    fn split_to(&mut self, pos: usize) -> Bytes;
    fn get_trailing_bytes_for_checkpoint(&mut self) -> (Vec<u8>, Option<IcebergFileMetadata>);
    fn close(&mut self) -> (Bytes, Option<IcebergFileMetadata>);
}
pub struct ParquetLocalWriter { ... }
impl LocalWriter for ParquetLocalWriter {
    fn new(tmp_path: String, final_path: String, ...) -> Self;
    fn file_suffix() -> &'static str; // "parquet"
    fn write_batch(&mut self, batch: &RecordBatch) -> anyhow::Result<usize>;
    fn sync(&mut self) -> anyhow::Result<usize>;
    fn close(&mut self) -> anyhow::Result<FilePreCommit>;
    fn checkpoint(&mut self) -> anyhow::Result<Option<CurrentFileRecovery>>;
    fn stats(&self) -> MultiPartWriterStats;
}
pub(crate) fn representitive_timestamp(timestamp_column: &Arc<dyn Array>) -> Result<SystemTime>;
Import
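use arroyo_connectors::filesystem::sink::parquet::{
    ParquetBatchBufferingWriter, ParquetLocalWriter,
};
// representitive_timestamp is pub(crate), so it can only be imported from within
// the arroyo-connectors crate:
// use crate::filesystem::sink::parquet::representitive_timestamp;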
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | config::FileSystemSink | Yes | Sink settings (not used directly by the Parquet writer) |
| format | Format::Parquet(ParquetFormat) | Yes | Compression type and row group byte size |
| schema | ArroyoSchemaRef | Yes | Arroyo schema: the Arrow schema plus the index of its timestamp column |
| iceberg_schema | Option<iceberg::spec::SchemaRef> | No | Iceberg schema for field ID remapping |
| batch | RecordBatch | Yes | Arrow record batch (timestamp column removed before write) |
Outputs
| Name | Type | Description |
|---|---|---|
| Bytes | Bytes | Buffered Parquet data ready for multipart upload |
| IcebergFileMetadata | Option<IcebergFileMetadata> | File-level statistics for Iceberg manifests (when Iceberg schema is present) |
| FilePreCommit | FilePreCommit | Temporary and final paths used to commit the local file |
Usage Examples
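// The ParquetBatchBufferingWriter is created internally by the filesystem sink.
// Assumes config, schema, iceberg_schema, event_logger, record_batch and
// target_part_size are in scope, and that the BatchBufferingWriter trait is imported.
let mut writer = ParquetBatchBufferingWriter::new(
    &config,
    Format::Parquet(ParquetFormat {
        compression: ParquetCompression::Zstd,
        // Assumes ParquetFormat implements Default for the remaining fields
        ..Default::default()
    }),
    schema,
    Some(iceberg_schema),
    event_logger,
);
// Write a batch (the timestamp column is removed automatically)
writer.add_batch_data(&record_batch);
// If enough data is buffered for a multipart part, split it off and upload it
if writer.buffered_bytes() >= target_part_size {
    let part: Bytes = writer.split_to(target_part_size);
    // upload part...
}
// Close the file, returning the remaining bytes and any Iceberg file metadata
let (final_bytes, iceberg_metadata) = writer.close();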
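The example above splits off at most one part per check. When a single batch can add more than one part's worth of data, a loop drains every full part and leaves the remainder for a later part or for `close`. The following is a std-only sketch over a plain `Vec<u8>`; `drain_parts` is a hypothetical helper, not part of the Arroyo API. Buffering to a fixed target size also fits S3 multipart uploads, which require every part except the last to be at least 5 MiB.

```rust
/// Hypothetical helper: splits full-size parts off the front of `buffer`.
/// Bytes that do not fill a whole part stay in `buffer`.
fn drain_parts(buffer: &mut Vec<u8>, target_part_size: usize) -> Vec<Vec<u8>> {
    assert!(target_part_size > 0, "part size must be positive");
    let mut parts = Vec::new();
    while buffer.len() >= target_part_size {
        // Keep the tail in `buffer` and move the first part out.
        let rest = buffer.split_off(target_part_size);
        parts.push(std::mem::replace(buffer, rest));
    }
    parts
}
```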