

Implementation:ArroyoSystems Arroyo Rpc Core

From Leeroopedia


Knowledge Sources
Domains: Streaming, RPC, Configuration
Last Updated: 2026-02-08 08:00 GMT

Overview

This is the core library of the arroyo-rpc crate. It provides gRPC protocol definitions, control message types, SQL option parsing via ConnectorOptions, row conversion utilities, TLS/gRPC channel management, event logging infrastructure, and retry macros.

Description

This module is the central hub of the arroyo-rpc crate. It includes:

  • gRPC modules -- auto-generated protobuf types via tonic::include_proto! for both the internal RPC protocol (arroyo_rpc) and the public API (api).
  • ControlMessage / ControlResp -- enums for the bidirectional control plane between the engine and operators, including Checkpoint, Stop, Commit, LoadCompacted, and error/status response variants.
  • ConnectorOptions -- a type-safe parser for SQL WITH clause options that extracts strings, booleans, integers, floats, durations, data sizes, arrays, and fields, returning a DFResult error when an option cannot be extracted.
  • Converter -- wraps Arrow's RowConverter to handle the empty-key case, used for binary row encoding in state operations.
  • OperatorConfig / MetadataField / RateLimit -- configuration types for connector operators at runtime.
  • TLS/gRPC utilities -- grpc_channel_builder, connect_grpc, controller_client for establishing optionally TLS-secured gRPC connections with mTLS support.
  • Event logging -- log_event! and log_trace_event! macros with a pluggable EventLogger trait.
  • retry! -- a macro implementing exponential backoff with jitter for retryable operations.
  • DataSizeUnit -- parser for human-readable data sizes (e.g., "10MB", "1GB").
  • Constants -- TIMESTAMP_FIELD ("_timestamp"), UPDATING_META_FIELD, and updating_meta_fields/updating_meta_field for CDC metadata.
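
To make the DataSizeUnit bullet concrete, here is a minimal sketch of parsing a human-readable size such as "10MB" into a byte count. The function name parse_data_size is hypothetical, and the sketch assumes decimal (power-of-1000) units and integer quantities; the real DataSizeUnit may accept other units or use different multipliers.

```rust
/// Hypothetical stand-in for a human-readable data size parser.
/// Assumes decimal (power-of-1000) units; the real DataSizeUnit may differ.
pub fn parse_data_size(input: &str) -> Result<u64, String> {
    let s = input.trim();
    // Split at the first non-digit: the digits are the quantity, the rest is the unit.
    let split = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
    let (digits, unit) = s.split_at(split);
    let quantity: u64 = digits
        .parse()
        .map_err(|_| format!("invalid data size '{input}': expected a leading integer"))?;
    let multiplier: u64 = match unit.trim().to_ascii_uppercase().as_str() {
        "" | "B" => 1,
        "KB" => 1_000,
        "MB" => 1_000_000,
        "GB" => 1_000_000_000,
        other => return Err(format!("invalid data size '{input}': unknown unit '{other}'")),
    };
    // Reject values that do not fit in a u64 instead of wrapping silently.
    quantity
        .checked_mul(multiplier)
        .ok_or_else(|| format!("data size '{input}' overflows u64"))
}
```

Under these assumptions, parse_data_size("10MB") yields 10,000,000 bytes, while an input with no leading digits or an unknown unit yields an error that names the offending input.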

Usage

This module is imported transitively by nearly every crate in the Arroyo workspace. Use ConnectorOptions when implementing new SQL connector parsing, ControlMessage/ControlResp for operator-engine communication, and the gRPC utilities for inter-service communication.
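
The operator-engine exchange described above can be sketched with plain channels. The enums below are simplified stand-ins (DemoControlMessage and DemoControlResp are hypothetical names with reduced payloads), and the sketch uses std::sync::mpsc rather than whatever channel type Arroyo actually uses; it only illustrates the shape of a loop that consumes control messages and emits responses.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Simplified stand-in for ControlMessage; payloads are reduced to plain values.
#[derive(Debug)]
pub enum DemoControlMessage {
    Checkpoint { epoch: u32 },
    Stop,
    NoOp,
}

/// Simplified stand-in for ControlResp.
#[derive(Debug, PartialEq)]
pub enum DemoControlResp {
    CheckpointCompleted { epoch: u32 },
    TaskFinished { task_index: usize },
}

/// Hypothetical operator loop: handle control messages until Stop arrives
/// (or the sender hangs up), then report that the task has finished.
pub fn run_operator(
    task_index: usize,
    ctrl_rx: Receiver<DemoControlMessage>,
    resp_tx: Sender<DemoControlResp>,
) {
    for msg in ctrl_rx {
        match msg {
            DemoControlMessage::Checkpoint { epoch } => {
                // A real operator would snapshot its state before acknowledging.
                let _ = resp_tx.send(DemoControlResp::CheckpointCompleted { epoch });
            }
            DemoControlMessage::Stop => break,
            DemoControlMessage::NoOp => {}
        }
    }
    let _ = resp_tx.send(DemoControlResp::TaskFinished { task_index });
}
```

Sending Checkpoint, NoOp, and Stop to this loop produces one CheckpointCompleted response followed by TaskFinished.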

Code Reference

Source Location

Signature

pub enum ControlMessage {
    Checkpoint(CheckpointBarrier),
    Stop { mode: StopMode },
    Commit { epoch: u32, commit_data: HashMap<String, HashMap<u32, Vec<u8>>> },
    LoadCompacted { compacted: CompactionResult },
    NoOp,
}

pub enum ControlResp {
    CheckpointEvent(CheckpointEvent),
    CheckpointCompleted(CheckpointCompleted),
    TaskStarted { node_id: u32, task_index: usize, start_time: SystemTime },
    TaskFinished { node_id: u32, task_index: usize },
    TaskFailed { node_id: u32, task_index: usize, error: TaskError },
    Error { node_id: u32, operator_id: String, task_index: usize, message: String, details: String },
}

pub struct ConnectorOptions { ... }
impl ConnectorOptions {
    pub fn new(sql_opts: &[SqlOption], partition_by: &Option<Vec<Expr>>) -> DFResult<Self>;
    pub fn pull_str(&mut self, name: &str) -> DFResult<String>;
    pub fn pull_opt_str(&mut self, name: &str) -> DFResult<Option<String>>;
    pub fn pull_opt_bool(&mut self, name: &str) -> DFResult<Option<bool>>;
    pub fn pull_opt_u64(&mut self, name: &str) -> DFResult<Option<u64>>;
    pub fn pull_opt_duration(&mut self, name: &str) -> DFResult<Option<Duration>>;
    // ... additional pull_* methods
}

pub enum Converter { RowConverter(RowConverter), Empty(RowConverter, Arc<dyn Array>) }
pub fn get_hasher() -> ahash::RandomState;
pub async fn controller_client(our_name: &str, our_tls: &Option<TlsConfig>) -> Result<ControllerGrpcClient<Channel>>;
pub fn local_address(bind_address: IpAddr) -> String;
pub const TIMESTAMP_FIELD: &str = "_timestamp";
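
The pull_* signatures above take &mut self, which suggests that pulling an option consumes it. The following is a minimal sketch of that pattern using only the standard library. DemoOptions and remaining_keys are hypothetical names, errors are plain strings rather than DFResult, and the real ConnectorOptions may behave differently.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for pull-style option parsing; not the real ConnectorOptions.
pub struct DemoOptions {
    options: HashMap<String, String>,
}

impl DemoOptions {
    pub fn new(pairs: &[(&str, &str)]) -> Self {
        let options = pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        Self { options }
    }

    /// Removes and returns a required option, or an error naming the missing key.
    pub fn pull_str(&mut self, name: &str) -> Result<String, String> {
        self.options
            .remove(name)
            .ok_or_else(|| format!("required option '{name}' not set"))
    }

    /// Removes an optional option and parses it as u64; an absent key is Ok(None).
    pub fn pull_opt_u64(&mut self, name: &str) -> Result<Option<u64>, String> {
        self.options
            .remove(name)
            .map(|v| {
                v.parse::<u64>()
                    .map_err(|_| format!("option '{name}' must be an unsigned integer, got '{v}'"))
            })
            .transpose()
    }

    /// Keys that were never pulled, sorted; useful for rejecting misspelled options.
    pub fn remaining_keys(&self) -> Vec<&str> {
        let mut keys: Vec<&str> = self.options.keys().map(String::as_str).collect();
        keys.sort();
        keys
    }
}
```

One benefit of consuming options as they are read is that whatever remains afterwards was never recognized, so a connector can report misspelled keys instead of ignoring them.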

Import

use arroyo_rpc::{
    ControlMessage, ControlResp, ConnectorOptions, Converter,
    OperatorConfig, MetadataField, TIMESTAMP_FIELD,
    controller_client, get_hasher, local_address,
};

I/O Contract

Inputs

Name | Type | Required | Description
sql_opts | &[SqlOption] | Yes | SQL WITH clause key-value options from CREATE TABLE
endpoint | String | Yes (for gRPC) | Target gRPC service address
tls | Option<TlsConfig> | No | TLS configuration for secure connections

Outputs

Name | Type | Description
ConnectorOptions | ConnectorOptions | Parsed SQL options with typed extraction methods
Channel | tonic::transport::Channel | Established gRPC channel, optionally with TLS
ControllerGrpcClient | ControllerGrpcClient<Channel> | Client for the Arroyo controller service

Usage Examples

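use std::time::Duration;

use arroyo_rpc::{retry, ConnectorOptions};

// Parse SQL WITH clause options (sql_options and partition_by come from the parsed CREATE TABLE)
let mut opts = ConnectorOptions::new(&sql_options, &partition_by)?;
// Required option: the ? propagates the error if it cannot be extracted
let topic = opts.pull_str("topic")?;
// Optional options: fall back to a default, or keep the Option as-is
let batch_size = opts.pull_opt_u64("batch_size")?.unwrap_or(1000);
let flush_interval = opts.pull_opt_duration("flush_interval")?;

// Retry with exponential backoff; do_network_call is a placeholder for any fallible async call
let result = retry!(
    do_network_call().await,
    10,
    Duration::from_millis(100),
    Duration::from_secs(30),
    |e| tracing::warn!("retrying: {}", e)
);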
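
To illustrate what exponential backoff with jitter means for a call like the one above, here is a minimal synchronous sketch. It is not the retry! macro itself: the function names are hypothetical, the delay formula (base times 2 to the power of the attempt, capped at the maximum) is an assumption, and the jitter factor is supplied by the caller because the standard library has no random number generator.

```rust
use std::time::Duration;

/// Capped exponential delay for a 1-based attempt: base * 2^attempt, at most `max_delay`.
/// Assumed formula; the real macro may compute its delay differently.
pub fn backoff_delay(attempt: u32, base: Duration, max_delay: Duration) -> Duration {
    // Saturate instead of overflowing for very large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max_delay).min(max_delay)
}

/// Hypothetical blocking retry loop. `jitter` should return a factor in [0.0, 1.0];
/// a real caller would draw it from a random source so that clients do not retry in lockstep.
pub fn retry_blocking<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    max_retries: u32,
    base: Duration,
    max_delay: Duration,
    mut on_error: impl FnMut(&E),
    mut jitter: impl FnMut() -> f64,
) -> Result<T, E> {
    let mut retries = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if retries < max_retries => {
                retries += 1;
                on_error(&e);
                // Sleep for a jittered fraction of the capped exponential delay.
                let delay = backoff_delay(retries, base, max_delay);
                std::thread::sleep(delay.mul_f64(jitter().clamp(0.0, 1.0)));
            }
            // Retries exhausted: surface the last error to the caller.
            Err(e) => return Err(e),
        }
    }
}
```

With a 100 ms base and a 30 s cap, the unjittered delays under this formula are 200 ms, 400 ms, 800 ms, and so on until the cap is reached. Scaling each delay by a random factor spreads out retries from many clients that failed at the same moment.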
