
Implementation:ArroyoSystems Arroyo Filesystem Source

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

Implements a filesystem source operator that reads files from object storage (S3, GCS, local) with support for JSON and Parquet formats, regex filtering, compression (Gzip, Zstd), checkpointed progress tracking, and parallel file distribution across tasks.

Description

The FileSystemSourceFunc struct implements the SourceOperator trait to read files from a configured storage path. It supports the following capabilities:

File Discovery and Distribution:

  • Lists all files at the configured path using the StorageProvider.
  • Optionally filters files using a regex pattern.
  • Distributes files across parallel tasks by hash partitioning: each file path is hashed with DefaultHasher, and the file is assigned to the task whose index equals the hash modulo the parallelism.
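The distribution rule above can be sketched with only the standard library. This is a minimal sketch, assuming the task index and parallelism are plain `usize` values; the helper names `task_for_file` and `files_for_task` are hypothetical, and the regex filtering step is left out to keep the example free of external crates.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: maps a file path to the index of the task that should read it.
/// `DefaultHasher::new()` uses fixed keys, so within one build the same path
/// always lands on the same task.
fn task_for_file(path: &str, parallelism: usize) -> usize {
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    path.hash(&mut hasher);
    (hasher.finish() % parallelism as u64) as usize
}

/// Hypothetical helper: keeps only the files assigned to `task_index`.
fn files_for_task(files: &[String], task_index: usize, parallelism: usize) -> Vec<String> {
    files
        .iter()
        .filter(|path| task_for_file(path.as_str(), parallelism) == task_index)
        .cloned()
        .collect()
}
```

Every task computes the same assignment independently, so no coordination is needed to split the file list. The Rust standard library does not guarantee DefaultHasher's algorithm across releases, so the assignment is stable for a given build rather than permanently.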

Format Support:

  • JSON - Reads newline-delimited JSON via get_newline_separated_stream. Supports optional compression (Gzip via GzipDecoder, Zstd via ZstdDecoder). Lines are deserialized through the collector's deserializer.
  • Parquet - Reads Parquet files via get_record_batch_stream using ParquetRecordBatchStreamBuilder with ParquetObjectReader. Adds a timestamp column with the current system time to each batch. Batch size is fixed at 8192 rows.
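Newline-delimited JSON needs no framing beyond splitting on newlines, which is why a decompressor can sit directly in front of the line splitter. The following is a minimal sketch, assuming a blocking `std::io::Read` source rather than the async stream that get_newline_separated_stream works with; `read_ndjson_lines` is a hypothetical name, and dropping blank lines is a choice made for this sketch, not documented Arroyo behavior. A decoder such as flate2's `read::GzDecoder` (not used here) implements `Read` and could be passed in as the reader.

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Hypothetical helper: splits a byte stream into newline-delimited records.
/// Any `Read` works, so a decompressing reader can be layered in front of it.
fn read_ndjson_lines<R: Read>(reader: R) -> io::Result<Vec<String>> {
    let mut records = Vec::new();
    for line in BufReader::new(reader).lines() {
        // `lines()` strips the trailing "\n" or "\r\n" from each line.
        let line = line?;
        // Skip whitespace-only lines so the deserializer never sees an empty record.
        if !line.trim().is_empty() {
            records.push(line);
        }
    }
    Ok(records)
}
```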

Checkpointing and Recovery:

  • File progress is tracked via FileReadState: either RecordsRead(usize) (number of records/batches read) or Finished.
  • State is persisted to a global keyed state table "a" during checkpoints.
  • On recovery, already-finished files are skipped, and partially read files resume from their checkpointed position by skipping (via skip) the records already read.
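The checkpoint and recovery rules can be sketched as follows. This is a minimal sketch that uses a local copy of the FileReadState enum from the signature below (without the Encode/Decode derives) and keeps the state in a plain `HashMap<String, FileReadState>` instead of Arroyo's keyed state table; `record_progress` and `resume` are hypothetical helpers, and the items being skipped stand for whatever unit the source counts (lines or batches).

```rust
use std::collections::HashMap;
use std::iter::Skip;

/// Local copy of the enum from the signature, minus the Encode/Decode derives.
#[derive(Debug, Clone, PartialEq)]
enum FileReadState {
    Finished,
    RecordsRead(usize),
}

/// Hypothetical helper: updates the per-file state that a checkpoint would persist.
fn record_progress(
    states: &mut HashMap<String, FileReadState>,
    path: &str,
    read: usize,
    finished: bool,
) {
    let state = if finished {
        FileReadState::Finished
    } else {
        FileReadState::RecordsRead(read)
    };
    states.insert(path.to_string(), state);
}

/// Hypothetical helper: decides what remains to be read for `path` after a restart.
/// Finished files yield `None`; otherwise the already-read prefix is skipped.
fn resume<I: IntoIterator>(
    path: &str,
    items: I,
    states: &HashMap<String, FileReadState>,
) -> Option<Skip<I::IntoIter>> {
    match states.get(path) {
        Some(FileReadState::Finished) => None,
        Some(FileReadState::RecordsRead(n)) => Some(items.into_iter().skip(*n)),
        // No recorded state: the file has never been read, so start from the beginning.
        None => Some(items.into_iter().skip(0)),
    }
}
```

Skipping on restart means the already-read prefix is re-read from storage and discarded, which keeps the checkpointed state to a single counter per file.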

Control Flow:

  • The source responds to checkpoint, stop (graceful or immediate), and commit control messages.
  • Reading interleaves file data processing with control message handling via tokio::select!.
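The interleaving of reads and control messages can be approximated without an async runtime. This sketch swaps `tokio::select!` for a synchronous `std::sync::mpsc` channel that is drained with `try_recv` before each record is emitted; the `Control` and `Finish` enums and the `run_source` function are hypothetical stand-ins, not Arroyo's types. Here both stop modes halt before the next record and differ only in the reported finish type.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical control messages, loosely modeled on the ones described above.
#[derive(Debug, Clone, PartialEq)]
enum Control {
    Checkpoint(u64), // checkpoint epoch
    Stop { immediate: bool },
    Commit,
}

/// Hypothetical finish type reported when the loop ends.
#[derive(Debug, PartialEq)]
enum Finish {
    Completed,
    StoppedGraceful,
    StoppedImmediate,
}

/// Emits records one at a time, handling pending control messages before each one.
/// Returns how the loop ended plus the (epoch, records_read) pairs captured at checkpoints.
fn run_source(
    records: &[&str],
    control: &Receiver<Control>,
    out: &mut Vec<String>,
) -> (Finish, Vec<(u64, usize)>) {
    let mut checkpoints = Vec::new();
    for (records_read, record) in records.iter().enumerate() {
        // Drain every queued control message before emitting the next record.
        loop {
            match control.try_recv() {
                Ok(Control::Checkpoint(epoch)) => checkpoints.push((epoch, records_read)),
                Ok(Control::Stop { immediate: true }) => {
                    return (Finish::StoppedImmediate, checkpoints)
                }
                Ok(Control::Stop { immediate: false }) => {
                    return (Finish::StoppedGraceful, checkpoints)
                }
                // Nothing to commit for a read-only source in this sketch.
                Ok(Control::Commit) => {}
                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
            }
        }
        out.push(record.to_string());
    }
    (Finish::Completed, checkpoints)
}
```

Polling between records bounds the delay before a checkpoint or stop is seen by the time it takes to emit one record; an async `select!` gets the same responsiveness by awaiting the next record and the next control message at the same time instead of polling.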

Usage

Used when a streaming pipeline needs to ingest data from files in object storage or a local directory. Suitable for batch-like ingestion of historical data into a streaming pipeline.

Code Reference

Source Location

Signature

pub struct FileSystemSourceFunc {
    pub source: config::FileSystemSource,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub file_states: HashMap<String, FileReadState>,
}

#[derive(Encode, Decode, Debug, Clone, PartialEq, PartialOrd)]
pub enum FileReadState {
    Finished,
    RecordsRead(usize),
}

#[async_trait]
impl SourceOperator for FileSystemSourceFunc {
    // Signatures only; method bodies are omitted on this page.
    fn tables(&self) -> HashMap<String, TableConfig>;
    fn name(&self) -> String;           // "FileSystem"
    async fn run(
        &mut self,
        ctx: &mut SourceContext,
        collector: &mut SourceCollector,
    ) -> Result<SourceFinishType, DataflowError>;
}

Import

use arroyo_connectors::filesystem::source::{FileSystemSourceFunc, FileReadState};

I/O Contract

Inputs

Name | Type | Required | Description
source | config::FileSystemSource | Yes | Path, storage options, regex pattern, compression format
format | Format | Yes | JSON or Parquet format configuration
framing | Option<Framing> | No | Optional framing for deserialization
bad_data | Option<BadData> | No | Bad data handling policy

Outputs

Name | Type | Description
RecordBatch | RecordBatch | Parquet only: batches with an appended timestamp column
Deserialized records | Emitted via SourceCollector | JSON only: records deserialized from newline-delimited lines
FileReadState | Checkpointed state | Per-file progress (records read or finished), used for recovery

Usage Examples

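// Create a filesystem source for reading gzip-compressed JSON from S3.
// Assumes `config`, `Format`, `JsonFormat`, and `SourceFileCompressionFormat`
// are already in scope; their module paths are not shown on this page.
use std::collections::HashMap;
use arroyo_connectors::filesystem::source::FileSystemSourceFunc;

// `run` takes `&mut self`, so the binding must be mutable.
let mut source = FileSystemSourceFunc {
    source: config::FileSystemSource {
        path: "s3://my-bucket/data/".to_string(),
        storage_options: HashMap::new(),
        regex_pattern: Some(".*\\.json$".to_string()),
        compression_format: SourceFileCompressionFormat::Gzip,
    },
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    file_states: HashMap::new(), // empty: no files have been read yet
};

// The Arroyo runtime normally calls `run` via the SourceOperator trait and
// supplies `ctx` (SourceContext) and `collector` (SourceCollector).
let finish_type = source.run(&mut ctx, &mut collector).await?;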
