Implementation:ArroyoSystems Arroyo Filesystem Sink

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

This module provides the core V1 filesystem sink infrastructure for writing streaming data to object stores such as S3 and GCS through multipart uploads. It supports Parquet and JSON file formats, Delta Lake and Iceberg table formats, partitioning, rolling policies, and exactly-once checkpointing.

Description

This is the largest module in the filesystem connector area (~2000 lines) and defines the fundamental abstractions and runtime components for the V1 filesystem sink:

Core Abstractions:

  • BatchBufferingWriter trait - Defines the interface for format-specific buffered writers that accumulate data, track buffer sizes, support splitting at part boundaries, produce trailing bytes for checkpoints, and close with final bytes and optional Iceberg metadata.
  • BatchMultipartWriter<BBW> - Combines a BatchBufferingWriter with a MultipartManager to coordinate buffered writes with multipart upload operations. It manages stats, target part sizes, and handles initialization, part completion, checkpointing, and closing.
  • MultipartManager - Manages the lifecycle of a single multipart upload: initialization, part uploading, checkpoint serialization, and completion tracking. Handles the case where the multipart ID is not yet available by queuing parts.
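
The part-queuing behavior described for MultipartManager can be illustrated with a minimal sketch. Everything here is hypothetical and simplified: `PartQueue`, `UploadRequest`, `push_part`, and `set_upload_id` are names invented for illustration, and the code only models the bookkeeping, not real object-store calls or the actual Arroyo types.

```rust
/// Hypothetical description of one part upload that is ready to be issued.
#[derive(Debug, PartialEq)]
struct UploadRequest {
    upload_id: String,
    part_index: usize,
    len: usize,
}

/// Hypothetical, simplified stand-in for a multipart upload manager.
/// Parts that arrive before the multipart ID is known are queued.
#[derive(Debug, Default)]
struct PartQueue {
    upload_id: Option<String>,
    pending: Vec<(usize, Vec<u8>)>,
    next_part: usize,
}

impl PartQueue {
    /// Accept a part. Returns an upload request if the multipart ID is
    /// already known, otherwise queues the part and returns None.
    fn push_part(&mut self, data: Vec<u8>) -> Option<UploadRequest> {
        let part_index = self.next_part;
        self.next_part += 1;
        match &self.upload_id {
            Some(id) => Some(UploadRequest {
                upload_id: id.clone(),
                part_index,
                len: data.len(),
            }),
            None => {
                self.pending.push((part_index, data));
                None
            }
        }
    }

    /// Record the multipart ID and release every queued part in order.
    fn set_upload_id(&mut self, id: String) -> Vec<UploadRequest> {
        self.upload_id = Some(id.clone());
        self.pending
            .drain(..)
            .map(|(part_index, data)| UploadRequest {
                upload_id: id.clone(),
                part_index,
                len: data.len(),
            })
            .collect()
    }
}
```

In this sketch, parts pushed before `set_upload_id` is called are held back and returned together once the ID arrives; later parts are released immediately with the next part index.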

Runtime Components:

  • FileSystemSink<BBW> - The main V1 operator struct implementing TwoPhaseCommitter. Uses message-passing (channels) to communicate with an async background writer task. Supports PerSubtask and PerOperator commit strategies.
  • AsyncMultipartFileSystemWriter<BBW> - Background task that manages multiple BatchMultipartWriter instances (one per active file), processes data, checkpoint, and commit messages, and polls upload futures to completion.
  • CommitState - Enum tracking the table format state: DeltaLake (with version tracking), Iceberg (with table wrapper), or VanillaParquet (no table format).
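
The message-passing pattern between the sink operator and its background writer can be sketched as follows. This is an illustration only: the source describes an async background task, whereas this sketch substitutes a plain OS thread and `std::sync::mpsc` so that it needs no external crates. `SinkMessage` and `spawn_writer` are hypothetical names, and the "writer" merely counts bytes instead of uploading anything.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical messages sent from the operator to the background writer.
enum SinkMessage {
    /// A chunk of serialized data to buffer.
    Data(Vec<u8>),
    /// Ask the writer to report its state; the reply carries bytes seen so far.
    Checkpoint { reply: Sender<usize> },
    /// Stop the writer.
    Close,
}

/// Spawn the background writer and return a sender for talking to it.
/// The join handle yields the total number of bytes received.
fn spawn_writer() -> (Sender<SinkMessage>, thread::JoinHandle<usize>) {
    let (tx, rx) = channel::<SinkMessage>();
    let handle = thread::spawn(move || {
        let mut total = 0usize;
        // Messages are processed strictly in the order they were sent.
        for msg in rx {
            match msg {
                SinkMessage::Data(bytes) => total += bytes.len(),
                SinkMessage::Checkpoint { reply } => {
                    // Ignore the error if the requester has gone away.
                    let _ = reply.send(total);
                }
                SinkMessage::Close => break,
            }
        }
        total
    });
    (tx, handle)
}
```

Because a single channel carries data, checkpoint, and close messages, a checkpoint reply in this sketch always reflects every data message sent before it.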

File Lifecycle Types:

  • FileToFinish - Represents a file with completed multipart upload parts ready for finalization.
  • FinishedFile - A file that has been successfully finalized on object storage.
  • InProgressFileCheckpoint / FileCheckpointData - Serializable checkpoint state for recovery, covering all stages from not-yet-created multipart uploads through completed uploads.
  • RollingPolicy - Determines when to close a file and start a new one, based on part limit, size limit, inactivity duration, rollover duration, or watermark-based time pattern expiration.
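
A rough sketch of how a rolling decision of this kind might be evaluated is shown below. `RollingRule`, `FileStats`, and `should_roll` are hypothetical names chosen for illustration and do not reflect the actual RollingPolicy definition; the watermark-based time pattern expiration is left out for brevity.

```rust
use std::time::Duration;

/// Hypothetical per-file statistics consulted by the rolling rules.
#[derive(Debug, Clone, Copy)]
struct FileStats {
    parts_written: usize,
    bytes_written: usize,
    since_first_write: Duration,
    since_last_write: Duration,
}

/// Hypothetical rules mirroring four of the criteria listed above.
#[derive(Debug, Clone, Copy)]
enum RollingRule {
    PartLimit(usize),
    SizeLimit(usize),
    InactivityDuration(Duration),
    RolloverDuration(Duration),
}

impl RollingRule {
    /// True when this single rule says the file should be closed.
    fn should_roll(&self, stats: &FileStats) -> bool {
        match *self {
            RollingRule::PartLimit(max) => stats.parts_written >= max,
            RollingRule::SizeLimit(max) => stats.bytes_written >= max,
            RollingRule::InactivityDuration(max) => stats.since_last_write >= max,
            RollingRule::RolloverDuration(max) => stats.since_first_write >= max,
        }
    }
}

/// A file rolls as soon as any configured rule fires.
fn should_roll(rules: &[RollingRule], stats: &FileStats) -> bool {
    rules.iter().any(|rule| rule.should_roll(stats))
}
```

Treating the rules as an "any of" combination is an assumption of this sketch: a file is closed when the first limit is reached, whichever it is.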

Type Aliases:

  • ParquetFileSystemSink = FileSystemSink<ParquetBatchBufferingWriter>
  • JsonFileSystemSink = FileSystemSink<JsonWriter>
  • LocalParquetFileSystemSink = LocalFileSystemWriter<ParquetLocalWriter>
  • LocalJsonFileSystemSink = LocalFileSystemWriter<JsonLocalWriter>

Usage

This is the primary V1 sink implementation used for all remote filesystem writes in Arroyo. It is selected by the filesystem connector's make_sink function based on the output format and storage path. For newer deployments, the V2 implementation may be preferred.

Code Reference

Source Location

Signature

pub trait BatchBufferingWriter: Send {
    fn new(config: &config::FileSystemSink, format: Format, schema: ArroyoSchemaRef,
           iceberg_schema: Option<iceberg::spec::SchemaRef>, event_logger: FsEventLogger) -> Self;
    fn suffix() -> String;
    fn add_batch_data(&mut self, data: &RecordBatch);
    fn unflushed_bytes(&self) -> usize;
    fn buffered_bytes(&self) -> usize;
    fn split_to(&mut self, pos: usize) -> Bytes;
    fn get_trailing_bytes_for_checkpoint(&mut self) -> (Vec<u8>, Option<IcebergFileMetadata>);
    fn close(&mut self) -> (Bytes, Option<IcebergFileMetadata>);
}

pub struct FileSystemSink<BBW: BatchBufferingWriter> { ... }
pub type ParquetFileSystemSink = FileSystemSink<ParquetBatchBufferingWriter>;
pub type JsonFileSystemSink = FileSystemSink<JsonWriter>;

pub struct BatchMultipartWriter<BBW: BatchBufferingWriter> { ... }

#[derive(Debug)]
pub enum CommitState {
    DeltaLake { last_version: i64, table: Box<DeltaTable> },
    Iceberg(Box<IcebergTable>),
    VanillaParquet,
}

pub struct FinishedFile {
    filename: String,
    partition: Option<String>,
    size: usize,
    metadata: Option<IcebergFileMetadata>,
}

pub struct FileSystemDataRecovery {
    pub next_file_index: usize,
    pub active_files: Vec<InProgressFileCheckpoint>,
    pub delta_version: i64,
}

pub(crate) fn add_suffix_prefix(filename: String, prefix: Option<&String>, suffix: &String) -> String;
pub(crate) fn split_into_parts(bytes: Bytes, part_size: usize) -> Vec<Bytes>;
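
The body of split_into_parts is not shown on this page. As a guess at the general idea, the sketch below cuts a buffer into consecutive chunks of at most part_size bytes. It is an assumption, not the actual implementation: it uses a plain byte slice and `Vec<u8>` instead of `Bytes` so that it compiles with the standard library alone, and the name `split_into_parts_sketch` is hypothetical.

```rust
/// Hypothetical stand-in for a part splitter: cut `bytes` into consecutive
/// chunks of at most `part_size` bytes. The final chunk may be shorter.
fn split_into_parts_sketch(bytes: &[u8], part_size: usize) -> Vec<Vec<u8>> {
    // `chunks` panics on a zero chunk size, so fail early with a clear message.
    assert!(part_size > 0, "part_size must be greater than zero");
    bytes.chunks(part_size).map(|chunk| chunk.to_vec()).collect()
}
```

With a 10-byte buffer and a part size of 4, this sketch yields parts of 4, 4, and 2 bytes; an empty buffer yields no parts.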

Import

use arroyo_connectors::filesystem::sink::{
    FileSystemSink, ParquetFileSystemSink, JsonFileSystemSink,
    BatchBufferingWriter, CommitState, FinishedFile, FileSystemDataRecovery,
};

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| config | config::FileSystemSink | Yes | Sink configuration with path, storage options, rolling policy, multipart settings |
| format | Format | Yes | Output format (Parquet or JSON) |
| table_format | TableFormat | Yes | Table format (None, Delta, or Iceberg) |
| partitioner_mode | PartitionerMode | Yes | Partitioning strategy (None, Hive, or Iceberg) |
| RecordBatch | RecordBatch | Yes | Data batches received from the pipeline |

Outputs

| Name | Type | Description |
|------|------|-------------|
| Files | Object store files | Parquet or JSON files written to S3/GCS/etc. via multipart upload |
| FileSystemDataRecovery | FileSystemDataRecovery | Checkpoint recovery state for exactly-once guarantees |
| FileToFinish | FileToFinish | Pre-commit data for files with completed multipart parts |

Usage Examples

// Create a Parquet filesystem sink for S3
let operator = FileSystemSink::<ParquetBatchBufferingWriter>::create_and_start(
    sink_config,
    TableFormat::Delta,
    Format::Parquet(ParquetFormat::default()),
    PartitionerMode::None,
    Some("connection-123".to_string()),
);
