Implementation:ArroyoSystems Arroyo Nexmark Operator



Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

NexmarkSourceFunc is a synthetic data generator that produces Nexmark benchmark events (Person, Auction, Bid) as Arrow record batches at a configurable event rate with checkpointing support.

Description

The Nexmark operator implements the data generator for the NEXMark benchmark, a standard streaming benchmark that simulates an online auction system. It generates three event types in configurable proportions (default Person:Auction:Bid = 1:3:46): Person (user registrations), Auction (auction listings), and Bid (bids on auctions). The GeneratorConfig manages event ID assignment, inter-event delay computation, and parallelism-aware splitting. Event fields are filled with realistic synthetic data drawn from predefined arrays of names, cities, states, and hot channels. The generator supports rate-limited and bounded (time-limited) modes, out-of-order event generation via group shuffling, and wallclock-aligned timestamps. State is checkpointed as a NexmarkSourceState, which holds the generator configuration and the event count, so that after recovery the source resumes from the exact position where it stopped. Events are written into Arrow StructBuilder instances for the Person, Auction, and Bid columns, alongside a timestamp column, and batches are flushed periodically based on record count and elapsed time.
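The proportion and delay logic described above can be sketched as follows. This is a minimal illustration, not the operator's actual code: the names `EventKind`, `event_kind`, and `inter_event_delay` are hypothetical, and the sketch assumes that the event type is chosen from the event ID's position within each block of 50 IDs and that the configured rate is split evenly across parallel subtasks.

```rust
use std::time::Duration;

// Default Nexmark proportions from the description: 1 Person : 3 Auctions : 46 Bids.
const PERSON_PROPORTION: u64 = 1;
const AUCTION_PROPORTION: u64 = 3;
const BID_PROPORTION: u64 = 46;
const TOTAL_PROPORTION: u64 = PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventKind {
    Person,
    Auction,
    Bid,
}

/// Hypothetical helper: pick the event type from the event ID's offset
/// within each block of `TOTAL_PROPORTION` consecutive IDs.
fn event_kind(event_id: u64) -> EventKind {
    let offset = event_id % TOTAL_PROPORTION;
    if offset < PERSON_PROPORTION {
        EventKind::Person
    } else if offset < PERSON_PROPORTION + AUCTION_PROPORTION {
        EventKind::Auction
    } else {
        EventKind::Bid
    }
}

/// Hypothetical helper: gap between consecutive events on one subtask,
/// assuming the total rate is shared evenly across `parallelism` subtasks.
/// Both arguments must be positive.
fn inter_event_delay(total_events_per_second: f64, parallelism: u32) -> Duration {
    let per_subtask_rate = total_events_per_second / f64::from(parallelism);
    Duration::from_secs_f64(1.0 / per_subtask_rate)
}
```

Under these assumptions, a target of 10,000 events per second spread over 4 subtasks gives each subtask a rate of 2,500 events per second, so one event roughly every 400 microseconds.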

Usage

Use NexmarkSourceFunc for benchmarking Arroyo streaming SQL queries using the industry-standard Nexmark benchmark workload.

Code Reference

Source Location

Signature

pub struct NexmarkSourceFunc {
    first_event_rate: f64,
    num_events: Option<u64>,
    state: Option<NexmarkSourceState>,
}

pub struct NexmarkGenerator {
    pub(crate) generator_config: GeneratorConfig,
    channel_cache: ChannelCache,
    events_count_so_far: u64,
    wallclock_base_time: SystemTime,
}

pub struct GeneratorConfig {
    configuration: NexmarkConfig,
    inter_event_delay: Duration,
    base_time: SystemTime,
    first_event_id: u64,
    max_events: u64,
    // ...
}

pub struct Person { pub id: i64, pub name: String, pub email_address: String, ... }
pub struct Auction { pub id: i64, pub item_name: String, pub initial_bid: i64, ... }
pub struct Bid { pub auction: i64, pub bidder: i64, pub price: i64, ... }

#[async_trait]
impl SourceOperator for NexmarkSourceFunc {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn on_start(&mut self, ctx: &mut SourceContext) -> DataflowResult<()>;
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
}

Import

use arroyo_connectors::nexmark::operator::NexmarkSourceFunc;
use arroyo_connectors::nexmark::operator::NexmarkGenerator;

I/O Contract

Inputs

Name | Type | Required | Description
first_event_rate | f64 | Yes | Target events per second across all parallel instances
num_events | Option<u64> | No | Maximum number of events to generate (None for unbounded)

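To show how these two inputs might interact with bounded mode and parallelism, here is a small sketch. Both helpers are hypothetical and are not taken from the Arroyo source: `bounded_event_count` assumes a time limit is converted to an event cap as rate times runtime in seconds, and `events_for_subtask` assumes a bounded total is split as evenly as possible across subtasks.

```rust
/// Hypothetical helper: convert a time-limited run into an event cap,
/// assuming the cap is simply rate multiplied by runtime (in seconds).
fn bounded_event_count(events_per_second: f64, runtime_secs: Option<f64>) -> Option<u64> {
    runtime_secs.map(|secs| (events_per_second * secs) as u64)
}

/// Hypothetical helper: share of a bounded total assigned to one subtask.
/// The first `total % parallelism` subtasks each take one extra event,
/// so the shares always add up to the total. `None` stays unbounded.
fn events_for_subtask(total: Option<u64>, parallelism: u64, index: u64) -> Option<u64> {
    assert!(parallelism > 0 && index < parallelism);
    total.map(|n| n / parallelism + u64::from(index < n % parallelism))
}
```

For example, under these assumptions a rate of 10,000 events per second with a 60-second limit yields a cap of 600,000 events, and a cap of 10 events over 3 subtasks is split as 4, 3, and 3.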
Outputs

Name | Type | Description
person | Struct (nullable) | Person event data (id, name, email, credit_card, city, state, datetime, extra)
auction | Struct (nullable) | Auction event data (id, item_name, description, initial_bid, reserve, datetime, expires, seller, category, extra)
bid | Struct (nullable) | Bid event data (auction, bidder, price, channel, url, datetime, extra)
_timestamp | TimestampNanosecond | Event timestamp
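
The output layout above, three nullable struct columns plus a timestamp, can be modeled in plain Rust as shown below. This sketch uses only the standard library rather than Arrow builders, the struct fields are trimmed to a few of those listed, and the `Event`, `Row`, and `to_row` names are hypothetical. It also assumes that each row carries exactly one event, so only one of the three struct columns is non-null per row.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Trimmed-down event payloads; the real structs carry more fields.
#[derive(Debug, Clone, PartialEq)]
struct Person { id: i64, name: String }

#[derive(Debug, Clone, PartialEq)]
struct Auction { id: i64, item_name: String, initial_bid: i64 }

#[derive(Debug, Clone, PartialEq)]
struct Bid { auction: i64, bidder: i64, price: i64 }

/// Hypothetical tagged union over the three event types.
enum Event {
    Person(Person),
    Auction(Auction),
    Bid(Bid),
}

/// One output row: each `Option` stands in for a nullable struct column.
#[derive(Debug, Default, PartialEq)]
struct Row {
    person: Option<Person>,
    auction: Option<Auction>,
    bid: Option<Bid>,
    timestamp_nanos: i64, // nanoseconds since the Unix epoch
}

/// Hypothetical helper: place the event in its own column and leave the
/// other two columns null (assumption: one event per row).
fn to_row(event: Event, timestamp: SystemTime) -> Row {
    let timestamp_nanos = timestamp
        .duration_since(UNIX_EPOCH)
        .expect("timestamp before Unix epoch")
        .as_nanos() as i64;
    let mut row = Row { timestamp_nanos, ..Row::default() };
    match event {
        Event::Person(p) => row.person = Some(p),
        Event::Auction(a) => row.auction = Some(a),
        Event::Bid(b) => row.bid = Some(b),
    }
    row
}
```

With this layout, a downstream SQL query can select one event type by filtering on which struct column is non-null.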

Usage Examples

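// Direct construction: target event rate and an optional cap on the number of events.
let source = NexmarkSourceFunc::new(10000, Some(1_000_000));
// Or from table config:
let source = NexmarkSourceFunc::from_config(&NexmarkTable {
    event_rate: 10000.0, // target event rate
    runtime: Some(60.0), // time limit for a bounded run
});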
