

Implementation:ArroyoSystems Arroyo Local File Sink

Revision as of 14:27, 16 February 2026 by Admin (auto-imported from implementations/ArroyoSystems_Arroyo_Local_File_Sink.md)


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

Implements a local filesystem sink that writes streaming data to temporary files and atomically moves them to a final directory on commit, with support for partitioning, rolling policies, and checkpointing.

Description

The LocalFileSystemWriter<V> struct is a generic writer parameterized by a LocalWriter implementation (e.g., ParquetLocalWriter or JsonLocalWriter). It uses a two-phase commit protocol:

  1. Write phase - Data is written to temporary files under an __in_progress directory. Multiple writers can be active simultaneously, one per partition key.
  2. Commit phase - On commit, temporary files are atomically renamed (via tokio::fs::rename) to their final destination directory.
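The two phases can be illustrated with a minimal, synchronous sketch that uses only the Rust standard library. It is not Arroyo's code: it calls std::fs instead of tokio::fs, writes raw bytes instead of Arrow batches, and assumes (for illustration only) that the __in_progress directory lives inside the final directory so the rename stays on one filesystem. PreCommit, write_phase, and commit_phase are hypothetical names.

```rust
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};

/// A file that has been written but not yet made visible
/// (a simplified, hypothetical stand-in for FilePreCommit).
#[derive(Debug)]
struct PreCommit {
    tmp_file: PathBuf,
    destination: PathBuf,
}

/// Write phase: write `data` to `<final_dir>/__in_progress/<name>`, flush it to
/// disk, and return the paths needed to commit it later.
fn write_phase(final_dir: &Path, name: &str, data: &[u8]) -> std::io::Result<PreCommit> {
    let tmp_dir = final_dir.join("__in_progress");
    fs::create_dir_all(&tmp_dir)?;
    let tmp_file = tmp_dir.join(name);
    let mut file = fs::File::create(&tmp_file)?;
    file.write_all(data)?;
    file.sync_all()?; // make the bytes durable before the file can be committed
    Ok(PreCommit {
        tmp_file,
        destination: final_dir.join(name),
    })
}

/// Commit phase: rename every pending file into the final directory.
/// On POSIX, rename is atomic only when both paths are on the same filesystem,
/// which holds here because __in_progress is assumed to live inside final_dir.
fn commit_phase(pending: &[PreCommit]) -> std::io::Result<()> {
    for p in pending {
        fs::rename(&p.tmp_file, &p.destination)?;
    }
    Ok(())
}
```

A consumer that skips the __in_progress subdirectory only ever sees complete files, because each file appears under its final name in a single rename.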

The LocalWriter trait defines the contract for format-specific local file writers with methods for write_batch, sync, close, checkpoint, and stats. Writers are created lazily when data arrives for a partition and are named using configurable strategies (Serial, ULID, UUID, UUIDv7) with optional prefix and suffix.
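The lazy, per-partition creation of writers and Serial naming can be sketched as follows. This is a simplified, hypothetical model: BufferedWriter and PartitionedWriters are invented names, partition keys are plain Option<String> values rather than Option<OwnedRow>, the file-name layout ([prefix-]NNNNN[-suffix].ext inside a partition subdirectory) is an assumption, and the ULID/UUID strategies are omitted because they need external crates.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a format-specific writer: it only records its
/// file name and buffers rows in memory.
struct BufferedWriter {
    file_name: String,
    rows: Vec<String>,
}

/// Hypothetical router holding one writer per partition key (None = unpartitioned).
/// Writers are created on first use and named with a serial counter plus an
/// optional prefix and suffix.
struct PartitionedWriters {
    writers: HashMap<Option<String>, BufferedWriter>,
    next_index: usize,
    prefix: Option<String>,
    suffix: Option<String>,
    extension: &'static str, // plays the role of LocalWriter::file_suffix()
}

impl PartitionedWriters {
    fn new(prefix: Option<String>, suffix: Option<String>, extension: &'static str) -> Self {
        PartitionedWriters { writers: HashMap::new(), next_index: 0, prefix, suffix, extension }
    }

    /// Serial naming (assumed layout): [prefix-]NNNNN[-suffix].ext,
    /// placed in the partition's subdirectory when there is one.
    fn next_file_name(&mut self, partition: &Option<String>) -> String {
        let mut name = format!("{:05}", self.next_index);
        self.next_index += 1;
        if let Some(p) = &self.prefix {
            name = format!("{}-{}", p, name);
        }
        if let Some(s) = &self.suffix {
            name = format!("{}-{}", name, s);
        }
        name = format!("{}.{}", name, self.extension);
        match partition {
            Some(dir) => format!("{}/{}", dir, name),
            None => name,
        }
    }

    /// Route a row to its partition's writer, creating that writer lazily.
    fn write(&mut self, partition: Option<String>, row: &str) {
        if !self.writers.contains_key(&partition) {
            let file_name = self.next_file_name(&partition);
            self.writers
                .insert(partition.clone(), BufferedWriter { file_name, rows: Vec::new() });
        }
        self.writers
            .get_mut(&partition)
            .expect("writer exists after insertion")
            .rows
            .push(row.to_string());
    }
}
```

Keying the map by partition means the number of open files grows with the number of distinct partition values seen between rolls.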

Rolling policies determine when to close and start a new file based on part limit, size limit, inactivity duration, or rollover duration.
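One way to model the four rolling criteria is an enum checked against statistics of the currently open file. The sketch below is hypothetical: the type names, the fields of OpenFileStats, and the rule that a file rolls as soon as any configured policy fires are assumptions made for illustration.

```rust
use std::time::Duration;

/// Hypothetical enum mirroring the four criteria named above.
enum RollingPolicy {
    PartLimit(usize),
    SizeLimit(usize),
    InactivityDuration(Duration),
    RolloverDuration(Duration),
}

/// Hypothetical statistics about the file currently being written.
#[derive(Debug, Clone, Copy)]
struct OpenFileStats {
    parts_written: usize,
    bytes_written: usize,
    since_last_write: Duration,
    since_opened: Duration,
}

impl RollingPolicy {
    /// True when this single policy says the open file should be closed.
    fn should_roll(&self, s: &OpenFileStats) -> bool {
        match self {
            RollingPolicy::PartLimit(max) => s.parts_written >= *max,
            RollingPolicy::SizeLimit(max) => s.bytes_written >= *max,
            RollingPolicy::InactivityDuration(d) => s.since_last_write >= *d,
            RollingPolicy::RolloverDuration(d) => s.since_opened >= *d,
        }
    }
}

/// Assumed combination rule: roll as soon as any configured policy fires.
fn any_policy_fires(policies: &[RollingPolicy], s: &OpenFileStats) -> bool {
    policies.iter().any(|p| p.should_roll(s))
}
```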

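The TwoPhaseCommitter implementation handles four stages:

  1. Initialization - recovers in-progress files from a previous checkpoint by truncating each file to its checkpointed length and appending any saved trailing bytes.
  2. Batch insertion - routes each incoming batch to the writer for its partition.
  3. Checkpointing - creates checkpoint state that includes file recovery data.
  4. Commit - commits the pending files, with Delta Lake table format integration when the Delta table format is configured.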
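The recovery step of the initialization stage can be sketched with std::fs alone, assuming a checkpoint recorded the file's length (bytes_written) and an optional run of trailing bytes (suffix), as the CurrentFileRecovery struct suggests. What those trailing bytes contain for a given format (for example, a footer that makes the file readable) is an assumption here; recover_file and FileRecovery are hypothetical names.

```rust
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;

/// The two CurrentFileRecovery fields this sketch needs.
struct FileRecovery {
    bytes_written: usize,
    suffix: Option<Vec<u8>>,
}

/// Restore an in-progress file to its checkpointed state.
fn recover_file(path: &Path, rec: &FileRecovery) -> std::io::Result<()> {
    let mut file = OpenOptions::new().write(true).open(path)?;
    // Discard anything written after the checkpoint was taken.
    file.set_len(rec.bytes_written as u64)?;
    if let Some(suffix) = &rec.suffix {
        // The cursor is still at offset 0, so seek to the new end before appending.
        file.seek(SeekFrom::End(0))?;
        file.write_all(suffix)?;
    }
    file.sync_all()
}
```

Truncating first makes recovery idempotent with respect to data written after the checkpoint: whatever a crashed run appended is dropped before the trailing bytes are restored.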

Usage

This writer is used when the filesystem sink target is a local path (or a file:// URL). It is suited to development, testing, and single-node deployments. It does not support the Iceberg table format.
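How a path is classified as local is not spelled out on this page. The helper below is a hypothetical sketch that treats plain paths and file:// URLs as local, rejects other URL schemes, and refuses the Iceberg table format. local_dir, validate_local_sink, and TableFormatKind are invented names, not part of Arroyo's API.

```rust
/// Hypothetical table-format enum for this sketch only.
#[derive(Debug, PartialEq)]
enum TableFormatKind {
    None,
    Delta,
    Iceberg,
}

/// Returns the local directory for a sink path: plain paths and file:// URLs
/// are local; any other scheme (s3://, gs://, ...) is not.
fn local_dir(path: &str) -> Option<&str> {
    if let Some(rest) = path.strip_prefix("file://") {
        Some(rest)
    } else if path.contains("://") {
        None
    } else {
        Some(path)
    }
}

/// Accept a sink only if it is local and not configured for Iceberg.
fn validate_local_sink(path: &str, format: TableFormatKind) -> Result<&str, String> {
    let dir = local_dir(path).ok_or_else(|| format!("not a local path: {}", path))?;
    if format == TableFormatKind::Iceberg {
        return Err("Iceberg is not supported by the local filesystem sink".to_string());
    }
    Ok(dir)
}
```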

Code Reference

Source Location

Signature

pub struct LocalFileSystemWriter<V: LocalWriter> {
    writers: HashMap<Option<OwnedRow>, V>,
    tmp_dir: String,
    final_dir: String,
    // ... additional fields
}

impl<V: LocalWriter> LocalFileSystemWriter<V> {
    pub fn new(
        table_properties: config::FileSystemSink,
        table_format: TableFormat,
        format: Format,
        partitioner_mode: PartitionerMode,
    ) -> TwoPhaseCommitterOperator<Self>;
}

pub trait LocalWriter: Send + 'static {
    fn new(tmp_path: String, final_path: String, ...) -> Self;
    fn file_suffix() -> &'static str;
    fn write_batch(&mut self, batch: &RecordBatch) -> anyhow::Result<usize>;
    fn sync(&mut self) -> anyhow::Result<usize>;
    fn close(&mut self) -> anyhow::Result<FilePreCommit>;
    fn checkpoint(&mut self) -> anyhow::Result<Option<CurrentFileRecovery>>;
    fn stats(&self) -> MultiPartWriterStats;
}

#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct FilePreCommit {
    pub tmp_file: String,
    pub destination: String,
}

#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct CurrentFileRecovery {
    pub tmp_file: String,
    pub bytes_written: usize,
    pub suffix: Option<Vec<u8>>,
    pub destination: String,
    pub metadata: Option<IcebergFileMetadata>,
}
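To show how the LocalWriter methods fit together over a file's lifetime, here is a reduced, hypothetical version of the trait with a JSON-lines implementation. It is a sketch, not Arroyo's trait: rows are string slices instead of RecordBatches, errors use std::io::Result instead of anyhow, sync and stats are left out, and it assumes that the usize returned by write_batch is a byte count. Because JSON lines needs no trailing footer, checkpoint reports suffix: None.

```rust
use std::fs::File;
use std::io::{self, Write};

/// Simplified stand-ins for FilePreCommit and CurrentFileRecovery.
#[derive(Debug, Clone, PartialEq)]
struct PreCommit { tmp_file: String, destination: String }

#[derive(Debug, Clone, PartialEq)]
struct Recovery { tmp_file: String, bytes_written: usize, suffix: Option<Vec<u8>>, destination: String }

/// Hypothetical, reduced version of the LocalWriter contract.
trait SimpleLocalWriter: Sized {
    fn new(tmp_path: String, final_path: String) -> io::Result<Self>;
    fn file_suffix() -> &'static str;
    fn write_batch(&mut self, rows: &[&str]) -> io::Result<usize>;
    fn checkpoint(&mut self) -> io::Result<Option<Recovery>>;
    fn close(&mut self) -> io::Result<PreCommit>;
}

struct JsonLinesWriter { tmp_path: String, final_path: String, file: File, bytes_written: usize }

impl SimpleLocalWriter for JsonLinesWriter {
    fn new(tmp_path: String, final_path: String) -> io::Result<Self> {
        let file = File::create(&tmp_path)?; // data always goes to the temporary path
        Ok(Self { tmp_path, final_path, file, bytes_written: 0 })
    }

    fn file_suffix() -> &'static str { "json" }

    /// Append one line per row; returns the number of bytes written.
    fn write_batch(&mut self, rows: &[&str]) -> io::Result<usize> {
        let mut n = 0;
        for row in rows {
            self.file.write_all(row.as_bytes())?;
            self.file.write_all(b"\n")?;
            n += row.len() + 1;
        }
        self.bytes_written += n;
        Ok(n)
    }

    /// Make written bytes durable and record how far the file got.
    fn checkpoint(&mut self) -> io::Result<Option<Recovery>> {
        self.file.sync_all()?;
        Ok(Some(Recovery {
            tmp_file: self.tmp_path.clone(),
            bytes_written: self.bytes_written,
            suffix: None, // no footer for JSON lines
            destination: self.final_path.clone(),
        }))
    }

    /// Finish the file and hand back the paths the committer will rename.
    fn close(&mut self) -> io::Result<PreCommit> {
        self.file.sync_all()?;
        Ok(PreCommit { tmp_file: self.tmp_path.clone(), destination: self.final_path.clone() })
    }
}
```

A format that must write trailing bytes to produce a valid file would presumably report them through suffix instead of None.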

Import

use arroyo_connectors::filesystem::sink::local::{
    LocalFileSystemWriter, LocalWriter, FilePreCommit, CurrentFileRecovery,
};

I/O Contract

Inputs

Name | Type | Required | Description
table_properties | config::FileSystemSink | Yes | Sink configuration, including path, rolling policy, file naming, and multipart settings
table_format | TableFormat | Yes | Table format: Delta or None (Iceberg is not supported by the local sink)
format | Format | Yes | Output format: Parquet or JSON
partitioner_mode | PartitionerMode | Yes | Partitioning mode used to route rows to per-partition writers (e.g., PartitionerMode::None)
batch | RecordBatch | Yes | Data batches to write (received via insert_batch)

Outputs

Name | Type | Description
FilePreCommit | FilePreCommit | Temporary and destination paths of a file that is ready to be committed
LocalFileDataRecovery | LocalFileDataRecovery | Checkpoint recovery data, including the next file index and the in-progress files
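The role of the next file index can be illustrated with Serial naming: restoring the counter from the checkpoint lets a restarted writer continue numbering instead of reusing the names of files written before the failure. The names below (DataRecovery, InProgressFile, next_file_index, current_files, FileNamer) are hypothetical and only approximate the recovery data described in the table.

```rust
/// Hypothetical approximation of one in-progress file entry.
#[derive(Debug, Clone, PartialEq)]
struct InProgressFile {
    tmp_file: String,
    bytes_written: usize,
    destination: String,
}

/// Hypothetical approximation of the checkpointed recovery data.
#[derive(Debug, Clone, PartialEq)]
struct DataRecovery {
    next_file_index: usize,
    current_files: Vec<InProgressFile>,
}

/// Serial file namer whose counter survives restarts via the checkpoint.
struct FileNamer {
    next_index: usize,
}

impl FileNamer {
    /// Resume numbering where the checkpointed run left off.
    fn restore(rec: &DataRecovery) -> Self {
        FileNamer { next_index: rec.next_file_index }
    }

    fn next_name(&mut self, extension: &str) -> String {
        let name = format!("{:05}.{}", self.next_index, extension);
        self.next_index += 1;
        name
    }
}
```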

Usage Examples

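// Create a local Parquet filesystem sink.
// `sink_config` is a config::FileSystemSink (path, rolling policy, file naming, ...).
let operator = LocalFileSystemWriter::<ParquetLocalWriter>::new(
    sink_config,
    TableFormat::None,                         // plain files, no Delta Lake table
    Format::Parquet(ParquetFormat::default()),
    PartitionerMode::None,                     // no partitioning
);
// `operator` is a TwoPhaseCommitterOperator<LocalFileSystemWriter<ParquetLocalWriter>>.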
