Implementation:ArroyoSystems Arroyo Nexmark Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
NexmarkConnector implements Arroyo's Connector trait for the Nexmark benchmark. It is a source-only connector that generates simulated auction events at a configurable event rate, with an optional runtime that makes the source bounded.
Description
The Nexmark connector generates events following the NEXMark benchmark schema, represented as three nullable struct columns: person, auction, and bid. The schema is built from Arrow Field definitions using the Int64, Utf8, and Timestamp data types and is returned by nexmark_schema(). Custom schemas are not supported: the schema may be omitted, but if one is provided it must match the inferred Nexmark schema.

Configuration consists of event_rate (target events per second) and an optional runtime in seconds; setting runtime makes the source bounded. The make_operator method constructs a NexmarkSourceFunc from the table configuration.

The struct fields are:
- Person: id, name, email_address, credit_card, city, state, datetime, extra
- Auction: id, description, item_name, initial_bid, reserve, datetime, expires, seller, category, extra
- Bid: auction, bidder, price, channel, url, datetime, extra

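
To make the schema rules concrete, here is a minimal, std-only Rust sketch. It models each struct column as a list of (field name, type) pairs instead of Arrow Fields, and reuses the names nexmark_schema(), person_fields(), auction_fields(), and bid_fields() without reproducing their real signatures. The per-field type assignments follow the standard Nexmark data model and are an assumption; the DataType enum, the Schema alias, and validate_declared_schema() are hypothetical.

```rust
// Simplified, std-only model of the Nexmark schema. The real connector
// builds Arrow `Field`s; here each struct column is a list of
// (field name, data type) pairs. Per-field types are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DataType {
    Int64,
    Utf8,
    Timestamp,
}

use DataType::{Int64, Timestamp, Utf8};

type StructFields = Vec<(&'static str, DataType)>;
type Schema = Vec<(&'static str, StructFields)>;

fn person_fields() -> StructFields {
    vec![
        ("id", Int64), ("name", Utf8), ("email_address", Utf8),
        ("credit_card", Utf8), ("city", Utf8), ("state", Utf8),
        ("datetime", Timestamp), ("extra", Utf8),
    ]
}

fn auction_fields() -> StructFields {
    vec![
        ("id", Int64), ("description", Utf8), ("item_name", Utf8),
        ("initial_bid", Int64), ("reserve", Int64), ("datetime", Timestamp),
        ("expires", Timestamp), ("seller", Int64), ("category", Int64),
        ("extra", Utf8),
    ]
}

fn bid_fields() -> StructFields {
    vec![
        ("auction", Int64), ("bidder", Int64), ("price", Int64),
        ("channel", Utf8), ("url", Utf8), ("datetime", Timestamp),
        ("extra", Utf8),
    ]
}

/// The three top-level columns, each a nullable struct.
fn nexmark_schema() -> Schema {
    vec![
        ("person", person_fields()),
        ("auction", auction_fields()),
        ("bid", bid_fields()),
    ]
}

/// Hypothetical check mirroring the rule above: omitting the schema is fine,
/// but a declared schema must match the Nexmark schema exactly.
fn validate_declared_schema(declared: Option<&Schema>) -> Result<(), String> {
    match declared {
        None => Ok(()),
        Some(s) if *s == nexmark_schema() => Ok(()),
        Some(_) => Err("declared schema must match nexmark_schema()".to_string()),
    }
}
```

Treating any mismatch as an error, rather than trying to coerce a user schema, keeps the generator's output layout fixed regardless of what the table declares.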
Usage
Use NexmarkConnector when benchmarking Arroyo streaming SQL queries against the industry-standard Nexmark workload with configurable throughput.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/nexmark/mod.rs
Signature
```rust
pub struct NexmarkConnector {}

pub fn nexmark_schema() -> ConnectionSchema;
pub(crate) fn person_fields() -> Vec<Field>;
pub(crate) fn auction_fields() -> Vec<Field>;
pub(crate) fn bid_fields() -> Vec<Field>;

impl Connector for NexmarkConnector {
    type ProfileT = EmptyConfig;
    type TableT = NexmarkTable;

    fn name(&self) -> &'static str; // returns "nexmark"
    fn table_type(&self, _: EmptyConfig, _: NexmarkTable) -> ConnectionType; // always Source
    fn get_schema(&self, _: EmptyConfig, _: NexmarkTable,
                  _: Option<&ConnectionSchema>) -> Option<ConnectionSchema>;
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
                   table: NexmarkTable, _: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: NexmarkTable,
                     _: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
```
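
The trait shape above can be illustrated with a self-contained sketch. Connector, ConnectionType, EmptyConfig, NexmarkTable, and NexmarkSourceFunc below are simplified, hypothetical stand-ins for the Arroyo types of the same names; in particular, the Operator associated type and the String error replace ConstructedOperator and anyhow::Result. It shows how associated types give each connector typed profile and table configs, and how the Nexmark connector always reports itself as a source.

```rust
// Hypothetical, simplified stand-ins for Arroyo types; the real Connector
// trait has more methods and returns anyhow::Result<ConstructedOperator>.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionType {
    Source,
    Sink,
}

#[derive(Debug, Clone, Default)]
struct EmptyConfig {}

#[derive(Debug, Clone)]
struct NexmarkTable {
    event_rate: f64,      // target events per second
    runtime: Option<f64>, // seconds; None means unbounded
}

/// Stand-in for the operator built by make_operator.
#[derive(Debug)]
struct NexmarkSourceFunc {
    event_rate: f64,
    runtime: Option<f64>,
}

trait Connector {
    type ProfileT; // connection-profile config (empty for Nexmark)
    type TableT; // per-table config
    type Operator; // simplification of ConstructedOperator

    fn name(&self) -> &'static str;
    fn table_type(&self, profile: Self::ProfileT, table: Self::TableT) -> ConnectionType;
    fn make_operator(&self, profile: Self::ProfileT, table: Self::TableT)
        -> Result<Self::Operator, String>;
}

struct NexmarkConnector {}

impl Connector for NexmarkConnector {
    type ProfileT = EmptyConfig;
    type TableT = NexmarkTable;
    type Operator = NexmarkSourceFunc;

    fn name(&self) -> &'static str {
        "nexmark"
    }

    // Nexmark only generates data, so every table is a source.
    fn table_type(&self, _: EmptyConfig, _: NexmarkTable) -> ConnectionType {
        ConnectionType::Source
    }

    // Copies the table configuration into the source operator.
    fn make_operator(&self, _: EmptyConfig, table: NexmarkTable) -> Result<NexmarkSourceFunc, String> {
        Ok(NexmarkSourceFunc { event_rate: table.event_rate, runtime: table.runtime })
    }
}
```

Because ProfileT is EmptyConfig, the Nexmark connector needs no connection profile; all of its configuration lives in the table type.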
Import
```rust
use arroyo_connectors::nexmark::NexmarkConnector;
use arroyo_connectors::nexmark::nexmark_schema;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| event_rate | f64 | Yes | Target events per second |
| runtime | Option<f64> | No | Runtime in seconds (None for unbounded) |
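
The two inputs interact as follows: event_rate sets the pace, and runtime, if present, caps how long the source runs. Below is a minimal sketch under a simple linear pacing assumption; the real NexmarkSourceFunc may schedule and partition generation differently, and events_due, is_finished, and total_events are hypothetical helpers.

```rust
use std::time::Duration;

/// Hypothetical pacing helper: how many events a source targeting
/// `event_rate` events/second should have emitted after `elapsed`,
/// capped at `runtime` seconds when the source is bounded.
fn events_due(event_rate: f64, runtime: Option<f64>, elapsed: Duration) -> u64 {
    let secs = elapsed.as_secs_f64();
    // A bounded source stops producing once `runtime` seconds have passed.
    let effective = match runtime {
        Some(limit) => secs.min(limit),
        None => secs,
    };
    (event_rate * effective).floor() as u64
}

/// A bounded source is finished once `runtime` has elapsed; an unbounded one never is.
fn is_finished(runtime: Option<f64>, elapsed: Duration) -> bool {
    runtime.map_or(false, |limit| elapsed.as_secs_f64() >= limit)
}

/// Total events a bounded source emits under this model (None when unbounded).
fn total_events(event_rate: f64, runtime: Option<f64>) -> Option<u64> {
    runtime.map(|limit| (event_rate * limit).floor() as u64)
}
```

For example, event_rate = 10000 with runtime = 60 yields 600,000 events under this model, while runtime = None never finishes.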
Outputs
| Name | Type | Description |
|---|---|---|
| person | Struct (nullable) | Person event struct (id, name, email_address, credit_card, city, state, datetime, extra) |
| auction | Struct (nullable) | Auction event struct (id, description, item_name, initial_bid, reserve, datetime, expires, seller, category, extra) |
| bid | Struct (nullable) | Bid event struct (auction, bidder, price, channel, url, datetime, extra) |
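
Because every output column is a nullable struct, a consumer typically checks which column is populated. The sketch below assumes, as in the standard Nexmark generator, that each row carries exactly one non-null column. Person, Auction, Bid, Event, NexmarkRow, and bid_prices are hypothetical, and each struct keeps only a few of its fields for brevity.

```rust
// Hypothetical, trimmed-down event structs (only a few fields each).
#[derive(Debug, Clone, PartialEq)]
struct Person { id: i64, name: String }

#[derive(Debug, Clone, PartialEq)]
struct Auction { id: i64, seller: i64 }

#[derive(Debug, Clone, PartialEq)]
struct Bid { auction: i64, bidder: i64, price: i64 }

/// One generated event of exactly one kind (assumption).
enum Event {
    Person(Person),
    Auction(Auction),
    Bid(Bid),
}

/// One output row: three nullable struct columns.
#[derive(Debug, Default, PartialEq)]
struct NexmarkRow {
    person: Option<Person>,
    auction: Option<Auction>,
    bid: Option<Bid>,
}

impl From<Event> for NexmarkRow {
    // Populate the matching column and leave the other two null.
    fn from(e: Event) -> Self {
        match e {
            Event::Person(p) => NexmarkRow { person: Some(p), ..Default::default() },
            Event::Auction(a) => NexmarkRow { auction: Some(a), ..Default::default() },
            Event::Bid(b) => NexmarkRow { bid: Some(b), ..Default::default() },
        }
    }
}

/// Roughly what a query filtering on `bid IS NOT NULL` and projecting the price does.
fn bid_prices(rows: &[NexmarkRow]) -> Vec<i64> {
    rows.iter().filter_map(|r| r.bid.as_ref()).map(|b| b.price).collect()
}
```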
Usage Examples
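```sql
-- "..." abbreviates the field lists; a declared schema must match nexmark_schema().
CREATE TABLE nexmark (
    person STRUCT<id BIGINT, name TEXT, ...>,
    auction STRUCT<id BIGINT, ...>,
    bid STRUCT<auction BIGINT, ...>
) WITH (
    connector = 'nexmark',
    event_rate = '10000'
);
-- Omit the columns to use the inferred schema; runtime bounds the source to 60 s.
CREATE TABLE nexmark_bounded WITH (
    connector = 'nexmark',
    event_rate = '10000',
    runtime = '60'
);
```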
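
WITH option values are written as strings, so they have to be parsed into the types listed under Inputs. Below is a minimal sketch of that step, assuming a plain map of option names to values rather than whatever option-handling types Arroyo actually uses; parse_nexmark_options and this NexmarkTable definition are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical typed form of the table options (see Inputs).
#[derive(Debug, PartialEq)]
struct NexmarkTable {
    event_rate: f64,
    runtime: Option<f64>,
}

/// Parses string-valued WITH options such as event_rate = '10000'.
/// event_rate is required; runtime is optional and makes the source bounded.
fn parse_nexmark_options(opts: &HashMap<&str, &str>) -> Result<NexmarkTable, String> {
    let event_rate = opts
        .get("event_rate")
        .ok_or("missing required option 'event_rate'")?
        .parse::<f64>()
        .map_err(|e| format!("invalid event_rate: {}", e))?;
    let runtime = match opts.get("runtime") {
        Some(v) => Some(v.parse::<f64>().map_err(|e| format!("invalid runtime: {}", e))?),
        None => None,
    };
    Ok(NexmarkTable { event_rate, runtime })
}
```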