
Implementation:ArroyoSystems Arroyo Error Types

From Leeroopedia


Knowledge Sources
Domains: Streaming, ErrorHandling
Last Updated: 2026-02-08 08:00 GMT

Overview

Defines the unified error type hierarchy for the Arroyo dataflow engine, including DataflowError, StateError, StorageError, and TaskError, with domain classification and retry-hint semantics.

Description

This module establishes a structured error taxonomy used across all Arroyo dataflow operators, connectors, and state backends:

  • DataflowError -- the primary error enum for dataflow processing, with variants for Arrow and DataFusion errors, internal operator errors, state errors, argument errors, external errors, data deserialization errors (with a record count), connector errors (carrying domain and retry metadata), a catch-all UnknownError wrapping anyhow::Error, and the operator-scoped WithOperator wrapper.
  • ErrorDomain -- classifies errors as User (configuration/SQL), External (I/O, network), or Internal (bugs).
  • RetryHint -- indicates whether an error should be retried (NoRetry vs. WithBackoff).
  • TaskError -- the downstream representation of a DataflowError used for reporting to the controller, with automatic domain/retry classification.
  • StateError -- errors specific to the state backend (missing or mistyped tables, unsupported state versions, failed migrations, storage failures, and Arrow, Parquet, bincode, and protobuf codec errors).
  • StorageError -- errors for object store operations (invalid URLs, credential failures).
  • UserError -- a simple name+details pair for user-facing errors.
  • SourceError -- a factory for creating DataflowError::DataError instances for deserialization failures.
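The RetryHint semantics above can be illustrated from the caller's side. This is a minimal sketch, not Arroyo's retry logic: HintedError, run_with_retries, the attempt cap, and the doubling backoff are all assumptions made for illustration.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Mirrors the page's two retry hints.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint {
    NoRetry,
    WithBackoff,
}

/// Hypothetical error type carrying only a retry hint and a message.
#[derive(Debug)]
pub struct HintedError {
    pub retry: RetryHint,
    pub message: String,
}

/// Hypothetical helper: calls `op` until it succeeds, fails with NoRetry,
/// or has been attempted `max_attempts` times. The delay starts at `base`
/// and doubles after each retryable failure. Returns the final result and
/// the number of attempts made.
pub fn run_with_retries<T>(
    max_attempts: u32,
    base: Duration,
    mut op: impl FnMut() -> Result<T, HintedError>,
) -> (Result<T, HintedError>, u32) {
    let mut delay = base;
    let mut attempt: u32 = 0;
    loop {
        attempt += 1;
        match op() {
            Ok(value) => return (Ok(value), attempt),
            // Non-retryable errors (e.g. invalid configuration) surface immediately.
            Err(e) if e.retry == RetryHint::NoRetry => return (Err(e), attempt),
            // Retryable, but the attempt budget is spent.
            Err(e) if attempt >= max_attempts => return (Err(e), attempt),
            // Retryable: wait, then double the delay for the next attempt.
            Err(_) => {
                sleep(delay);
                delay = delay.saturating_mul(2);
            }
        }
    }
}
```

The point of the split is that a NoRetry error never consumes backoff time, while a WithBackoff error is retried only within a bounded budget.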

The connector_err! macro provides ergonomic construction of DataflowError::ConnectorError from a domain, a retry hint, an optional source error, and a format string.
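A macro with this calling convention can be sketched with macro_rules!. This is a hypothetical reimplementation, not the real connector_err! definition: it uses a cut-down DataflowError containing only a ConnectorError variant, and it substitutes Box<dyn Error + Send + Sync> for anyhow::Error so the example has no external dependencies.

```rust
use std::error::Error;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain { User, External, Internal }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

/// Cut-down stand-in for DataflowError::ConnectorError.
/// Assumption: Box<dyn Error + Send + Sync> replaces anyhow::Error here.
#[derive(Debug)]
pub enum DataflowError {
    ConnectorError {
        domain: ErrorDomain,
        retry: RetryHint,
        error: String,
        source: Option<Box<dyn Error + Send + Sync>>,
    },
}

/// Hypothetical connector_err!-style macro: bare variant names for the
/// domain and retry hint, an optional `source:` cause, then format args.
macro_rules! connector_err {
    // With an underlying cause.
    ($domain:ident, $retry:ident, source: $source:expr, $($fmt:tt)+) => {
        DataflowError::ConnectorError {
            domain: ErrorDomain::$domain,
            retry: RetryHint::$retry,
            error: format!($($fmt)+),
            source: Some($source),
        }
    };
    // Without an underlying cause.
    ($domain:ident, $retry:ident, $($fmt:tt)+) => {
        DataflowError::ConnectorError {
            domain: ErrorDomain::$domain,
            retry: RetryHint::$retry,
            error: format!($($fmt)+),
            source: None,
        }
    };
}
```

Arm order matters: the `source:` arm is tried first, and because the literal `source` token is checked before any fragment is parsed, a call without a cause falls through cleanly to the second arm.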

Usage

Use DataflowResult<T> (an alias for Result<T, DataflowError>) as the return type of all operator and connector processing functions. Use the connector_err! macro in connector implementations. The classify_datafusion_error function automatically maps DataFusion errors to appropriate domain/retry pairs.
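The idea behind classify_datafusion_error can be sketched as a match from error kind to a (domain, retry) pair. Everything here is assumed: FakeDataFusionError is a local stand-in for a few DataFusion error kinds (the real DataFusionError has more variants and richer payloads), and the specific mapping is illustrative rather than Arroyo's actual table.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain { User, External, Internal }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

/// Hypothetical stand-in for a handful of DataFusion error kinds.
#[derive(Debug)]
pub enum FakeDataFusionError {
    Plan(String),
    SchemaError(String),
    IoError(std::io::Error),
    ObjectStore(String),
    Internal(String),
}

/// Hypothetical mapping from error kind to (domain, retry hint).
pub fn classify_datafusion_error(e: &FakeDataFusionError) -> (ErrorDomain, RetryHint) {
    use FakeDataFusionError as E;
    match e {
        // Planning and schema problems: the user must fix the query.
        E::Plan(_) | E::SchemaError(_) => (ErrorDomain::User, RetryHint::NoRetry),
        // I/O and object-store failures are often transient.
        E::IoError(_) | E::ObjectStore(_) => (ErrorDomain::External, RetryHint::WithBackoff),
        // An engine invariant was violated: retrying will not help.
        E::Internal(_) => (ErrorDomain::Internal, RetryHint::NoRetry),
    }
}
```

An exhaustive match without a wildcard arm means that adding a new error kind forces a deliberate classification decision at compile time.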

Code Reference

Source Location

Signature

pub enum DataflowError {
    ArrowError(ArrowError),
    DataFusionError(DataFusionError),
    InternalOperatorError { error: &'static str, message: String },
    StateError(StateError),
    ArgumentError(String),
    ExternalError(String),
    DataError { details: String, count: usize },
    ConnectorError { domain: ErrorDomain, retry: RetryHint, error: String, source: Option<anyhow::Error> },
    UnknownError(anyhow::Error),
    WithOperator { error: Box<DataflowError>, operator_id: String },
}

pub enum ErrorDomain { User, External, Internal }
pub enum RetryHint { NoRetry, WithBackoff }

pub enum StateError {
    NoRegisteredTable { table: String },
    WrongTableKind { table: String, expected: &'static str },
    WrongValueType { table: String, expected: &'static str },
    UnsupportedStateVersion { table: String, found: u32, expected: u32 },
    MigrationFailed { table: String, error: String },
    Other { table: String, error: String },
    StorageError(StorageError),
    ArrowError(arrow::error::ArrowError),
    ParquetError(ParquetError),
    BincodeDecodeError(bincode::error::DecodeError),
    BincodeEncodeError(bincode::error::EncodeError),
    ProtoDecodeError(prost::DecodeError),
}

pub type DataflowResult<T> = Result<T, DataflowError>;
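Two patterns implied by this signature are lifting a StateError into a DataflowError with `?` and scoping an error to an operator with WithOperator. The following minimal sketch assumes a From<StateError> conversion and introduces hypothetical with_operator, root, lookup_table, and process helpers; the enums are cut down to the variants the example needs.

```rust
use std::fmt;

/// Cut-down StateError: one of the page's variants.
#[derive(Debug)]
pub enum StateError {
    NoRegisteredTable { table: String },
}

/// Cut-down DataflowError: the two variants this sketch needs.
#[derive(Debug)]
pub enum DataflowError {
    StateError(StateError),
    WithOperator { error: Box<DataflowError>, operator_id: String },
}

pub type DataflowResult<T> = Result<T, DataflowError>;

// Assumed conversion so that `?` lifts a StateError into a DataflowError.
impl From<StateError> for DataflowError {
    fn from(e: StateError) -> Self {
        DataflowError::StateError(e)
    }
}

impl DataflowError {
    /// Hypothetical helper: tag an error with the operator that raised it.
    pub fn with_operator(self, operator_id: impl Into<String>) -> Self {
        DataflowError::WithOperator { error: Box::new(self), operator_id: operator_id.into() }
    }

    /// Hypothetical helper: strip WithOperator layers to reach the root cause.
    pub fn root(&self) -> &DataflowError {
        match self {
            DataflowError::WithOperator { error, .. } => error.root(),
            other => other,
        }
    }
}

impl fmt::Display for DataflowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataflowError::StateError(StateError::NoRegisteredTable { table }) => {
                write!(f, "no registered table '{table}'")
            }
            DataflowError::WithOperator { error, operator_id } => write!(f, "[{operator_id}] {error}"),
        }
    }
}

/// Hypothetical state lookup that always fails, for demonstration.
fn lookup_table(name: &str) -> Result<(), StateError> {
    Err(StateError::NoRegisteredTable { table: name.to_string() })
}

/// Hypothetical operator step: `?` converts the StateError via From,
/// then the operator id is attached on the way out.
pub fn process(operator_id: &str) -> DataflowResult<()> {
    let inner = || -> DataflowResult<()> {
        lookup_table("counts")?;
        Ok(())
    };
    inner().map_err(|e| e.with_operator(operator_id))
}
```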

Import

use arroyo_rpc::errors::{DataflowError, DataflowResult, ErrorDomain, RetryHint, StateError, StorageError};
use arroyo_rpc::connector_err;

I/O Contract

Inputs

Name | Type | Required | Description
domain | ErrorDomain | Yes (for ConnectorError) | Classification: User, External, or Internal
retry | RetryHint | Yes (for ConnectorError) | Retry policy: NoRetry or WithBackoff
error | String | Yes | Human-readable error message
source | Option<anyhow::Error> | No | Underlying cause error

Outputs

Name | Type | Description
TaskError | TaskError | Classified error for controller reporting, carrying domain, retry_hint, and operator_id
DataflowResult<T> | Result<T, DataflowError> | Standard result alias used across all operators
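The conversion from DataflowError to TaskError might look roughly like the following. The message field, the From impl, the "innermost operator id wins" rule, and the per-variant classification are all assumptions for illustration, not Arroyo's code; only domain, retry_hint, and operator_id come from the table above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain { User, External, Internal }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint { NoRetry, WithBackoff }

/// Cut-down DataflowError (ConnectorError omits its `source` here).
#[derive(Debug)]
pub enum DataflowError {
    ArgumentError(String),
    DataError { details: String, count: usize },
    ConnectorError { domain: ErrorDomain, retry: RetryHint, error: String },
    WithOperator { error: Box<DataflowError>, operator_id: String },
}

/// Hypothetical shape of the controller-facing report.
#[derive(Debug, PartialEq)]
pub struct TaskError {
    pub message: String,
    pub domain: ErrorDomain,
    pub retry_hint: RetryHint,
    pub operator_id: Option<String>,
}

impl From<DataflowError> for TaskError {
    fn from(err: DataflowError) -> Self {
        let (message, domain, retry_hint) = match err {
            DataflowError::WithOperator { error, operator_id } => {
                // Classify the wrapped error; keep the innermost operator id.
                let mut inner = TaskError::from(*error);
                if inner.operator_id.is_none() {
                    inner.operator_id = Some(operator_id);
                }
                return inner;
            }
            // Assumed: bad arguments and bad input data are the user's to fix.
            DataflowError::ArgumentError(msg) => (msg, ErrorDomain::User, RetryHint::NoRetry),
            DataflowError::DataError { details, count } => (
                format!("{count} record(s) failed to deserialize: {details}"),
                ErrorDomain::User,
                RetryHint::NoRetry,
            ),
            // Connector errors already carry their own classification.
            DataflowError::ConnectorError { domain, retry, error } => (error, domain, retry),
        };
        TaskError { message, domain, retry_hint, operator_id: None }
    }
}
```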

Usage Examples

use arroyo_rpc::connector_err;
use arroyo_rpc::errors::{DataflowError, DataflowResult, SourceError};

// Using the connector_err! macro in a connector.
// `do_network_call` is a placeholder for the connector's own I/O call.
fn read_from_source(topic: &str) -> DataflowResult<()> {
    let _payload = do_network_call().map_err(|e| {
        connector_err!(External, WithBackoff, source: e.into(), "failed to read from Kafka topic {}", topic)
    })?;
    Ok(())
}

// Creating a data deserialization error (a DataflowError::DataError)
fn reject_record() -> DataflowError {
    SourceError::bad_data("invalid JSON at offset 42")
}
