Implementation:ArroyoSystems Arroyo Filesystem Source
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements a filesystem source operator that reads files from object storage (S3, GCS, local) with support for JSON and Parquet formats, regex filtering, compression (Gzip, Zstd), checkpointed progress tracking, and parallel file distribution across tasks.
Description
The FileSystemSourceFunc struct implements the SourceOperator trait to read files from a configured storage path. It supports the following capabilities:
File Discovery and Distribution:
- Lists all files at the configured path using the StorageProvider.
- Optionally filters files using a regex pattern.
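- Distributes files across parallel tasks by hash partitioning: each file path is hashed with DefaultHasher, and the hash modulo the parallelism selects the task that owns the file. This is a plain hash-modulo assignment rather than consistent hashing, so changing the parallelism can reassign most files.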
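The hash-modulo assignment described above can be sketched with the standard library alone. This is a minimal illustration, not the connector's actual code: the function names `task_for_path` and `files_for_task` are hypothetical, and the real source obtains its task index and parallelism from the runtime context.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Returns the index of the task that owns `path` (hypothetical helper).
fn task_for_path(path: &str, parallelism: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    path.hash(&mut hasher);
    // Reduce the 64-bit hash to a task index in 0..parallelism.
    (hasher.finish() % parallelism as u64) as usize
}

/// Keeps only the paths that hash to `task_index` (hypothetical helper).
fn files_for_task<'a>(paths: &[&'a str], task_index: usize, parallelism: usize) -> Vec<&'a str> {
    paths
        .iter()
        .copied()
        .filter(|p| task_for_path(p, parallelism) == task_index)
        .collect()
}

fn main() {
    let paths = ["data/a.json", "data/b.json", "data/c.json", "data/d.json"];
    let parallelism = 2;
    for task in 0..parallelism {
        println!("task {task}: {:?}", files_for_task(&paths, task, parallelism));
    }
}
```

Because every task applies the same function to the same listing, each file is claimed by exactly one task without any coordination between tasks.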
Format Support:
- JSON - Reads newline-delimited JSON via get_newline_separated_stream. Supports optional compression (Gzip via GzipDecoder, Zstd via ZstdDecoder). Lines are deserialized through the collector's deserializer.
- Parquet - Reads Parquet files via get_record_batch_stream using ParquetRecordBatchStreamBuilder with ParquetObjectReader. Adds a timestamp column with the current system time to each batch. Batch size is fixed at 8192 rows.
Checkpointing and Recovery:
- File progress is tracked via FileReadState: either RecordsRead(usize) (number of records/batches read) or Finished.
- State is persisted to a global keyed state table "a" during checkpoints.
- On recovery, already-finished files are skipped and partially-read files resume from the last checkpoint position using skip.
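The recovery rule above can be illustrated with a small, standard-library-only sketch. It assumes an in-memory slice of records in place of a real file stream, and the function `read_file` is hypothetical; only the `FileReadState` variants come from the source.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum FileReadState {
    Finished,
    RecordsRead(usize),
}

/// Reads `records` for `path`, honoring previously checkpointed state.
/// Returns the records emitted during this run (hypothetical helper).
fn read_file(
    path: &str,
    records: &[&str],
    states: &mut HashMap<String, FileReadState>,
) -> Vec<String> {
    let already_read = match states.get(path) {
        // Finished files are skipped entirely.
        Some(FileReadState::Finished) => return Vec::new(),
        // Partially read files resume after the last recorded position.
        Some(FileReadState::RecordsRead(n)) => *n,
        None => 0,
    };
    let mut emitted = Vec::new();
    let mut read = already_read;
    for record in records.iter().skip(already_read) {
        emitted.push(record.to_string());
        read += 1;
        // Simplification: a real source would persist this only at checkpoint time.
        states.insert(path.to_string(), FileReadState::RecordsRead(read));
    }
    states.insert(path.to_string(), FileReadState::Finished);
    emitted
}

fn main() {
    let mut states = HashMap::new();
    states.insert("partial.json".to_string(), FileReadState::RecordsRead(2));
    let records = ["r0", "r1", "r2", "r3"];
    let emitted = read_file("partial.json", &records, &mut states);
    println!("{emitted:?}");
}
```

Skipping by count only resumes correctly if the file yields its records in the same order on every read, which holds for an immutable file read sequentially.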
Control Flow:
- The source responds to checkpoint, stop (graceful or immediate), and commit control messages.
- Reading interleaves file data processing with control message handling via tokio::select!.
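The interleaving of data and control handling can be approximated synchronously for illustration. The sketch below is a stand-in, not the real implementation: it polls a `std::sync::mpsc` channel before each record instead of awaiting both futures with tokio::select!, and the `ControlMessage` and `Finish` enums and the `run` function are simplified, hypothetical versions of the runtime's types.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical, simplified control messages.
#[derive(Debug, PartialEq)]
enum ControlMessage {
    Checkpoint,
    Stop,
}

// Hypothetical finish type: ran to completion or was stopped early.
#[derive(Debug, PartialEq)]
enum Finish {
    Final,
    Stopped,
}

/// Emits records into `out`, draining pending control messages before each one.
/// Each checkpoint records how many records had been read at that moment.
fn run(
    records: &[&str],
    control: &Receiver<ControlMessage>,
    out: &mut Vec<String>,
    checkpoints: &mut Vec<usize>,
) -> Finish {
    let mut read = 0;
    for record in records {
        // Handle every queued control message before touching more data.
        loop {
            match control.try_recv() {
                Ok(ControlMessage::Checkpoint) => checkpoints.push(read),
                Ok(ControlMessage::Stop) => return Finish::Stopped,
                // Queue is empty or the sender is gone: continue reading.
                Err(_) => break,
            }
        }
        out.push(record.to_string());
        read += 1;
    }
    Finish::Final
}

fn main() {
    let (tx, rx) = channel();
    tx.send(ControlMessage::Checkpoint).unwrap();
    let (mut out, mut checkpoints) = (Vec::new(), Vec::new());
    let finish = run(&["a", "b", "c"], &rx, &mut out, &mut checkpoints);
    println!("{finish:?}, emitted {}, checkpoints {checkpoints:?}", out.len());
}
```

Polling between records keeps the sketch dependency-free. An async select, as used by the actual source, also reacts to control messages while a read is still pending.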
Usage
Used when a streaming pipeline needs to ingest data from files in object storage or a local directory. Suitable for batch-like ingestion of historical data into a streaming pipeline.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/source.rs
- Lines: 1-411
Signature
pub struct FileSystemSourceFunc {
    pub source: config::FileSystemSource,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub file_states: HashMap<String, FileReadState>,
}

#[derive(Encode, Decode, Debug, Clone, PartialEq, PartialOrd)]
pub enum FileReadState {
    Finished,
    RecordsRead(usize),
}

#[async_trait]
impl SourceOperator for FileSystemSourceFunc {
    fn tables(&self) -> HashMap<String, TableConfig>;
    fn name(&self) -> String; // "FileSystem"
    async fn run(
        &mut self,
        ctx: &mut SourceContext,
        collector: &mut SourceCollector,
    ) -> Result<SourceFinishType, DataflowError>;
}
Import
use arroyo_connectors::filesystem::source::{FileSystemSourceFunc, FileReadState};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| source | config::FileSystemSource | Yes | Path, storage options, regex pattern, compression format |
| format | Format | Yes | JSON or Parquet format configuration |
| framing | Option<Framing> | No | Optional framing for deserialization |
| bad_data | Option<BadData> | No | Bad data handling policy |
Outputs
| Name | Type | Description |
|---|---|---|
| RecordBatch | RecordBatch | For Parquet: batches with appended timestamp column |
| Deserialized records | Via SourceCollector | For JSON: deserialized records from newline-delimited lines |
| FileReadState | Checkpointed state | Per-file progress (records read or finished) for recovery |
Usage Examples
// Create a filesystem source for reading JSON from S3
let mut source = FileSystemSourceFunc {
    source: config::FileSystemSource {
        path: "s3://my-bucket/data/".to_string(),
        storage_options: HashMap::new(),
        regex_pattern: Some(".*\\.json$".to_string()),
        compression_format: SourceFileCompressionFormat::Gzip,
    },
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    file_states: HashMap::new(),
};
// The source is run by the Arroyo runtime via the SourceOperator trait.
// run takes &mut self, so the binding must be mutable; ctx and collector
// are supplied by the runtime.
let finish_type = source.run(&mut ctx, &mut collector).await?;