Implementation:ArroyoSystems Arroyo Nats Source

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

NatsSourceFunc is the Arroyo source operator that consumes messages from NATS, supporting both JetStream pull consumers with stateful checkpointing and Core subject subscriptions.

Description

The NATS source operator supports two consumption modes.

In JetStream mode, it creates a pull consumer on a named stream and restores the start sequence number from checkpointed state. The consumer configuration includes ack_policy, replay_policy, ack_wait, filter_subjects, max_ack_pending, and max_deliver, among other parameters. Each message is acknowledged after it is deserialized. Sequence numbers are tracked in a HashMap keyed by operator ID and persisted to global keyed state during checkpoints.

In Core mode, it subscribes to a plain NATS subject and timestamps each message with the current system time.

Both modes run a tokio::select! loop that concurrently processes incoming messages and control commands (checkpoint, stop, commit, load compacted).
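The interaction between message handling, sequence tracking, and checkpoint commands described above can be modeled with a small, std-only sketch. This is not the Arroyo implementation: the names `Event`, `SequenceState`, `start_sequence`, and `run_source` are hypothetical, a plain iterator stands in for the async channels that the real operator multiplexes with tokio::select!, and the restore rule (resume one past the last checkpointed sequence, or at 1 when no state exists) is an assumption made for illustration.

```rust
use std::collections::HashMap;

/// Events the loop reacts to. The real operator receives messages and control
/// commands on separate async channels; a single iterator stands in for both
/// here so the sketch needs only the standard library.
#[derive(Debug, Clone)]
pub enum Event {
    /// A JetStream message with its stream sequence number and payload.
    Message { stream_sequence: u64, payload: String },
    /// Control command: snapshot the tracked sequence numbers.
    Checkpoint,
    /// Control command: leave the loop.
    Stop,
}

/// Hypothetical stand-in for the checkpointed state:
/// operator ID -> last sequence number processed.
pub type SequenceState = HashMap<String, u64>;

/// Assumed restore rule: resume one past the last checkpointed sequence,
/// or at 1 when nothing has been checkpointed for this operator.
pub fn start_sequence(restored: &SequenceState, operator_id: &str) -> u64 {
    restored.get(operator_id).map_or(1, |last| last + 1)
}

/// Processes events until `Stop` or the end of input. Returns the payloads
/// that were emitted and every state snapshot taken at a checkpoint.
pub fn run_source(
    operator_id: &str,
    restored: &SequenceState,
    events: impl IntoIterator<Item = Event>,
) -> (Vec<String>, Vec<SequenceState>) {
    let start = start_sequence(restored, operator_id);
    let mut live: SequenceState = restored.clone();
    let mut emitted = Vec::new();
    let mut checkpoints = Vec::new();

    for event in events {
        match event {
            Event::Message { stream_sequence, payload } => {
                // Skip anything already covered by the restored state.
                if stream_sequence < start {
                    continue;
                }
                // Stand-in for "deserialize, then acknowledge".
                emitted.push(payload);
                live.insert(operator_id.to_string(), stream_sequence);
            }
            // Persist a copy of the tracked sequences, as a checkpoint would.
            Event::Checkpoint => checkpoints.push(live.clone()),
            Event::Stop => break,
        }
    }
    (emitted, checkpoints)
}
```

In this model, restoring a state of `{"op-1": 2}` makes the loop start at sequence 3, so a replayed message with sequence 2 is dropped, and each checkpoint captures the last sequence processed before it. Whether the real operator filters replays itself or relies on the consumer's configured start sequence is not stated here, so treat the skip check as part of the illustration only.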

Usage

Use NatsSourceFunc to ingest messages from NATS JetStream streams with durable, checkpointed consumption, or from plain NATS subjects when simpler at-most-once delivery is sufficient.

Code Reference

Source Location

Signature

pub struct NatsSourceFunc {
    pub source_type: SourceType,
    pub servers: String,
    pub connection: NatsConfig,
    pub table: NatsTable,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub messages_per_second: NonZeroU32,
}

#[async_trait]
impl SourceOperator for NatsSourceFunc {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
}

Import

use arroyo_connectors::nats::source::NatsSourceFunc;

I/O Contract

Inputs

Name         Type        Required  Description
source_type  SourceType  Yes       Jetstream { stream, ack_policy, ... } or Core { subject }
servers      String      Yes       NATS server addresses
connection   NatsConfig  Yes       Full NATS connection configuration with auth
format       Format      Yes       Deserialization format for message payloads

Outputs

Name     Type         Description
records  RecordBatch  Deserialized Arrow record batches from NATS messages

Usage Examples

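use std::num::NonZeroU32;

use arroyo_connectors::nats::source::NatsSourceFunc;
// SourceType, AcknowledgmentPolicy, ReplayPolicy, Format, and JsonFormat must
// also be in scope; their import paths are not shown on this page.

// nats_config (NatsConfig) and nats_table (NatsTable) are assumed to have been
// built elsewhere.
let source = NatsSourceFunc {
    // JetStream mode: pull consumer on the "orders" stream.
    source_type: SourceType::Jetstream {
        stream: "orders".to_string(),
        ack_policy: AcknowledgmentPolicy::Explicit,
        replay_policy: ReplayPolicy::Instant,
        ack_wait: 30,
        // ... other consumer config ...
    },
    servers: "nats://localhost:4222".to_string(),
    connection: nats_config,
    table: nats_table,
    // Payloads are deserialized as JSON with default settings.
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    // Largest value the NonZeroU32 field can hold.
    messages_per_second: NonZeroU32::new(u32::MAX).unwrap(),
};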
