Implementation:ArroyoSystems Arroyo Nats Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
NatsConnector implements the Arroyo Connector trait for NATS, providing source and sink capabilities with support for both NATS Core subjects and JetStream streams, plus multiple authentication modes.
Description
The NATS connector supports three authentication modes: None, Credentials (username/password), and JWT (with NKey seed signing). Sources can operate in either Core mode (subscribing to a plain NATS subject) or JetStream mode (consuming from a durable JetStream stream with configurable consumer properties such as ack_policy, replay_policy, max_ack_pending, and filter_subjects). Sinks publish to a NATS subject. The connector constructs NatsSourceFunc or NatsSinkFunc operators from the table configuration and serializes connection/table config as JSON for the OperatorConfig. State for JetStream sources is tracked via NatsState containing stream name and sequence number.
Usage
Use NatsConnector when building Arroyo pipelines that need to consume from or produce to NATS Core subjects or JetStream streams.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/nats/mod.rs
Signature
pub struct NatsConnector {}
#[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
pub struct NatsState {
stream_name: String,
stream_sequence_number: u64,
}
impl Connector for NatsConnector {
type ProfileT = NatsConfig;
type TableT = NatsTable;
fn name(&self) -> &'static str;
fn from_config(&self, id: Option<i64>, name: &str, config: NatsConfig,
table: NatsTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
fn make_operator(&self, profile: NatsConfig, table: NatsTable,
config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
async fn get_nats_client(connection: &NatsConfig) -> anyhow::Result<async_nats::Client>;
Import
use arroyo_connectors::nats::NatsConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| servers | VarStr | Yes | Comma-separated NATS server addresses |
| authentication | NatsConfigAuthentication | Yes | None, Credentials, or JWT authentication |
| connector_type | ConnectorType | Yes | Source (with JetStream or Core) or Sink |
| stream or subject | String | Yes | JetStream stream name or Core subject name |
Outputs
| Name | Type | Description |
|---|---|---|
| Connection | Connection | Configured Arroyo NATS connection |
| ConstructedOperator | ConstructedOperator | NatsSourceFunc or NatsSinkFunc operator |
Usage Examples
CREATE TABLE nats_source (
value TEXT
) WITH (
connector = 'nats',
servers = 'nats://localhost:4222',
type = 'source',
stream = 'my-stream',
format = 'json'
);