Implementation:ArroyoSystems Arroyo Mqtt Connector

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

MqttConnector implements the Arroyo Connector trait for MQTT, providing both source and sink capabilities with TLS support, configurable QoS levels, and connection testing.

Description

The MQTT connector is built on rumqttc, an async MQTT client library. It connects over mqtt:// or mqtts:// (TLS) URLs, with optional username/password authentication and optional client-certificate mutual TLS. To avoid client ID conflicts on the broker, each task uses a unique client ID of the form <prefix>_<job_id>_<operator_id>_<task_index>.

Configuration is driven by JSON schemas for the connection profile (MqttConfig) and the table definition (MqttTable). The table type determines whether an MqttSourceFunc or an MqttSinkFunc operator is created, and the QoS level (AtMostOnce, AtLeastOnce, or ExactlyOnce) is configurable per table. The connector also exposes a topic metadata field and implements the test_profile and test methods, which validate connectivity by publishing and, optionally, subscribing to test messages.
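The client ID pattern described above can be illustrated with a minimal sketch. The function name client_id and the sample values are assumptions made for illustration; this is not the connector's actual code, only one way to assemble <prefix>_<job_id>_<operator_id>_<task_index>.

```rust
/// Illustrative helper (hypothetical name): joins the four parts with
/// underscores, following <prefix>_<job_id>_<operator_id>_<task_index>.
fn client_id(prefix: &str, job_id: &str, operator_id: &str, task_index: usize) -> String {
    format!("{}_{}_{}_{}", prefix, job_id, operator_id, task_index)
}

/// Builds one ID per parallel task so that no two tasks of the same
/// operator present the same client ID to the broker.
fn client_ids_for_tasks(prefix: &str, job_id: &str, operator_id: &str, parallelism: usize) -> Vec<String> {
    (0..parallelism)
        .map(|task_index| client_id(prefix, job_id, operator_id, task_index))
        .collect()
}
```

Because the task index is part of the ID, two parallel tasks of the same operator never share a client ID, which matters because MQTT brokers typically disconnect an existing session when a second client connects with the same ID.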

Usage

Use MqttConnector when building Arroyo pipelines that read from or write to MQTT brokers, such as IoT data ingestion or device command publishing.

Code Reference

Source Location

Signature

pub struct MqttConnector {}

impl Connector for MqttConnector {
    type ProfileT = MqttConfig;
    type TableT = MqttTable;

    fn name(&self) -> &'static str;
    fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
    fn from_config(&self, id: Option<i64>, name: &str, config: MqttConfig,
        table: MqttTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, profile: MqttConfig, table: MqttTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

pub(crate) fn create_connection(c: &MqttConfig, job_id: &str,
    operator_id: &str, task_index: usize) -> anyhow::Result<(AsyncClient, EventLoop)>;

Import

use arroyo_connectors::mqtt::MqttConnector;

I/O Contract

Inputs

Name | Type | Required | Description
url | String | Yes | MQTT broker URL (mqtt:// or mqtts://)
topic | String | Yes | MQTT topic to subscribe or publish to
type | TableType | Yes | Source or Sink with optional retain flag
qos | QualityOfService | No | AtMostOnce, AtLeastOnce, or ExactlyOnce (default: AtMostOnce)
username | Option<VarStr> | No | MQTT username for authentication
password | Option<VarStr> | No | MQTT password for authentication
tls | Option<Tls> | No | TLS configuration (CA, client cert, key)
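As a rough illustration of how the url and qos inputs might be interpreted, the sketch below maps the URL scheme to a transport and each QoS variant to its MQTT protocol level (0, 1, or 2). The Transport enum, the transport_for_url function, and the level method are hypothetical names introduced here. The QualityOfService enum is a stand-in that only mirrors the variant names and default listed in the table, not the connector's real type.

```rust
/// Stand-in for the table's QoS setting; the default follows the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum QualityOfService {
    #[default]
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

impl QualityOfService {
    /// MQTT protocol QoS level: 0, 1, or 2.
    fn level(self) -> u8 {
        match self {
            QualityOfService::AtMostOnce => 0,
            QualityOfService::AtLeastOnce => 1,
            QualityOfService::ExactlyOnce => 2,
        }
    }
}

/// Hypothetical transport choice derived from the URL scheme.
#[derive(Debug, PartialEq, Eq)]
enum Transport {
    Plain,
    Tls,
}

/// Accepts only the two schemes listed for the url input.
fn transport_for_url(url: &str) -> Result<Transport, String> {
    if url.starts_with("mqtts://") {
        Ok(Transport::Tls)
    } else if url.starts_with("mqtt://") {
        Ok(Transport::Plain)
    } else {
        Err(format!("unsupported MQTT URL scheme: {}", url))
    }
}
```

Rejecting unknown schemes up front is a design choice of this sketch; it surfaces a misconfigured URL before any connection attempt is made.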

Outputs

Name | Type | Description
Connection | Connection | Configured Arroyo connection for use in pipeline graph
ConstructedOperator | ConstructedOperator | MqttSourceFunc or MqttSinkFunc operator instance
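The choice between the two operators can be pictured as a match on the table type. In the sketch below, TableType is a simplified, hypothetical stand-in (a source, or a sink carrying a retain flag) and operator_name is an illustrative function. The real make_operator returns a ConstructedOperator rather than a name.

```rust
/// Simplified stand-in for the table type: a source, or a sink with a retain flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableType {
    Source,
    Sink { retain: bool },
}

/// Returns the name of the operator that would be built for a table type.
fn operator_name(table_type: TableType) -> &'static str {
    match table_type {
        TableType::Source => "MqttSourceFunc",
        // The retain flag affects how messages are published, not which
        // operator is chosen, so it is ignored here.
        TableType::Sink { .. } => "MqttSinkFunc",
    }
}
```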

Usage Examples

CREATE TABLE mqtt_input (
    value TEXT
) WITH (
    connector = 'mqtt',
    url = 'mqtt://localhost:1883',
    topic = 'sensor/data',
    type = 'source',
    format = 'json'
);
