Implementation:ArroyoSystems Arroyo Nexmark Connector

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

NexmarkConnector implements the Arroyo Connector trait for the Nexmark benchmark, providing a source-only connector that generates simulated auction events with configurable event rate and optional bounded runtime.

Description

The Nexmark connector generates events conforming to the Nexmark benchmark schema: three nested struct columns for Person, Auction, and Bid events. The schema is defined via Arrow Field definitions with specific data types (Int64, Utf8, Timestamp) and is returned by nexmark_schema(). The connector does not support custom schemas; if one is provided, it must match the inferred Nexmark schema.

Configuration consists of event_rate (events per second) and an optional runtime (in seconds; setting it makes the source bounded). The make_operator method constructs a NexmarkSourceFunc from the table configuration.

Person fields include id, name, email_address, credit_card, city, state, datetime, and extra.
Auction fields include id, description, item_name, initial_bid, reserve, datetime, expires, seller, category, and extra.
Bid fields include auction, bidder, price, channel, url, datetime, and extra.
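The column layout described above can be sketched as a small lookup table. This is only an illustration built from the field names listed in this section: the constants and the column_fields helper are hypothetical and are not part of arroyo_connectors, and the Arrow data type of each field is deliberately left out because it is not spelled out here.

```rust
// Field names are copied from the description above. The constants and the
// lookup helper are hypothetical stand-ins, not items exported by Arroyo.
const PERSON_FIELDS: &[&str] = &[
    "id", "name", "email_address", "credit_card", "city", "state", "datetime", "extra",
];
const AUCTION_FIELDS: &[&str] = &[
    "id", "description", "item_name", "initial_bid", "reserve", "datetime", "expires",
    "seller", "category", "extra",
];
const BID_FIELDS: &[&str] = &[
    "auction", "bidder", "price", "channel", "url", "datetime", "extra",
];

/// Returns the nested field names for one of the three top-level struct
/// columns, or None if the column is not part of the schema.
fn column_fields(column: &str) -> Option<&'static [&'static str]> {
    match column {
        "person" => Some(PERSON_FIELDS),
        "auction" => Some(AUCTION_FIELDS),
        "bid" => Some(BID_FIELDS),
        _ => None,
    }
}
```

For example, column_fields("bid") yields the seven Bid field names, while any column name outside person, auction, and bid yields None.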

Usage

Use NexmarkConnector when benchmarking Arroyo streaming SQL queries against the industry-standard Nexmark workload with configurable throughput.

Code Reference

Source Location

Signature

pub struct NexmarkConnector {}

pub fn nexmark_schema() -> ConnectionSchema;
pub(crate) fn person_fields() -> Vec<Field>;
pub(crate) fn auction_fields() -> Vec<Field>;
pub(crate) fn bid_fields() -> Vec<Field>;

impl Connector for NexmarkConnector {
    type ProfileT = EmptyConfig;
    type TableT = NexmarkTable;

    fn name(&self) -> &'static str; // returns "nexmark"
    fn table_type(&self, _: EmptyConfig, _: NexmarkTable) -> ConnectionType; // always Source
    fn get_schema(&self, _: EmptyConfig, _: NexmarkTable,
        _: Option<&ConnectionSchema>) -> Option<ConnectionSchema>;
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: NexmarkTable, _: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: NexmarkTable,
        _: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

Import

use arroyo_connectors::nexmark::NexmarkConnector;
use arroyo_connectors::nexmark::nexmark_schema;

I/O Contract

Inputs

Name Type Required Description
event_rate f64 Yes Target events per second
runtime Option<f64> No Runtime in seconds (None for unbounded)
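To make the two inputs concrete, the following sketch shows one way event_rate and runtime could translate into pacing and a total event count. It assumes evenly spaced events and a simple rate-times-duration bound; the helper names are hypothetical, and the actual NexmarkSourceFunc may batch or pace events differently.

```rust
use std::time::Duration;

/// Delay between consecutive events for a target rate in events per second.
/// Assumption: events are evenly spaced; this is not taken from Arroyo's code.
fn inter_event_delay(event_rate: f64) -> Duration {
    assert!(
        event_rate.is_finite() && event_rate > 0.0,
        "event_rate must be a positive, finite number"
    );
    Duration::from_secs_f64(1.0 / event_rate)
}

/// Number of events a bounded run would emit, or None when runtime is None
/// (an unbounded source).
fn total_events(event_rate: f64, runtime: Option<f64>) -> Option<u64> {
    runtime.map(|secs| (event_rate * secs).floor() as u64)
}
```

Under these assumptions, a rate of 10000 events per second with a runtime of 2 seconds corresponds to 20000 events, and omitting runtime leaves the count undefined because the source never finishes.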

Outputs

Name Type Description
person Struct (nullable) Person event struct (id, name, email_address, credit_card, city, state, datetime, extra)
auction Struct (nullable) Auction event struct (id, description, item_name, initial_bid, reserve, datetime, expires, seller, category, extra)
bid Struct (nullable) Bid event struct (auction, bidder, price, channel, url, datetime, extra)

Usage Examples

CREATE TABLE nexmark (
    person STRUCT<id BIGINT, name TEXT, ...>,
    auction STRUCT<id BIGINT, ...>,
    bid STRUCT<auction BIGINT, ...>
) WITH (
    connector = 'nexmark',
    event_rate = '10000'
);
