Implementation:ArroyoSystems Arroyo Fluvio Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
FluvioConnector implements the Arroyo Connector trait for Fluvio, providing both source and sink capabilities for reading from and writing to Fluvio streams.
Description
The Fluvio connector supports Source and Sink table types. Sources can be configured with Earliest or Latest offset modes using the SourceOffset enum, which maps to Fluvio's Offset::beginning() and Offset::end() respectively. The connector requires a format and schema definition for both source and sink configurations. It constructs FluvioSourceFunc for sources (with topic, endpoint, offset_mode, format, framing, and bad_data) and FluvioSinkFunc for sinks (with topic, endpoint, producer, and ArrowSerializer). An optional endpoint parameter allows connecting to non-default Fluvio clusters. Table configuration is loaded from a JSON schema definition.
Usage
Use FluvioConnector when building Arroyo pipelines that need to consume from or produce to Fluvio streaming topics as an alternative to Kafka.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/fluvio/mod.rs
Signature
pub struct FluvioConnector {}
impl Connector for FluvioConnector {
type ProfileT = EmptyConfig;
type TableT = FluvioTable;
fn name(&self) -> &'static str; // returns "fluvio"
fn table_type(&self, _: EmptyConfig, t: FluvioTable) -> ConnectionType;
fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
table: FluvioTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
fn make_operator(&self, _: EmptyConfig, table: FluvioTable,
config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
impl SourceOffset {
pub fn offset(&self) -> Offset; // Earliest -> beginning(), Latest -> end()
}
Import
use arroyo_connectors::fluvio::FluvioConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| topic | String | Yes | Fluvio topic name |
| type | TableType | Yes | Source (with offset) or Sink |
| endpoint | Option<String> | No | Custom Fluvio cluster endpoint |
| format | Format | Yes | Serialization/deserialization format |
| source.offset | SourceOffset | No | Earliest or Latest (default: Latest) |
Outputs
| Name | Type | Description |
|---|---|---|
| Connection | Connection | Configured Fluvio connection (Source or Sink) |
| ConstructedOperator | ConstructedOperator | FluvioSourceFunc or FluvioSinkFunc operator |
Usage Examples
CREATE TABLE fluvio_source (
value TEXT
) WITH (
connector = 'fluvio',
topic = 'my-topic',
type = 'source',
'source.offset' = 'earliest',
format = 'json'
);