Implementation:ArroyoSystems Arroyo Nexmark Operator
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
NexmarkSourceFunc is a synthetic data generator that produces Nexmark benchmark events (Person, Auction, Bid) as Arrow record batches at a configurable event rate with checkpointing support.
Description
The Nexmark operator implements the data generator for NEXMark, a standard streaming benchmark that simulates an online auction system. It generates three event types, Person (user registrations), Auction (auction listings), and Bid (bids on auctions), in configurable proportions (default 1:3:46).
GeneratorConfig manages event ID assignment, inter-event delay computation, and parallelism-aware splitting. Events carry realistic synthetic data drawn from predefined arrays of names, cities, states, and hot channels. The generator supports rate-limited and bounded (time-limited) modes, out-of-order event generation via group shuffling, and wallclock-aligned timestamps.
State is checkpointed as NexmarkSourceState, which contains the generator configuration and the event count, so the source can resume from the exact position after recovery. Events are written into Arrow StructBuilder instances for the Person, Auction, and Bid columns, alongside a timestamp column, and are flushed periodically based on record count and elapsed time.
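The proportion and delay logic described above can be sketched as follows. This is a minimal illustration, not the operator's code: `EventKind`, `event_kind`, and `inter_event_delay` are hypothetical names, the modulo-based selection follows the scheme commonly used by Nexmark generators, and the even split of the global rate across subtasks is an assumption.

```rust
use std::time::Duration;

// Default proportions named in the text: 1 person : 3 auctions : 46 bids.
const PERSON_PROPORTION: u64 = 1;
const AUCTION_PROPORTION: u64 = 3;
const BID_PROPORTION: u64 = 46;
const TOTAL_PROPORTION: u64 = PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    Person,
    Auction,
    Bid,
}

/// Hypothetical helper: chooses the event kind from the position of the
/// event ID inside a repeating cycle of TOTAL_PROPORTION (50) events.
pub fn event_kind(event_id: u64) -> EventKind {
    let rem = event_id % TOTAL_PROPORTION;
    if rem < PERSON_PROPORTION {
        EventKind::Person
    } else if rem < PERSON_PROPORTION + AUCTION_PROPORTION {
        EventKind::Auction
    } else {
        EventKind::Bid
    }
}

/// Hypothetical helper: delay between consecutive events on one subtask,
/// assuming the global rate is divided evenly across `parallelism` subtasks.
pub fn inter_event_delay(events_per_second: f64, parallelism: u32) -> Duration {
    assert!(events_per_second > 0.0 && parallelism > 0);
    Duration::from_secs_f64(parallelism as f64 / events_per_second)
}
```

Under these assumptions, every block of 50 consecutive IDs contains exactly one Person, three Auction, and 46 Bid events, and a global rate of 10,000 events per second split over four subtasks gives each subtask a delay of roughly 400 microseconds.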
Usage
Use NexmarkSourceFunc for benchmarking Arroyo streaming SQL queries using the industry-standard Nexmark benchmark workload.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/nexmark/operator.rs
Signature
pub struct NexmarkSourceFunc {
    first_event_rate: f64,
    num_events: Option<u64>,
    state: Option<NexmarkSourceState>,
}

pub struct NexmarkGenerator {
    pub(crate) generator_config: GeneratorConfig,
    channel_cache: ChannelCache,
    events_count_so_far: u64,
    wallclock_base_time: SystemTime,
}

pub struct GeneratorConfig {
    configuration: NexmarkConfig,
    inter_event_delay: Duration,
    base_time: SystemTime,
    first_event_id: u64,
    max_events: u64,
    // ...
}

pub struct Person { pub id: i64, pub name: String, pub email_address: String, ... }
pub struct Auction { pub id: i64, pub item_name: String, pub initial_bid: i64, ... }
pub struct Bid { pub auction: i64, pub bidder: i64, pub price: i64, ... }

#[async_trait]
impl SourceOperator for NexmarkSourceFunc {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn on_start(&mut self, ctx: &mut SourceContext) -> DataflowResult<()>;
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
}
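To illustrate how a checkpointed event count allows the generator to resume from the exact position, here is a small sketch. `SketchState` and `SketchGenerator` are hypothetical stand-ins: the real NexmarkSourceState holds the full generator configuration, which is reduced here to the two fields needed to derive event IDs.

```rust
/// Hypothetical stand-in for the checkpointed state: just enough
/// configuration to derive IDs, plus the number of events emitted so far.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SketchState {
    pub first_event_id: u64,
    pub max_events: u64,
    pub event_count: u64,
}

/// Hypothetical generator that derives each event ID from a running count.
pub struct SketchGenerator {
    first_event_id: u64,
    max_events: u64,
    events_count_so_far: u64,
}

impl SketchGenerator {
    pub fn new(first_event_id: u64, max_events: u64) -> Self {
        Self { first_event_id, max_events, events_count_so_far: 0 }
    }

    /// Rebuilds a generator from a checkpoint, continuing at the saved count.
    pub fn from_state(state: &SketchState) -> Self {
        Self {
            first_event_id: state.first_event_id,
            max_events: state.max_events,
            events_count_so_far: state.event_count,
        }
    }

    /// Captures the values a checkpoint would need to persist.
    pub fn snapshot(&self) -> SketchState {
        SketchState {
            first_event_id: self.first_event_id,
            max_events: self.max_events,
            event_count: self.events_count_so_far,
        }
    }

    /// Returns the next event ID, or None once the bound is reached.
    pub fn next_event_id(&mut self) -> Option<u64> {
        if self.events_count_so_far >= self.max_events {
            return None;
        }
        let id = self.first_event_id + self.events_count_so_far;
        self.events_count_so_far += 1;
        Some(id)
    }
}
```

Because each ID is a pure function of the configuration and the count, a generator restored from a snapshot emits the same remaining sequence as one that was never interrupted.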
Import
use arroyo_connectors::nexmark::operator::NexmarkSourceFunc;
use arroyo_connectors::nexmark::operator::NexmarkGenerator;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| first_event_rate | f64 | Yes | Target events per second across all parallel instances |
| num_events | Option<u64> | No | Maximum number of events to generate (None for unbounded) |
Outputs
| Name | Type | Description |
|---|---|---|
| person | Struct (nullable) | Person event data (id, name, email, credit_card, city, state, datetime, extra) |
| auction | Struct (nullable) | Auction event data (id, item_name, description, initial_bid, reserve, datetime, expires, seller, category, extra) |
| bid | Struct (nullable) | Bid event data (auction, bidder, price, channel, url, datetime, extra) |
| _timestamp | TimestampNanosecond | Event timestamp |
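The columns above are emitted in record batches that, per the description, are flushed based on record count and elapsed time. A dual-trigger policy of that kind might look like the sketch below. `FlushPolicy` and its thresholds are hypothetical, and the operator's actual limits and bookkeeping are not shown in this document.

```rust
use std::time::{Duration, Instant};

/// Hypothetical flush policy: flush when either the buffered record count
/// or the time since the last flush reaches its limit.
pub struct FlushPolicy {
    max_records: usize,
    max_interval: Duration,
    buffered: usize,
    last_flush: Instant,
}

impl FlushPolicy {
    pub fn new(max_records: usize, max_interval: Duration, now: Instant) -> Self {
        Self { max_records, max_interval, buffered: 0, last_flush: now }
    }

    /// Counts one buffered record and returns true if the batch should be
    /// flushed now. On a flush, the counter and the timer are both reset.
    pub fn push(&mut self, now: Instant) -> bool {
        self.buffered += 1;
        let full = self.buffered >= self.max_records;
        let stale = now.duration_since(self.last_flush) >= self.max_interval;
        if full || stale {
            self.buffered = 0;
            self.last_flush = now;
            true
        } else {
            false
        }
    }
}
```

Passing `now` explicitly keeps the policy deterministic in tests. The count trigger bounds batch size at high event rates, while the time trigger bounds latency at low rates.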
Usage Examples
// 10,000 events per second, bounded to 1,000,000 events in total
let source = NexmarkSourceFunc::new(10000, Some(1_000_000));
// Or from table config:
let source = NexmarkSourceFunc::from_config(&NexmarkTable {
    event_rate: 10000.0,
    runtime: Some(60.0),
});
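A time-limited configuration has to be turned into an event bound at some point. One plausible conversion, assuming `runtime` is expressed in seconds and the bound is simply rate times runtime, is sketched below. `bounded_event_count` is a hypothetical helper, not part of the connector's API.

```rust
/// Hypothetical helper: converts an optional runtime (assumed to be in
/// seconds) into an optional event bound at the given rate.
/// `None` means the source is unbounded.
pub fn bounded_event_count(event_rate: f64, runtime_secs: Option<f64>) -> Option<u64> {
    runtime_secs.map(|secs| (event_rate * secs) as u64)
}
```

Under that assumption, the table config above (10,000 events per second for 60 seconds) would correspond to a bound of 600,000 events.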