Implementation:ArroyoSystems Arroyo Fluvio Connector

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

FluvioConnector implements the Arroyo Connector trait for Fluvio, providing both source and sink capabilities for reading from and writing to Fluvio streams.

Description

The Fluvio connector supports Source and Sink table types. Sources can be configured with Earliest or Latest offset modes using the SourceOffset enum, which maps to Fluvio's Offset::beginning() and Offset::end() respectively. The connector requires a format and schema definition for both source and sink configurations. It constructs FluvioSourceFunc for sources (with topic, endpoint, offset_mode, format, framing, and bad_data) and FluvioSinkFunc for sinks (with topic, endpoint, producer, and ArrowSerializer). An optional endpoint parameter allows connecting to non-default Fluvio clusters. Table configuration is loaded from a JSON schema definition.
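The offset mapping described above can be sketched as follows. This is a minimal, self-contained illustration rather than the connector's actual source: the `Offset` enum here is a hypothetical stand-in for the type provided by the `fluvio` crate (so the snippet compiles without that dependency), and the `Default` derive simply reflects the documented default of Latest.

```rust
// Hypothetical stand-in for fluvio's `Offset` type, used only so this
// sketch compiles without the `fluvio` crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Offset {
    Beginning,
    End,
}

impl Offset {
    fn beginning() -> Self {
        Offset::Beginning
    }

    fn end() -> Self {
        Offset::End
    }
}

// Mirrors the two offset modes named in the description.
// Latest is marked as the default, matching the I/O contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum SourceOffset {
    Earliest,
    #[default]
    Latest,
}

impl SourceOffset {
    // Earliest -> beginning of the stream, Latest -> end of the stream.
    fn offset(&self) -> Offset {
        match self {
            SourceOffset::Earliest => Offset::beginning(),
            SourceOffset::Latest => Offset::end(),
        }
    }
}
```

With this shape, a source that omits `source.offset` would start at the end of the stream and only see records produced after it starts, while `earliest` replays the topic from its first record.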

Usage

Use FluvioConnector when an Arroyo pipeline needs to consume from or produce to Fluvio topics, for example where Fluvio is used in place of Kafka.

Code Reference

Source Location

Signature

pub struct FluvioConnector {}

impl Connector for FluvioConnector {
    type ProfileT = EmptyConfig;
    type TableT = FluvioTable;

    fn name(&self) -> &'static str; // returns "fluvio"
    fn table_type(&self, _: EmptyConfig, t: FluvioTable) -> ConnectionType;
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: FluvioTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: FluvioTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

impl SourceOffset {
    pub fn offset(&self) -> Offset; // Earliest -> beginning(), Latest -> end()
}
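
As a rough illustration of how `table_type` and `make_operator` might branch on the table definition, the sketch below uses simplified stand-in types. `TableKindSketch`, `ConnectionTypeSketch`, and both functions are hypothetical names introduced here; the real `FluvioTable`, `ConnectionType`, and `ConstructedOperator` types live in the Arroyo crates and carry more fields.

```rust
// Hypothetical, simplified stand-in for the table definition:
// a table is either a source (with an offset mode) or a sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableKindSketch {
    Source { from_earliest: bool },
    Sink,
}

// Hypothetical stand-in for Arroyo's `ConnectionType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionTypeSketch {
    Source,
    Sink,
}

// Analogue of `table_type`: report whether the table reads or writes.
fn connection_type(kind: &TableKindSketch) -> ConnectionTypeSketch {
    match kind {
        TableKindSketch::Source { .. } => ConnectionTypeSketch::Source,
        TableKindSketch::Sink => ConnectionTypeSketch::Sink,
    }
}

// Analogue of `make_operator`: pick which operator would be constructed.
// Only the operator's name is returned here; the real method builds it.
fn operator_name(kind: &TableKindSketch) -> &'static str {
    match kind {
        TableKindSketch::Source { .. } => "FluvioSourceFunc",
        TableKindSketch::Sink => "FluvioSinkFunc",
    }
}
```

Keeping both decisions as a `match` over one enum means a new table kind would fail to compile until both branches handle it.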

Import

use arroyo_connectors::fluvio::FluvioConnector;

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| topic | String | Yes | Fluvio topic name |
| type | TableType | Yes | Source (with offset) or Sink |
| endpoint | Option<String> | No | Custom Fluvio cluster endpoint |
| format | Format | Yes | Serialization/deserialization format |
| source.offset | SourceOffset | No | Earliest or Latest (default: Latest) |

Outputs

| Name | Type | Description |
|---|---|---|
| Connection | Connection | Configured Fluvio connection (Source or Sink) |
| ConstructedOperator | ConstructedOperator | FluvioSourceFunc or FluvioSinkFunc operator |

Usage Examples

CREATE TABLE fluvio_source (
    value TEXT
) WITH (
    connector = 'fluvio',
    topic = 'my-topic',
    type = 'source',
    'source.offset' = 'earliest',
    format = 'json'
);
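
The `WITH` options in the statement above correspond to the inputs in the I/O Contract. The sketch below shows one way such options could be validated into a typed table definition. It is an assumption-laden illustration, not Arroyo's parsing code: `parse_table_options`, `FluvioTableSketch`, and the error strings are hypothetical, and `format` handling is left out because it is resolved separately from the table definition.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OffsetModeSketch {
    Earliest,
    Latest,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum TableTypeSketch {
    Source { offset: OffsetModeSketch },
    Sink,
}

// Hypothetical, simplified analogue of the connector's table config.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FluvioTableSketch {
    topic: String,
    endpoint: Option<String>,
    table_type: TableTypeSketch,
}

// Validate raw string options: `topic` and `type` are required,
// `endpoint` is optional, and `source.offset` defaults to Latest.
fn parse_table_options(opts: &HashMap<String, String>) -> Result<FluvioTableSketch, String> {
    let topic = opts
        .get("topic")
        .cloned()
        .ok_or_else(|| "missing required option 'topic'".to_string())?;
    let endpoint = opts.get("endpoint").cloned();

    let table_type = match opts.get("type").map(String::as_str) {
        Some("source") => {
            let offset = match opts.get("source.offset").map(String::as_str) {
                None | Some("latest") => OffsetModeSketch::Latest,
                Some("earliest") => OffsetModeSketch::Earliest,
                Some(other) => return Err(format!("invalid source.offset '{other}'")),
            };
            TableTypeSketch::Source { offset }
        }
        Some("sink") => TableTypeSketch::Sink,
        Some(other) => return Err(format!("invalid type '{other}'")),
        None => return Err("missing required option 'type'".to_string()),
    };

    Ok(FluvioTableSketch { topic, endpoint, table_type })
}
```

Under these assumptions, the options from the example statement would yield a source table on `my-topic` with the Earliest offset mode and no custom endpoint.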
