Implementation:ArroyoSystems Arroyo Mqtt Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
MqttConnector implements the Arroyo Connector trait for MQTT, providing both source and sink capabilities with TLS support, configurable QoS levels, and connection testing.
Description
The MQTT connector is built on the rumqttc async MQTT client library. It connects over mqtt:// (plaintext) or mqtts:// (TLS) URLs, with optional username/password authentication and client-certificate mutual TLS. To avoid client ID collisions on the broker, each task gets a unique client ID of the form <prefix>_<job_id>_<operator_id>_<task_index>. Configuration is driven by JSON schemas for both the connection profile (MqttConfig) and the table definition (MqttTable). The table type determines whether an MqttSourceFunc or an MqttSinkFunc operator is created, and the QoS level (AtMostOnce, AtLeastOnce, or ExactlyOnce) is configurable per table. The connector also exposes a topic metadata field and implements the test_profile and test methods, which validate connectivity by publishing, and optionally subscribing to, test messages.
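The per-task client ID pattern can be illustrated with a minimal sketch. This is not the connector's actual code: the function name `client_id` and the `prefix` parameter are hypothetical, and only the `<prefix>_<job_id>_<operator_id>_<task_index>` layout comes from the description above.

```rust
/// Builds a per-task MQTT client ID in the documented
/// `<prefix>_<job_id>_<operator_id>_<task_index>` layout.
///
/// Assumption: `prefix` stands in for whatever client prefix the
/// connection profile supplies; the real connector may derive it differently.
pub fn client_id(prefix: &str, job_id: &str, operator_id: &str, task_index: usize) -> String {
    // Joining the four parts with underscores means that two parallel tasks
    // of the same operator differ at least in the trailing task index.
    format!("{prefix}_{job_id}_{operator_id}_{task_index}")
}
```

Because the task index is part of the ID, parallel subtasks of one operator present distinct client IDs to the broker, which matters because a broker typically disconnects an existing session when a second client connects with the same ID.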
Usage
Use MqttConnector when building Arroyo pipelines that read from or write to MQTT brokers, such as IoT data ingestion or device command publishing.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/mqtt/mod.rs
Signature
pub struct MqttConnector {}

impl Connector for MqttConnector {
    type ProfileT = MqttConfig;
    type TableT = MqttTable;

    fn name(&self) -> &'static str;
    fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
    fn from_config(&self, id: Option<i64>, name: &str, config: MqttConfig,
        table: MqttTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, profile: MqttConfig, table: MqttTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

pub(crate) fn create_connection(c: &MqttConfig, job_id: &str,
    operator_id: &str, task_index: usize) -> anyhow::Result<(AsyncClient, EventLoop)>;
Import
use arroyo_connectors::mqtt::MqttConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| url | String | Yes | MQTT broker URL (mqtt:// or mqtts://) |
| topic | String | Yes | MQTT topic to subscribe or publish to |
| type | TableType | Yes | Source or Sink; a Sink may optionally set the retain flag |
| qos | QualityOfService | No | AtMostOnce, AtLeastOnce, or ExactlyOnce (default: AtMostOnce) |
| username | Option<VarStr> | No | MQTT username for authentication |
| password | Option<VarStr> | No | MQTT password for authentication |
| tls | Option<Tls> | No | TLS configuration (CA, client cert, key) |
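As a rough illustration of two of the inputs above, the sketch below models the qos option with its documented default and classifies a broker URL by scheme. The type and function names are stand-ins rather than the connector's generated types, and the scheme check covers only the two schemes listed in the table; the real connector may accept others or report errors differently.

```rust
/// Illustrative stand-in for the table's `qos` option.
/// The default follows the Inputs table (AtMostOnce).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QualityOfService {
    #[default]
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

impl QualityOfService {
    /// Numeric QoS level as defined by the MQTT protocol (0, 1, or 2).
    pub fn level(self) -> u8 {
        match self {
            QualityOfService::AtMostOnce => 0,
            QualityOfService::AtLeastOnce => 1,
            QualityOfService::ExactlyOnce => 2,
        }
    }
}

/// Hypothetical helper: reports whether a broker URL asks for TLS.
/// Returns `None` for any scheme other than the two documented ones.
pub fn uses_tls(url: &str) -> Option<bool> {
    if url.starts_with("mqtts://") {
        Some(true)
    } else if url.starts_with("mqtt://") {
        Some(false)
    } else {
        None
    }
}
```

Higher QoS levels trade throughput for stronger delivery guarantees between client and broker, so leaving qos unset keeps the cheapest setting.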
Outputs
| Name | Type | Description |
|---|---|---|
| Connection | Connection | Configured Arroyo connection for use in pipeline graph |
| ConstructedOperator | ConstructedOperator | MqttSourceFunc or MqttSinkFunc operator instance |
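The choice between the two operators can be pictured as a match on the table type. The sketch below is a simplified model, not the body of make_operator: the `TableType` enum here is a stand-in, placing `retain` on the sink variant is an assumption, and the function returns only the operator's name instead of constructing it.

```rust
/// Simplified stand-in for the table type.
/// Assumption: the retain flag belongs to the sink variant only.
pub enum TableType {
    Source,
    Sink { retain: bool },
}

/// Hypothetical helper: names the operator that would be built
/// for a given table type.
pub fn operator_name(table_type: &TableType) -> &'static str {
    match table_type {
        // A source table reads messages from the subscribed topic.
        TableType::Source => "MqttSourceFunc",
        // A sink table publishes to the topic; `retain` would affect how
        // messages are published, not which operator is chosen.
        TableType::Sink { .. } => "MqttSinkFunc",
    }
}
```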
Usage Examples
CREATE TABLE mqtt_input (
    value TEXT
) WITH (
    connector = 'mqtt',
    url = 'mqtt://localhost:1883',
    topic = 'sensor/data',
    type = 'source',
    format = 'json'
);