Implementation:ArroyoSystems Arroyo Error Types
| Knowledge Sources | |
|---|---|
| Domains | Streaming, ErrorHandling |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Defines the unified error type hierarchy for the Arroyo dataflow engine, including DataflowError, StateError, StorageError, and TaskError, with domain classification and retry-hint semantics.
Description
This module establishes a structured error taxonomy used across all Arroyo dataflow operators, connectors, and state backends:
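- DataflowError -- the primary error enum for dataflow processing. Its variants cover Arrow errors, DataFusion errors, internal operator errors, state errors, argument errors, external errors, data deserialization errors (with a record count), connector errors (carrying domain and retry metadata), unknown errors wrapping an anyhow::Error, and an operator-scoped wrapper (WithOperator) that attaches the ID of the failing operator.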
- ErrorDomain -- classifies errors as User (configuration/SQL), External (I/O, network), or Internal (bugs).
- RetryHint -- indicates whether an error should be retried (NoRetry vs. WithBackoff).
- TaskError -- the form in which a DataflowError is reported to the controller, with its domain and retry hint classified automatically.
- StateError -- errors specific to the state backend (missing tables, wrong types, storage failures, codec errors).
- StorageError -- errors for object store operations (invalid URLs, credential failures).
- UserError -- a simple name+details pair for user-facing errors.
- SourceError -- a factory for creating DataflowError::DataError instances for deserialization failures.
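To make the TaskError classification concrete, here is a std-only sketch of how a DataflowError could be turned into a TaskError. It models only a few DataflowError variants. The per-variant (domain, retry) rules, the `classify` method, and TaskError's `message` field are assumptions for illustration; the `domain`, `retry_hint` and `operator_id` fields mirror the outputs listed for TaskError, but the real struct may be shaped differently.

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain { User, External, Internal }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

// Trimmed stand-in for DataflowError: only a subset of variants is modeled.
#[derive(Debug)]
pub enum DataflowError {
    ArgumentError(String),
    ExternalError(String),
    DataError { details: String, count: usize },
    ConnectorError { domain: ErrorDomain, retry: RetryHint, error: String },
    WithOperator { error: Box<DataflowError>, operator_id: String },
}

// Hypothetical TaskError shape; `message` is an assumed field.
#[derive(Debug)]
pub struct TaskError {
    pub domain: ErrorDomain,
    pub retry_hint: RetryHint,
    pub operator_id: Option<String>,
    pub message: String,
}

impl DataflowError {
    // Assumed classification rules, for illustration only.
    fn classify(&self) -> (ErrorDomain, RetryHint) {
        match self {
            DataflowError::ArgumentError(_) | DataflowError::DataError { .. } => {
                (ErrorDomain::User, RetryHint::NoRetry)
            }
            DataflowError::ExternalError(_) => (ErrorDomain::External, RetryHint::WithBackoff),
            // Connector errors already carry their own classification.
            DataflowError::ConnectorError { domain, retry, .. } => (*domain, *retry),
            // The operator wrapper defers to the error it wraps.
            DataflowError::WithOperator { error, .. } => error.classify(),
        }
    }
}

impl fmt::Display for DataflowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataflowError::ArgumentError(msg) => write!(f, "argument error: {msg}"),
            DataflowError::ExternalError(msg) => write!(f, "external error: {msg}"),
            DataflowError::DataError { details, count } => write!(f, "{count} bad record(s): {details}"),
            DataflowError::ConnectorError { error, .. } => write!(f, "connector error: {error}"),
            DataflowError::WithOperator { error, operator_id } => write!(f, "operator {operator_id}: {error}"),
        }
    }
}

impl From<DataflowError> for TaskError {
    fn from(err: DataflowError) -> Self {
        let (domain, retry_hint) = err.classify();
        // Only the WithOperator wrapper knows which operator failed.
        let operator_id = match &err {
            DataflowError::WithOperator { operator_id, .. } => Some(operator_id.clone()),
            _ => None,
        };
        TaskError { domain, retry_hint, operator_id, message: err.to_string() }
    }
}
```

Wrapping first and classifying second keeps the operator ID and the underlying cause's classification in a single value.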
The connector_err! macro provides ergonomic construction of DataflowError::ConnectorError with domain, retry hint, optional source error, and format string.
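A minimal macro_rules! sketch of how such a macro could be built, written against simplified local types so it compiles on its own. The real connector_err! in arroyo-rpc may be written differently. Here the domain and retry hint are bare variant names (matching the invocation style in this page's usage example), `source:` is optional, and a boxed std::error::Error stands in for anyhow::Error.

```rust
use std::error::Error as StdError;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain { User, External, Internal }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

// Stand-in for anyhow::Error in this self-contained sketch.
type BoxError = Box<dyn StdError + Send + Sync + 'static>;

#[derive(Debug)]
pub enum DataflowError {
    ConnectorError { domain: ErrorDomain, retry: RetryHint, error: String, source: Option<BoxError> },
}

// Hypothetical reimplementation. The first arm matches an explicit
// `source: <expr>`; if the literal `source` is absent, matching falls
// through to the second arm. Remaining tokens are passed to format!.
macro_rules! connector_err {
    ($domain:ident, $retry:ident, source: $source:expr, $($fmt:tt)+) => {
        DataflowError::ConnectorError {
            domain: ErrorDomain::$domain,
            retry: RetryHint::$retry,
            error: format!($($fmt)+),
            source: Some($source),
        }
    };
    ($domain:ident, $retry:ident, $($fmt:tt)+) => {
        DataflowError::ConnectorError {
            domain: ErrorDomain::$domain,
            retry: RetryHint::$retry,
            error: format!($($fmt)+),
            source: None,
        }
    };
}
```

For example, `connector_err!(User, NoRetry, "unknown option '{}'", name)` yields a non-retryable user error with no source attached.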
Usage
Use DataflowResult<T> (an alias for Result<T, DataflowError>) as the return type of all operator and connector processing functions. In connector implementations, build connector errors with the connector_err! macro. DataFusion errors need no manual classification: the classify_datafusion_error function maps them to appropriate domain/retry pairs.
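The mapping performed by classify_datafusion_error can be pictured as a match over error variants. The sketch below uses a local stand-in enum, `DataFusionErrorLike`, whose variant names echo a few of DataFusion's `DataFusionError` variants (`Plan`, `IoError`, `Internal`, `Context`) with simplified payloads. The (domain, retry) pair chosen for each variant is an assumption, not Arroyo's actual rule set.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain { User, External, Internal }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

// Hypothetical stand-in for a subset of datafusion::error::DataFusionError.
#[derive(Debug)]
pub enum DataFusionErrorLike {
    Plan(String),
    IoError(std::io::Error),
    Internal(String),
    Context(String, Box<DataFusionErrorLike>),
}

// Assumed mapping, for illustration only.
pub fn classify_datafusion_error(err: &DataFusionErrorLike) -> (ErrorDomain, RetryHint) {
    match err {
        // Planning failures usually trace back to the user's SQL.
        DataFusionErrorLike::Plan(_) => (ErrorDomain::User, RetryHint::NoRetry),
        // I/O failures may be transient, so retrying with backoff is reasonable.
        DataFusionErrorLike::IoError(_) => (ErrorDomain::External, RetryHint::WithBackoff),
        // Internal errors indicate bugs; retrying will not help.
        DataFusionErrorLike::Internal(_) => (ErrorDomain::Internal, RetryHint::NoRetry),
        // Context only adds a message, so classify by the wrapped cause.
        DataFusionErrorLike::Context(_, inner) => classify_datafusion_error(inner),
    }
}
```

Recursing through `Context` keeps the classification tied to the root cause rather than to the outermost message.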
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-rpc/src/errors.rs
Signature
```rust
pub enum DataflowError {
    ArrowError(ArrowError),
    DataFusionError(DataFusionError),
    InternalOperatorError { error: &'static str, message: String },
    StateError(StateError),
    ArgumentError(String),
    ExternalError(String),
    DataError { details: String, count: usize },
    ConnectorError { domain: ErrorDomain, retry: RetryHint, error: String, source: Option<anyhow::Error> },
    UnknownError(anyhow::Error),
    WithOperator { error: Box<DataflowError>, operator_id: String },
}

pub enum ErrorDomain { User, External, Internal }

pub enum RetryHint { NoRetry, WithBackoff }

pub enum StateError {
    NoRegisteredTable { table: String },
    WrongTableKind { table: String, expected: &'static str },
    WrongValueType { table: String, expected: &'static str },
    UnsupportedStateVersion { table: String, found: u32, expected: u32 },
    MigrationFailed { table: String, error: String },
    Other { table: String, error: String },
    StorageError(StorageError),
    ArrowError(arrow::error::ArrowError),
    ParquetError(ParquetError),
    BincodeDecodeError(bincode::error::DecodeError),
    BincodeEncodeError(bincode::error::EncodeError),
    ProtoDecodeError(prost::DecodeError),
}

pub type DataflowResult<T> = Result<T, DataflowError>;
```
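Because DataflowError has a StateError(StateError) variant, state-backend failures can propagate through `?` once a `From<StateError>` conversion exists, and the WithOperator variant lets the caller record which operator failed. A std-only sketch, assuming such a From impl (the real code may derive it); `with_operator`, `lookup_table`, and `process` are hypothetical helpers, and both enums are trimmed to the variants used here.

```rust
use std::collections::HashMap;

#[derive(Debug)]
pub enum StateError {
    NoRegisteredTable { table: String },
}

#[derive(Debug)]
pub enum DataflowError {
    StateError(StateError),
    WithOperator { error: Box<DataflowError>, operator_id: String },
}

pub type DataflowResult<T> = Result<T, DataflowError>;

// Lets `?` lift a StateError into a DataflowError.
impl From<StateError> for DataflowError {
    fn from(e: StateError) -> Self {
        DataflowError::StateError(e)
    }
}

impl DataflowError {
    // Hypothetical helper: tag an error with the operator that raised it.
    pub fn with_operator(self, operator_id: impl Into<String>) -> Self {
        DataflowError::WithOperator { error: Box::new(self), operator_id: operator_id.into() }
    }
}

// Hypothetical state lookup that fails with NoRegisteredTable.
fn lookup_table(tables: &HashMap<String, u64>, table: &str) -> Result<u64, StateError> {
    tables
        .get(table)
        .copied()
        .ok_or_else(|| StateError::NoRegisteredTable { table: table.to_string() })
}

fn process(tables: &HashMap<String, u64>) -> DataflowResult<u64> {
    // `?` converts StateError into DataflowError via the From impl above.
    let rows = lookup_table(tables, "windows")?;
    Ok(rows)
}
```

A runner would then call something like `process(&tables).map_err(|e| e.with_operator("tumble_1"))` so the reported error names the failing operator.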
Import
```rust
use arroyo_rpc::errors::{DataflowError, DataflowResult, ErrorDomain, RetryHint, StateError, StorageError};
use arroyo_rpc::connector_err;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| domain | ErrorDomain | Yes (for ConnectorError) | Classification: User, External, or Internal |
| retry | RetryHint | Yes (for ConnectorError) | Retry policy: NoRetry or WithBackoff |
| error | String | Yes | Human-readable error message |
| source | Option<anyhow::Error> | No | Underlying cause error |
Outputs
| Name | Type | Description |
|---|---|---|
| TaskError | TaskError | Classified error for controller reporting with domain, retry_hint, operator_id |
| DataflowResult<T> | Result<T, DataflowError> | Standard result alias used across all operators |
Usage Examples
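```rust
use arroyo_rpc::connector_err;
use arroyo_rpc::errors::{DataflowResult, SourceError};

// Hypothetical stand-in for a real network read (e.g. a Kafka consumer poll).
fn do_network_call(topic: &str) -> std::io::Result<Vec<u8>> {
    unimplemented!("replace with a real client read from {topic}")
}

// Using the connector_err! macro in a connector: network failures are External and retried with backoff.
fn read_from_source(topic: &str) -> DataflowResult<Vec<u8>> {
    let bytes = do_network_call(topic).map_err(|e| {
        connector_err!(External, WithBackoff, source: e.into(), "failed to read from Kafka: {}", topic)
    })?;
    Ok(bytes)
}

// Creating a data deserialization error
fn report_bad_record() {
    let _err = SourceError::bad_data("invalid JSON at offset 42");
}
```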
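A caller that honors the retry hint might wrap a fallible call in a loop like the following. This is a hypothetical, std-only sketch: `HintedError`, `retry_with_backoff`, and the doubling-with-cap delay policy are assumptions, not Arroyo's actual retry logic. The sleep function is injected so the waiting can be observed or replaced.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

// Hypothetical error type carrying only the hint needed here.
#[derive(Debug)]
pub struct HintedError {
    pub retry: RetryHint,
    pub message: String,
}

// Runs `op` until it succeeds, returns a NoRetry error, or `max_attempts`
// calls have been made. The delay starts at `base`, doubles after each
// retry, and is capped at `max_delay`.
pub fn retry_with_backoff<T>(
    max_attempts: u32,
    base: Duration,
    max_delay: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, HintedError>,
) -> Result<T, HintedError> {
    let mut delay = base;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Give up immediately on NoRetry, or once attempts are exhausted.
            Err(e) if e.retry == RetryHint::NoRetry || attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                delay = (delay * 2).min(max_delay);
                attempt += 1;
            }
        }
    }
}
```

In production the `sleep` argument would typically be `std::thread::sleep` (or an async timer in an async runtime).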