Implementation:ArroyoSystems Arroyo Filesystem Sink
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Provides the core V1 filesystem sink infrastructure for writing streaming data to object stores (such as S3 and GCS) using multipart uploads. It supports Parquet and JSON output formats, Delta Lake and Iceberg table formats, partitioning, rolling policies, and exactly-once checkpointing.
Description
This is the largest module in the filesystem connector area (~2000 lines) and defines the fundamental abstractions and runtime components for the V1 filesystem sink:
Core Abstractions:
- BatchBufferingWriter trait - Defines the interface for format-specific buffered writers that accumulate data, track buffer sizes, support splitting at part boundaries, produce trailing bytes for checkpoints, and close with final bytes and optional Iceberg metadata.
- BatchMultipartWriter<BBW> - Combines a BatchBufferingWriter with a MultipartManager to coordinate buffered writes with multipart upload operations. It tracks stats and target part sizes, and handles initialization, part completion, checkpointing, and closing.
- MultipartManager - Manages the lifecycle of a single multipart upload: initialization, part uploading, checkpoint serialization, and completion tracking. Handles the case where the multipart ID is not yet available by queuing parts.
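The queuing behavior described for MultipartManager can be illustrated with a small sketch. This is not the code from mod.rs: the type `MultipartSketch`, its fields, and its method names are hypothetical, and parts are plain `Vec<u8>` values instead of real upload requests. It only shows the idea of holding parts back until the multipart upload ID arrives and then releasing them in order.

```rust
/// Hypothetical, simplified stand-in for the bookkeeping of one multipart upload.
#[derive(Debug, Default)]
struct MultipartSketch {
    /// Set once the object store has acknowledged the multipart upload.
    upload_id: Option<String>,
    /// Parts received before the upload ID was known.
    queued: Vec<Vec<u8>>,
    /// Index assigned to the next part that becomes uploadable.
    next_part_index: usize,
}

impl MultipartSketch {
    /// Accepts a part and returns the (index, data) pairs that may be uploaded now.
    /// Before initialization the part is queued and nothing is returned.
    fn write_part(&mut self, data: Vec<u8>) -> Vec<(usize, Vec<u8>)> {
        if self.upload_id.is_none() {
            self.queued.push(data);
            return Vec::new();
        }
        let index = self.next_part_index;
        self.next_part_index += 1;
        vec![(index, data)]
    }

    /// Records the upload ID and releases every queued part in arrival order.
    fn on_initialized(&mut self, upload_id: String) -> Vec<(usize, Vec<u8>)> {
        self.upload_id = Some(upload_id);
        let mut ready = Vec::new();
        for data in std::mem::take(&mut self.queued) {
            ready.push((self.next_part_index, data));
            self.next_part_index += 1;
        }
        ready
    }
}
```

In this sketch the caller is responsible for actually uploading whatever the two methods return; the real component also covers checkpoint serialization and completion tracking, which are left out here.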
Runtime Components:
- FileSystemSink<BBW> - The main V1 operator struct implementing TwoPhaseCommitter. Uses message-passing (channels) to communicate with an async background writer task. Supports PerSubtask and PerOperator commit strategies.
- AsyncMultipartFileSystemWriter<BBW> - Background task that manages multiple BatchMultipartWriter instances (one per active file), processes data, checkpoint, and commit messages, and polls upload futures to completion.
- CommitState - Enum tracking the table format state: DeltaLake (with version tracking), Iceberg (with table wrapper), or VanillaParquet (no table format).
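The message-passing arrangement between the operator and its background writer can be sketched as follows. This is an illustration under stated assumptions: the real writer is an async task, while this sketch uses a standard-library thread and `std::sync::mpsc` so that it has no dependencies. The `SinkMessage` enum, its variants, and `spawn_writer` are hypothetical names, and the "writer" only counts buffered bytes per file.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical messages sent from the operator to the background writer.
enum SinkMessage {
    /// Bytes destined for a particular file.
    Data { file: String, bytes: Vec<u8> },
    /// Request a snapshot of per-file state, answered on `reply`.
    Checkpoint { reply: mpsc::Sender<BTreeMap<String, usize>> },
    /// Ask the writer to shut down.
    Stop,
}

/// Spawns the background writer and returns the sending half of its channel.
fn spawn_writer() -> (mpsc::Sender<SinkMessage>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // One entry per active file: file name -> bytes buffered so far.
        let mut buffered: BTreeMap<String, usize> = BTreeMap::new();
        for msg in rx {
            match msg {
                SinkMessage::Data { file, bytes } => {
                    *buffered.entry(file).or_insert(0) += bytes.len();
                }
                SinkMessage::Checkpoint { reply } => {
                    // Ignore the error if the requester has gone away.
                    let _ = reply.send(buffered.clone());
                }
                SinkMessage::Stop => break,
            }
        }
    });
    (tx, handle)
}
```

Because messages on one channel are processed in order, a checkpoint request observes every data message sent before it, which is the property the operator relies on when snapshotting state.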
File Lifecycle Types:
- FileToFinish - Represents a file with completed multipart upload parts ready for finalization.
- FinishedFile - A file that has been successfully finalized on object storage.
- InProgressFileCheckpoint / FileCheckpointData - Serializable checkpoint state for recovery, covering all stages from not-yet-created multipart uploads through completed uploads.
- RollingPolicy - Determines when to close a file and start a new one, based on part limit, size limit, inactivity duration, rollover duration, or watermark-based time pattern expiration.
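As a rough illustration of how a rolling policy of this kind might be evaluated, the sketch below checks four of the listed conditions (part limit, size limit, inactivity, rollover) against per-file stats. All names here (`RollingPolicySketch`, `FileStats`, `RollReason`) are hypothetical and do not mirror the actual RollingPolicy definition; the watermark-based time pattern check is omitted, and the order in which conditions are tested is an arbitrary choice.

```rust
use std::time::Duration;

/// Why a file should be closed (hypothetical enum for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum RollReason {
    PartLimit,
    SizeLimit,
    Inactivity,
    Rollover,
}

/// Per-file counters a writer might track (hypothetical).
struct FileStats {
    parts_written: usize,
    bytes_written: usize,
    since_last_write: Duration,
    since_first_write: Duration,
}

/// Each limit is optional; `None` disables that check.
struct RollingPolicySketch {
    part_limit: Option<usize>,
    size_limit: Option<usize>,
    inactivity: Option<Duration>,
    rollover: Option<Duration>,
}

impl RollingPolicySketch {
    /// Returns the first condition that has been reached, if any.
    fn should_roll(&self, stats: &FileStats) -> Option<RollReason> {
        if self.part_limit.map_or(false, |limit| stats.parts_written >= limit) {
            return Some(RollReason::PartLimit);
        }
        if self.size_limit.map_or(false, |limit| stats.bytes_written >= limit) {
            return Some(RollReason::SizeLimit);
        }
        if self.inactivity.map_or(false, |limit| stats.since_last_write >= limit) {
            return Some(RollReason::Inactivity);
        }
        if self.rollover.map_or(false, |limit| stats.since_first_write >= limit) {
            return Some(RollReason::Rollover);
        }
        None
    }
}
```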
Type Aliases:
- ParquetFileSystemSink = FileSystemSink<ParquetBatchBufferingWriter>
- JsonFileSystemSink = FileSystemSink<JsonWriter>
- LocalParquetFileSystemSink = LocalFileSystemWriter<ParquetLocalWriter>
- LocalJsonFileSystemSink = LocalFileSystemWriter<JsonLocalWriter>
Usage
This is the primary V1 sink implementation used for all remote filesystem writes in Arroyo. It is selected by the filesystem connector's make_sink function based on the output format and storage path. For newer deployments, the V2 implementation may be preferred.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/mod.rs
- Lines: 1-2001
Signature
pub trait BatchBufferingWriter: Send {
    fn new(
        config: &config::FileSystemSink,
        format: Format,
        schema: ArroyoSchemaRef,
        iceberg_schema: Option<iceberg::spec::SchemaRef>,
        event_logger: FsEventLogger,
    ) -> Self;
    fn suffix() -> String;
    fn add_batch_data(&mut self, data: &RecordBatch);
    fn unflushed_bytes(&self) -> usize;
    fn buffered_bytes(&self) -> usize;
    fn split_to(&mut self, pos: usize) -> Bytes;
    fn get_trailing_bytes_for_checkpoint(&mut self) -> (Vec<u8>, Option<IcebergFileMetadata>);
    fn close(&mut self) -> (Bytes, Option<IcebergFileMetadata>);
}

pub struct FileSystemSink<BBW: BatchBufferingWriter> { ... }

pub type ParquetFileSystemSink = FileSystemSink<ParquetBatchBufferingWriter>;
pub type JsonFileSystemSink = FileSystemSink<JsonWriter>;

pub struct BatchMultipartWriter<BBW: BatchBufferingWriter> { ... }

#[derive(Debug)]
pub enum CommitState {
    DeltaLake { last_version: i64, table: Box<DeltaTable> },
    Iceberg(Box<IcebergTable>),
    VanillaParquet,
}

pub struct FinishedFile {
    filename: String,
    partition: Option<String>,
    size: usize,
    metadata: Option<IcebergFileMetadata>,
}

pub struct FileSystemDataRecovery {
    pub next_file_index: usize,
    pub active_files: Vec<InProgressFileCheckpoint>,
    pub delta_version: i64,
}

pub(crate) fn add_suffix_prefix(filename: String, prefix: Option<&String>, suffix: &String) -> String;
pub(crate) fn split_into_parts(bytes: Bytes, part_size: usize) -> Vec<Bytes>;
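The signature of split_into_parts suggests that a buffer is cut into pieces of at most part_size bytes. The sketch below shows one plausible reading of that signature, not the actual implementation: it uses `&[u8]` and `Vec<u8>` in place of `Bytes` to stay dependency-free, the function name `split_into_parts_sketch` is hypothetical, and letting the final part be shorter than the others is an assumption.

```rust
/// Splits `bytes` into consecutive chunks of at most `part_size` bytes.
/// The last chunk may be shorter; an empty input yields no chunks.
fn split_into_parts_sketch(bytes: &[u8], part_size: usize) -> Vec<Vec<u8>> {
    assert!(part_size > 0, "part_size must be non-zero");
    bytes
        .chunks(part_size)
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

With a reference-counted buffer type such as `Bytes`, the same splitting can be done without copying, which is presumably why the real signature takes and returns `Bytes`.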
Import
use arroyo_connectors::filesystem::sink::{
FileSystemSink, ParquetFileSystemSink, JsonFileSystemSink,
BatchBufferingWriter, CommitState, FinishedFile, FileSystemDataRecovery,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | config::FileSystemSink | Yes | Sink configuration with path, storage options, rolling policy, multipart settings |
| format | Format | Yes | Output format (Parquet or JSON) |
| table_format | TableFormat | Yes | Table format (None, Delta, or Iceberg) |
| partitioner_mode | PartitionerMode | Yes | Partitioning strategy (None, Hive, or Iceberg) |
| RecordBatch | RecordBatch | Yes | Data batches received from the pipeline |
Outputs
| Name | Type | Description |
|---|---|---|
| Files | Object store files | Parquet or JSON files written to S3/GCS/etc. via multipart upload |
| FileSystemDataRecovery | FileSystemDataRecovery | Checkpoint recovery state for exactly-once guarantees |
| FileToFinish | FileToFinish | Pre-commit data for files with completed multipart parts |
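The relationship between the FileToFinish and FinishedFile outputs can be pictured as a two-phase flow: pre-commit records which files have all their parts uploaded, and commit finalizes them. The sketch below is a simplified model under assumptions, not the TwoPhaseCommitter implementation: every type and field name ending in `Sketch` is hypothetical, finalization is simulated by moving a record between two lists, and skipping already-finalized files on replay is one possible way to keep commit idempotent after recovery.

```rust
/// Hypothetical pre-commit record: a file whose parts have all been uploaded.
#[derive(Debug, Clone, PartialEq)]
struct FileToFinishSketch {
    filename: String,
    completed_parts: Vec<String>, // opaque part identifiers
    size: usize,
}

/// Hypothetical record of a file that has been finalized.
#[derive(Debug, Clone, PartialEq)]
struct FinishedFileSketch {
    filename: String,
    parts: usize,
    size: usize,
}

#[derive(Default)]
struct TwoPhaseSketch {
    pre_committed: Vec<FileToFinishSketch>,
    committed: Vec<FinishedFileSketch>,
}

impl TwoPhaseSketch {
    /// Phase 1: remember the files that are ready to be finalized.
    /// In a real system this state would be persisted with the checkpoint.
    fn pre_commit(&mut self, files: Vec<FileToFinishSketch>) {
        self.pre_committed = files;
    }

    /// Phase 2: finalize every pre-committed file and return how many were new.
    /// Files that were already finalized are skipped, so replaying a commit is harmless.
    fn commit(&mut self) -> usize {
        let mut newly_finished = 0;
        for file in std::mem::take(&mut self.pre_committed) {
            if self.committed.iter().any(|done| done.filename == file.filename) {
                continue;
            }
            self.committed.push(FinishedFileSketch {
                filename: file.filename,
                parts: file.completed_parts.len(),
                size: file.size,
            });
            newly_finished += 1;
        }
        newly_finished
    }
}
```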
Usage Examples
// Create a Parquet filesystem sink for S3
let operator = FileSystemSink::<ParquetBatchBufferingWriter>::create_and_start(
    sink_config,
    TableFormat::Delta,
    Format::Parquet(ParquetFormat::default()),
    PartitionerMode::None,
    Some("connection-123".to_string()),
);