Implementation:ArroyoSystems Arroyo Rpc Core
| Knowledge Sources | |
|---|---|
| Domains | Streaming, RPC, Configuration |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
The core library of the arroyo-rpc crate, providing gRPC protocol definitions, control message types, SQL option parsing via ConnectorOptions, row conversion utilities, TLS/gRPC channel management, event logging infrastructure, and retry macros.
Description
This module is the central hub of the arroyo-rpc crate. It includes:
- gRPC modules -- auto-generated protobuf types via tonic::include_proto! for both the internal RPC protocol (arroyo_rpc) and the public API (api).
- ControlMessage / ControlResp -- enums for the bidirectional control plane between the engine and operators, including Checkpoint, Stop, Commit, LoadCompacted, and error/status response variants.
- ConnectorOptions -- a type-safe parser for SQL WITH clause options that extracts strings, booleans, integers, floats, durations, data sizes, arrays, and fields with proper error messages.
- Converter -- wraps Arrow's RowConverter to handle the empty-key case, used for binary row encoding in state operations.
- OperatorConfig / MetadataField / RateLimit -- configuration types for connector operators at runtime.
- TLS/gRPC utilities -- grpc_channel_builder, connect_grpc, controller_client for establishing optionally TLS-secured gRPC connections with mTLS support.
- Event logging -- log_event! and log_trace_event! macros with a pluggable EventLogger trait.
- retry! -- a macro implementing exponential backoff with jitter for retryable operations.
- DataSizeUnit -- parser for human-readable data sizes (e.g., "10MB", "1GB").
- Constants -- TIMESTAMP_FIELD ("_timestamp"), UPDATING_META_FIELD, and updating_meta_fields/updating_meta_field for CDC metadata.
Usage
This module is imported transitively by nearly every crate in the Arroyo workspace. Use ConnectorOptions when implementing new SQL connector parsing, ControlMessage/ControlResp for operator-engine communication, and the gRPC utilities for inter-service communication.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-rpc/src/lib.rs
Signature
pub enum ControlMessage {
Checkpoint(CheckpointBarrier),
Stop { mode: StopMode },
Commit { epoch: u32, commit_data: HashMap<String, HashMap<u32, Vec<u8>>> },
LoadCompacted { compacted: CompactionResult },
NoOp,
}
pub enum ControlResp {
CheckpointEvent(CheckpointEvent),
CheckpointCompleted(CheckpointCompleted),
TaskStarted { node_id: u32, task_index: usize, start_time: SystemTime },
TaskFinished { node_id: u32, task_index: usize },
TaskFailed { node_id: u32, task_index: usize, error: TaskError },
Error { node_id: u32, operator_id: String, task_index: usize, message: String, details: String },
}
pub struct ConnectorOptions { ... }
impl ConnectorOptions {
pub fn new(sql_opts: &[SqlOption], partition_by: &Option<Vec<Expr>>) -> DFResult<Self>;
pub fn pull_str(&mut self, name: &str) -> DFResult<String>;
pub fn pull_opt_str(&mut self, name: &str) -> DFResult<Option<String>>;
pub fn pull_opt_bool(&mut self, name: &str) -> DFResult<Option<bool>>;
pub fn pull_opt_u64(&mut self, name: &str) -> DFResult<Option<u64>>;
pub fn pull_opt_duration(&mut self, name: &str) -> DFResult<Option<Duration>>;
// ... additional pull_* methods
}
pub enum Converter { RowConverter(RowConverter), Empty(RowConverter, Arc<dyn Array>) }
pub fn get_hasher() -> ahash::RandomState;
pub async fn controller_client(our_name: &str, our_tls: &Option<TlsConfig>) -> Result<ControllerGrpcClient<Channel>>;
pub fn local_address(bind_address: IpAddr) -> String;
pub const TIMESTAMP_FIELD: &str = "_timestamp";
Import
use arroyo_rpc::{
ControlMessage, ControlResp, ConnectorOptions, Converter,
OperatorConfig, MetadataField, TIMESTAMP_FIELD,
controller_client, get_hasher, local_address,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| sql_opts | &[SqlOption] | Yes | SQL WITH clause key-value options from CREATE TABLE |
| endpoint | String | Yes (for gRPC) | Target gRPC service address |
| tls | Option<TlsConfig> | No | TLS configuration for secure connections |
Outputs
| Name | Type | Description |
|---|---|---|
| ConnectorOptions | ConnectorOptions | Parsed SQL options with typed extraction methods |
| Channel | tonic::transport::Channel | Established gRPC channel, optionally with TLS |
| ControllerGrpcClient | ControllerGrpcClient<Channel> | Client for the Arroyo controller service |
Usage Examples
use arroyo_rpc::ConnectorOptions;
// Parse SQL WITH clause options
let mut opts = ConnectorOptions::new(&sql_options, &partition_by)?;
let topic = opts.pull_str("topic")?;
let batch_size = opts.pull_opt_u64("batch_size")?.unwrap_or(1000);
let flush_interval = opts.pull_opt_duration("flush_interval")?;
// Retry with exponential backoff
use arroyo_rpc::retry;
let result = retry!(
do_network_call().await,
10,
Duration::from_millis(100),
Duration::from_secs(30),
|e| tracing::warn!("retrying: {}", e)
);