Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ArroyoSystems Arroyo Nats Connector

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

NatsConnector implements the Arroyo Connector trait for NATS, providing source and sink capabilities with support for both NATS Core subjects and JetStream streams, plus multiple authentication modes.

Description

The NATS connector supports three authentication modes: None, Credentials (username/password), and JWT (with NKey seed signing). Sources can operate in either Core mode (subscribing to a plain NATS subject) or JetStream mode (consuming from a durable JetStream stream with configurable consumer properties such as ack_policy, replay_policy, max_ack_pending, and filter_subjects). Sinks publish to a NATS subject. The connector constructs NatsSourceFunc or NatsSinkFunc operators from the table configuration and serializes connection/table config as JSON for the OperatorConfig. State for JetStream sources is tracked via NatsState containing stream name and sequence number.

Usage

Use NatsConnector when building Arroyo pipelines that need to consume from or produce to NATS Core subjects or JetStream streams.

Code Reference

Source Location

Signature

pub struct NatsConnector {}

#[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
pub struct NatsState {
    stream_name: String,
    stream_sequence_number: u64,
}

impl Connector for NatsConnector {
    type ProfileT = NatsConfig;
    type TableT = NatsTable;

    fn name(&self) -> &'static str;
    fn from_config(&self, id: Option<i64>, name: &str, config: NatsConfig,
        table: NatsTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, profile: NatsConfig, table: NatsTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

async fn get_nats_client(connection: &NatsConfig) -> anyhow::Result<async_nats::Client>;

Import

use arroyo_connectors::nats::NatsConnector;

I/O Contract

Inputs

Name Type Required Description
servers VarStr Yes Comma-separated NATS server addresses
authentication NatsConfigAuthentication Yes None, Credentials, or JWT authentication
connector_type ConnectorType Yes Source (with JetStream or Core) or Sink
stream or subject String Yes JetStream stream name or Core subject name

Outputs

Name Type Description
Connection Connection Configured Arroyo NATS connection
ConstructedOperator ConstructedOperator NatsSourceFunc or NatsSinkFunc operator

Usage Examples

CREATE TABLE nats_source (
    value TEXT
) WITH (
    connector = 'nats',
    servers = 'nats://localhost:4222',
    type = 'source',
    stream = 'my-stream',
    format = 'json'
);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment