Implementation:ArroyoSystems Arroyo Nats Source
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
NatsSourceFunc is the Arroyo source operator that consumes messages from NATS, supporting both JetStream pull consumers with stateful checkpointing and Core subject subscriptions.
Description
The NATS source operator supports two consumption modes. In JetStream mode, it creates a pull consumer on a named stream and restores the start sequence number from checkpointed state. The consumer is configured through parameters such as ack_policy, replay_policy, ack_wait, filter_subjects, max_ack_pending, and max_deliver. Each message is acknowledged after it has been deserialized, and the sequence number is tracked in a HashMap keyed by operator ID; that map is persisted to global keyed state at each checkpoint. In Core mode, it subscribes to a plain NATS subject and timestamps each message with the current system time. Both modes run a tokio::select! loop that concurrently processes incoming messages and control commands (checkpoint, stop, commit, load compacted).
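The interplay between sequence tracking and checkpoints described above can be illustrated with a minimal, standard-library-only sketch. It is not the Arroyo implementation: the real operator uses tokio::select! over a NATS message stream and a control channel, whereas this sketch funnels both through a single std::sync::mpsc channel. The names Event, SourceState, run_source, and simulate are hypothetical, and resuming at "last checkpointed sequence + 1" (or 1 when no state exists) is an assumption of the sketch.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver};

/// Control commands named in the description (illustrative enum, not Arroyo's type).
#[derive(Debug)]
enum ControlMessage {
    Checkpoint,
    Stop,
    Commit,
    LoadCompacted,
}

/// Everything the loop can receive: a NATS message or a control command.
#[derive(Debug)]
enum Event {
    Message { stream_sequence: u64, payload: Vec<u8> },
    Control(ControlMessage),
}

/// Hypothetical stand-in for the operator's in-memory and checkpointed state.
#[derive(Default)]
struct SourceState {
    /// Last processed stream sequence, keyed by operator ID.
    sequence_numbers: HashMap<String, u64>,
    /// Snapshot taken at the most recent checkpoint.
    checkpointed: HashMap<String, u64>,
}

impl SourceState {
    /// Sequence to resume from. Assumption: last checkpointed sequence + 1,
    /// or 1 when nothing has been checkpointed yet.
    fn start_sequence(&self, operator_id: &str) -> u64 {
        self.checkpointed.get(operator_id).map_or(1, |s| s + 1)
    }
}

/// Drains events until Stop arrives or the channel closes.
/// Returns the number of messages processed.
fn run_source(operator_id: &str, events: Receiver<Event>, state: &mut SourceState) -> usize {
    let mut processed = 0;
    for event in events {
        match event {
            Event::Message { stream_sequence, payload } => {
                // Deserialization and the acknowledgment would happen here.
                let _ = payload;
                state
                    .sequence_numbers
                    .insert(operator_id.to_string(), stream_sequence);
                processed += 1;
            }
            Event::Control(ControlMessage::Checkpoint) => {
                // Persist the tracked sequence numbers.
                state.checkpointed = state.sequence_numbers.clone();
            }
            Event::Control(ControlMessage::Stop) => break,
            // Commit and LoadCompacted are no-ops in this sketch.
            Event::Control(ControlMessage::Commit) => {}
            Event::Control(ControlMessage::LoadCompacted) => {}
        }
    }
    processed
}

/// Feeds three messages, a checkpoint, a fourth message, and a stop command.
/// Returns (messages processed, sequence a restarted source would resume from).
fn simulate() -> (usize, u64) {
    let (tx, rx) = channel();
    let mut state = SourceState::default();
    for seq in 1..=3 {
        tx.send(Event::Message { stream_sequence: seq, payload: b"{}".to_vec() })
            .unwrap();
    }
    tx.send(Event::Control(ControlMessage::Checkpoint)).unwrap();
    tx.send(Event::Message { stream_sequence: 4, payload: b"{}".to_vec() })
        .unwrap();
    tx.send(Event::Control(ControlMessage::Stop)).unwrap();
    let processed = run_source("nats-source-0", rx, &mut state);
    (processed, state.start_sequence("nats-source-0"))
}
```

In this model, the fourth message is processed after the checkpoint, so a restart would resume at sequence 4 and read it again. That replay behaviour is a property of the sketch; the source does not state what delivery guarantee the real operator provides in JetStream mode.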
Usage
Use NatsSourceFunc when you need to ingest messages from NATS JetStream streams with durable, checkpointed consumption, or from plain NATS subjects for simpler at-most-once delivery.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/nats/source/mod.rs
Signature
pub struct NatsSourceFunc {
    pub source_type: SourceType,
    pub servers: String,
    pub connection: NatsConfig,
    pub table: NatsTable,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub messages_per_second: NonZeroU32,
}
#[async_trait]
impl SourceOperator for NatsSourceFunc {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
}
Import
use arroyo_connectors::nats::source::NatsSourceFunc;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| source_type | SourceType | Yes | Jetstream { stream, ack_policy, ... } or Core { subject } |
| servers | String | Yes | NATS server addresses |
| connection | NatsConfig | Yes | Full NATS connection configuration with auth |
| format | Format | Yes | Deserialization format for message payloads |
Outputs
| Name | Type | Description |
|---|---|---|
| records | RecordBatch | Deserialized Arrow record batches from NATS messages |
Usage Examples
// `nats_config` (NatsConfig) and `nats_table` (NatsTable) are assumed to be built elsewhere.
let source = NatsSourceFunc {
    // JetStream mode: pull consumer on the "orders" stream.
    source_type: SourceType::Jetstream {
        stream: "orders".to_string(),
        ack_policy: AcknowledgmentPolicy::Explicit,
        replay_policy: ReplayPolicy::Instant,
        ack_wait: 30,
        // ... other consumer config ...
    },
    servers: "nats://localhost:4222".to_string(),
    connection: nats_config,
    table: nats_table,
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    // u32::MAX leaves the rate effectively unlimited.
    messages_per_second: NonZeroU32::new(u32::MAX).unwrap(),
};
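Because messages_per_second is a NonZeroU32, a rate of zero cannot be represented, and NonZeroU32::new returns None for it. The sketch below shows one way to turn an optional configured limit into that type. The helper rate_limit is hypothetical, and falling back to u32::MAX for a missing or zero value is an assumption of this sketch, not documented Arroyo behaviour.

```rust
use std::num::NonZeroU32;

/// Hypothetical helper: converts an optional configured limit into a NonZeroU32.
/// A missing or zero value falls back to u32::MAX (assumption of this sketch).
fn rate_limit(configured: Option<u32>) -> NonZeroU32 {
    configured
        .and_then(NonZeroU32::new) // None when the configured value is 0
        .unwrap_or_else(|| NonZeroU32::new(u32::MAX).expect("u32::MAX is non-zero"))
}
```

With this helper, the last field in the example above could be written as messages_per_second: rate_limit(None).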