Implementation:ArroyoSystems Arroyo Local File Sink
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements a local filesystem sink that writes streaming data to temporary files and atomically moves them to a final directory on commit, with support for partitioning, rolling policies, and checkpointing.
Description
The LocalFileSystemWriter<V> struct is a generic writer parameterized by a LocalWriter implementation (e.g., ParquetLocalWriter or JsonLocalWriter). It uses a two-phase commit protocol:
- Write phase - Data is written to temporary files under an __in_progress directory. Multiple writers can be active simultaneously, one per partition key.
- Commit phase - On commit, temporary files are atomically renamed (via tokio::fs::rename) to their final destination directory.
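The two phases can be sketched with the standard library alone. This is a minimal, synchronous illustration, not Arroyo's code: the helpers `write_in_progress` and `commit` are hypothetical, and `std::fs::rename` stands in for `tokio::fs::rename`. On POSIX systems a rename is atomic when both paths are on the same filesystem, which is why the temporary directory should live on the same mount as the final directory.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Write phase (hypothetical helper): write `data` to `tmp_dir/file_name`
/// and flush it to disk. Returns the temporary path.
pub fn write_in_progress(tmp_dir: &Path, file_name: &str, data: &[u8]) -> io::Result<PathBuf> {
    fs::create_dir_all(tmp_dir)?;
    let tmp_path = tmp_dir.join(file_name);
    let mut file = fs::File::create(&tmp_path)?;
    file.write_all(data)?;
    // Make sure the bytes are durable before the file can be committed.
    file.sync_all()?;
    Ok(tmp_path)
}

/// Commit phase (hypothetical helper): move the temporary file into `final_dir`
/// under the same file name. `fs::rename` does not work across mount points.
pub fn commit(tmp_path: &Path, final_dir: &Path) -> io::Result<PathBuf> {
    fs::create_dir_all(final_dir)?;
    let file_name = tmp_path
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "temporary path has no file name"))?;
    let destination = final_dir.join(file_name);
    fs::rename(tmp_path, &destination)?;
    Ok(destination)
}
```

Readers of the final directory only ever see complete files, because a file appears there only through the rename.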
The LocalWriter trait defines the contract for format-specific local file writers: a constructor (new), the file extension (file_suffix), and methods for write_batch, sync, close, checkpoint, and stats. Writers are created lazily, the first time data arrives for a partition, and their files are named using a configurable strategy (Serial, ULID, UUID, or UUIDv7) with an optional prefix and suffix.
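Lazy, per-partition writer creation maps naturally onto a `HashMap` entry lookup, mirroring the `writers: HashMap<Option<OwnedRow>, V>` field. The sketch below is hypothetical: `Option<String>` stands in for `Option<OwnedRow>`, `BufferedWriter` stands in for a real `LocalWriter`, and `serial_name` only illustrates a Serial-style name (prefix, running index, format suffix); Arroyo's exact name layout is not specified here.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a format-specific writer: it only buffers bytes.
#[derive(Debug)]
pub struct BufferedWriter {
    pub file_name: String,
    pub buf: Vec<u8>,
}

/// Hypothetical "Serial" naming: optional prefix, a running index, then the format suffix.
pub fn serial_name(prefix: Option<&str>, index: usize, suffix: &str) -> String {
    format!("{}{}.{}", prefix.unwrap_or(""), index, suffix)
}

/// One writer per partition key; `None` means the data is unpartitioned.
pub struct PartitionedWriters {
    writers: HashMap<Option<String>, BufferedWriter>,
    next_file_index: usize,
    prefix: Option<String>,
    suffix: String,
}

impl PartitionedWriters {
    pub fn new(prefix: Option<String>, suffix: &str) -> Self {
        PartitionedWriters {
            writers: HashMap::new(),
            next_file_index: 0,
            prefix,
            suffix: suffix.to_string(),
        }
    }

    /// Route `data` to the writer for `partition`, creating (and naming) it
    /// the first time that partition key is seen.
    pub fn write(&mut self, partition: Option<String>, data: &[u8]) {
        // Destructure so the closure can bump the counter while `writers` is borrowed.
        let PartitionedWriters { writers, next_file_index, prefix, suffix } = self;
        let writer = writers.entry(partition).or_insert_with(|| {
            let name = serial_name(prefix.as_deref(), *next_file_index, suffix.as_str());
            *next_file_index += 1;
            BufferedWriter { file_name: name, buf: Vec::new() }
        });
        writer.buf.extend_from_slice(data);
    }

    /// Look up the writer currently open for `partition`, if any.
    pub fn writer(&self, partition: &Option<String>) -> Option<&BufferedWriter> {
        self.writers.get(partition)
    }
}
```

Because the file index is only consumed when a new partition key appears, repeated writes to the same partition keep appending to one file.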
A rolling policy decides when to close the current file and start a new one, based on a part limit, a size limit, an inactivity duration, or a rollover duration.
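A rolling check can be expressed as "roll if any configured limit is reached". The sketch below assumes hypothetical `RollingPolicy` and `FileStats` types (each limit is optional, and `None` disables it); the field names are illustrative, and treating the part limit as a count of written parts is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical rolling policy; `None` disables a limit.
#[derive(Debug, Clone, Default)]
pub struct RollingPolicy {
    pub part_limit: Option<usize>,
    pub size_limit: Option<usize>,
    pub inactivity: Option<Duration>,
    pub rollover: Option<Duration>,
}

/// Hypothetical per-file statistics, loosely modeled on what `stats()` might report.
#[derive(Debug, Clone)]
pub struct FileStats {
    pub parts_written: usize,
    pub bytes_written: usize,
    pub first_write_at: Instant,
    pub last_write_at: Instant,
}

impl RollingPolicy {
    /// Returns true if any configured limit has been reached at time `now`.
    pub fn should_roll(&self, stats: &FileStats, now: Instant) -> bool {
        self.part_limit.map_or(false, |limit| stats.parts_written >= limit)
            || self.size_limit.map_or(false, |limit| stats.bytes_written >= limit)
            || self.inactivity.map_or(false, |d| now.duration_since(stats.last_write_at) >= d)
            || self.rollover.map_or(false, |d| now.duration_since(stats.first_write_at) >= d)
    }
}
```

Inactivity is measured from the last write, while rollover is measured from the first write, so a busy file can still be rolled on a fixed schedule.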
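The TwoPhaseCommitter implementation handles:
- Initialization - Recovers in-progress files from a previous checkpoint by truncating each file to its checkpointed length and appending the saved trailing bytes.
- Batch insertion - Routes each incoming batch to the writer for its partition.
- Checkpointing - Records recovery data for the files that are still open.
- Commit - Moves finished files into the final directory and integrates with the Delta Lake table format when it is selected.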
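The recovery step (truncate, then append trailing bytes) can be sketched with `std::fs`. This is a hypothetical helper, assuming the checkpoint recorded the byte length and an optional trailer, as the `bytes_written` and `suffix` fields of CurrentFileRecovery suggest; for Parquet the trailer would plausibly be a file footer, but that is an assumption here.

```rust
use std::fs::OpenOptions;
use std::io::{self, Seek, SeekFrom, Write};
use std::path::Path;

/// Restore an in-progress file to its checkpointed state (hypothetical helper):
/// drop any bytes written after the checkpoint, then append the saved trailing
/// bytes so the file is complete again.
pub fn recover_in_progress(path: &Path, bytes_written: usize, suffix: Option<&[u8]>) -> io::Result<()> {
    let mut file = OpenOptions::new().write(true).open(path)?;
    // Discard everything past the checkpointed length.
    file.set_len(bytes_written as u64)?;
    // `set_len` does not move the cursor, so seek to the new end before appending.
    file.seek(SeekFrom::End(0))?;
    if let Some(trailer) = suffix {
        file.write_all(trailer)?;
    }
    file.sync_all()
}
```

Truncating first matters: anything written between the checkpoint and the failure was never acknowledged and must not reach the committed file.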
Usage
Used when the filesystem sink target is a local (or file://) path. Suitable for development, testing, and single-node deployments. Does not support Iceberg table format.
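Deciding whether a target is local can be sketched as a simple prefix check. The `local_path` helper below is hypothetical and deliberately simplified (plain paths and `file://` URLs are local; any other `scheme://` is not); it does not reflect how Arroyo actually parses storage URLs.

```rust
/// Hypothetical helper: returns the local filesystem path for a sink target,
/// or `None` if the target uses a non-local scheme such as `s3://`.
pub fn local_path(target: &str) -> Option<&str> {
    if let Some(rest) = target.strip_prefix("file://") {
        // `file:///tmp/out` -> `/tmp/out`
        Some(rest)
    } else if target.contains("://") {
        // Some other URL scheme: not handled by the local sink.
        None
    } else {
        // A bare path is treated as local.
        Some(target)
    }
}
```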
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/local.rs
- Lines: 1-428
Signature
```rust
pub struct LocalFileSystemWriter<V: LocalWriter> {
    writers: HashMap<Option<OwnedRow>, V>,
    tmp_dir: String,
    final_dir: String,
    // ... additional fields
}

impl<V: LocalWriter> LocalFileSystemWriter<V> {
    pub fn new(
        table_properties: config::FileSystemSink,
        table_format: TableFormat,
        format: Format,
        partitioner_mode: PartitionerMode,
    ) -> TwoPhaseCommitterOperator<Self>;
}

pub trait LocalWriter: Send + 'static {
    fn new(tmp_path: String, final_path: String, ...) -> Self;
    fn file_suffix() -> &'static str;
    fn write_batch(&mut self, batch: &RecordBatch) -> anyhow::Result<usize>;
    fn sync(&mut self) -> anyhow::Result<usize>;
    fn close(&mut self) -> anyhow::Result<FilePreCommit>;
    fn checkpoint(&mut self) -> anyhow::Result<Option<CurrentFileRecovery>>;
    fn stats(&self) -> MultiPartWriterStats;
}

#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct FilePreCommit {
    pub tmp_file: String,
    pub destination: String,
}

#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct CurrentFileRecovery {
    pub tmp_file: String,
    pub bytes_written: usize,
    pub suffix: Option<Vec<u8>>,
    pub destination: String,
    pub metadata: Option<IcebergFileMetadata>,
}
```
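To make the trait contract above concrete, here is a hypothetical, dependency-free writer in the same shape. It is a simplified mirror, not Arroyo's code: batches are JSON-line strings instead of Arrow `RecordBatch`es, errors use `std::io::Result` instead of `anyhow::Result`, `stats()` and the Iceberg `metadata` field are omitted, and having `sync` return the total bytes written so far is an assumption.

```rust
use std::fs::File;
use std::io::{self, Write};

// Simplified mirrors of the structs above (metadata field omitted).
#[derive(Debug, Clone, PartialEq)]
pub struct FilePreCommit { pub tmp_file: String, pub destination: String }

#[derive(Debug, Clone, PartialEq)]
pub struct CurrentFileRecovery {
    pub tmp_file: String,
    pub bytes_written: usize,
    pub suffix: Option<Vec<u8>>,
    pub destination: String,
}

/// Hypothetical, simplified version of the LocalWriter contract.
pub trait SimpleLocalWriter: Sized {
    fn new(tmp_path: String, final_path: String) -> io::Result<Self>;
    fn file_suffix() -> &'static str;
    fn write_batch(&mut self, lines: &[&str]) -> io::Result<usize>;
    fn sync(&mut self) -> io::Result<usize>;
    fn close(&mut self) -> io::Result<FilePreCommit>;
    fn checkpoint(&mut self) -> io::Result<Option<CurrentFileRecovery>>;
}

pub struct JsonLinesWriter { tmp_path: String, final_path: String, file: File, bytes_written: usize }

impl SimpleLocalWriter for JsonLinesWriter {
    fn new(tmp_path: String, final_path: String) -> io::Result<Self> {
        let file = File::create(&tmp_path)?;
        Ok(JsonLinesWriter { tmp_path, final_path, file, bytes_written: 0 })
    }

    fn file_suffix() -> &'static str { "json" }

    /// Append one line per record; returns the bytes written for this batch.
    fn write_batch(&mut self, lines: &[&str]) -> io::Result<usize> {
        let mut n = 0;
        for line in lines {
            self.file.write_all(line.as_bytes())?;
            self.file.write_all(b"\n")?;
            n += line.len() + 1;
        }
        self.bytes_written += n;
        Ok(n)
    }

    /// Flush to disk; returns the total bytes written so far.
    fn sync(&mut self) -> io::Result<usize> {
        self.file.sync_data()?;
        Ok(self.bytes_written)
    }

    /// Finish the file and hand back the paths the committer will rename.
    fn close(&mut self) -> io::Result<FilePreCommit> {
        self.sync()?;
        Ok(FilePreCommit { tmp_file: self.tmp_path.clone(), destination: self.final_path.clone() })
    }

    /// JSON lines need no footer, so the recovery suffix is `None`.
    fn checkpoint(&mut self) -> io::Result<Option<CurrentFileRecovery>> {
        let bytes_written = self.sync()?;
        Ok(Some(CurrentFileRecovery {
            tmp_file: self.tmp_path.clone(),
            bytes_written,
            suffix: None,
            destination: self.final_path.clone(),
        }))
    }
}
```

A columnar format such as Parquet would differ mainly in `checkpoint`, where a non-empty `suffix` would carry the bytes needed to make the truncated file valid again.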
Import
```rust
use arroyo_connectors::filesystem::sink::local::{
    LocalFileSystemWriter, LocalWriter, FilePreCommit, CurrentFileRecovery,
};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table_properties | config::FileSystemSink | Yes | Sink configuration including path, rolling policy, file naming, multipart settings |
| table_format | TableFormat | Yes | Table format (Delta or None; Iceberg is not supported for local) |
| format | Format | Yes | Output format (Parquet or JSON) |
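| partitioner_mode | PartitionerMode | Yes | Partitioning mode used to route rows to per-partition writers |
| batch | RecordBatch | Yes | Data batches to write (received via insert_batch) |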
Outputs
| Name | Type | Description |
|---|---|---|
| FilePreCommit | FilePreCommit | Paths for temporary and destination files ready to be committed |
| LocalFileDataRecovery | LocalFileDataRecovery | Checkpoint recovery data including next file index and in-progress files |
Usage Examples
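```rust
// Create a local Parquet filesystem sink.
// `sink_config` is a config::FileSystemSink constructed elsewhere whose path
// points at a local (or file://) directory.
// `new` returns the writer already wrapped in a TwoPhaseCommitterOperator.
let operator = LocalFileSystemWriter::<ParquetLocalWriter>::new(
    sink_config,
    TableFormat::None, // Delta is also supported; Iceberg is not
    Format::Parquet(ParquetFormat::default()),
    PartitionerMode::None,
);
```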