Implementation:ArroyoSystems Arroyo Impulse Operator
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
ImpulseSourceFunc is the Arroyo source operator that generates synthetic events at a configurable rate. It supports checkpointing, and it splits the configured rate across parallel subtasks.
Description
The Impulse source operator produces Arrow record batches with three columns: counter (Int64), subtask_index (Int64, a scalar that is constant for each parallel instance), and _timestamp (TimestampNanosecond).
It supports two generation modes via ImpulseSpec: Delay, a fixed duration between events, and EventsPerSecond, a total rate that is divided across parallel tasks. Events are batched for efficiency: the batch size is computed from the per-event delay so that each batch holds approximately 100 ms of events, capped at 8192.
When an interval is configured, event timestamps are deterministic: each event is stamped start_time + interval * counter. Otherwise, the operator uses wall-clock time. The new_aligned constructor aligns the start time to a configurable duration boundary so that test runs produce reproducible timestamps.
State is stored in ImpulseSourceState, which holds the counter and start_time, and is persisted to global keyed state during checkpoints. The operator processes control messages (Checkpoint, Stop, Commit, LoadCompacted) between event batches and returns SourceFinishType::Final when the event limit is reached.
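The delay and batch-size rules described above can be sketched as follows. This is a minimal illustration, not Arroyo's code: the local `ImpulseSpec` only mirrors the documented enum, and the helper names `delay` and `batch_size`, the use of `Duration::from_secs_f32`, and the integer-microsecond arithmetic are assumptions of the sketch.

```rust
use std::time::Duration;

/// Local stand-in mirroring the documented ImpulseSpec (not the Arroyo type itself).
#[derive(Debug, Clone, Copy)]
enum ImpulseSpec {
    Delay(Duration),
    EventsPerSecond(f32),
}

/// Hypothetical helper: per-subtask delay between events.
/// A fixed delay is used as-is; a total events-per-second rate is split
/// evenly across `parallelism` subtasks. Assumes a positive rate.
fn delay(spec: ImpulseSpec, parallelism: usize) -> Duration {
    match spec {
        ImpulseSpec::Delay(d) => d,
        ImpulseSpec::EventsPerSecond(eps) => {
            let per_task = eps / parallelism as f32;
            Duration::from_secs_f32(1.0 / per_task)
        }
    }
}

/// Hypothetical helper: how many events fit in roughly 100 ms at `delay`,
/// clamped to 1..=8192. A zero delay (as fast as possible) uses the cap.
fn batch_size(delay: Duration) -> usize {
    let micros = delay.as_micros();
    if micros == 0 {
        return 8192;
    }
    ((100_000 / micros) as usize).clamp(1, 8192)
}

fn main() {
    // 100 events/s split over 4 subtasks is 25 events/s each, about 40 ms apart,
    // so each 100 ms batch holds about 2 events.
    let d = delay(ImpulseSpec::EventsPerSecond(100.0), 4);
    println!("delay = {:?}, batch size = {}", d, batch_size(d));
}
```

Integer division means the batch size rounds down (100 ms / 40 ms gives 2, not 2.5), and the clamp keeps very slow sources emitting one event per batch.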
Usage
Use ImpulseSourceFunc as the runtime operator backing the Impulse connector for generating synthetic data in Arroyo pipelines.
Code Reference
Source Location
- Repository: ArroyoSystems/arroyo
- File: crates/arroyo-connectors/src/impulse/operator.rs
Signature
```rust
#[derive(Encode, Decode, Debug, Copy, Clone, Eq, PartialEq)]
pub struct ImpulseSourceState {
    pub counter: usize,
    pub start_time: SystemTime,
}

#[derive(Debug, Clone, Copy)]
pub enum ImpulseSpec {
    Delay(Duration),
    EventsPerSecond(f32),
}

#[derive(Debug)]
pub struct ImpulseSourceFunc {
    pub interval: Option<Duration>,
    pub spec: ImpulseSpec,
    pub limit: usize,
    pub state: ImpulseSourceState,
}

impl ImpulseSourceFunc {
    pub fn new(
        interval: Option<Duration>,
        spec: ImpulseSpec,
        limit: usize,
        start_time: SystemTime,
    ) -> Self;
    pub fn new_aligned(
        interval: Duration,
        spec: ImpulseSpec,
        alignment_duration: Duration,
        limit: usize,
        start_time: SystemTime,
    ) -> Self;
}

#[async_trait]
impl SourceOperator for ImpulseSourceFunc {
    fn name(&self) -> String; // "impulse-source"
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn on_start(&mut self, ctx: &mut SourceContext) -> DataflowResult<()>;
    async fn run(
        &mut self,
        ctx: &mut SourceContext,
        collector: &mut SourceCollector,
    ) -> DataflowResult<SourceFinishType>;
}
```
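new_aligned snaps the start time to a duration boundary. A minimal sketch of that step, assuming the start time is rounded down to the most recent multiple of the alignment, measured in milliseconds from the Unix epoch; `align_start` is a hypothetical helper, not an Arroyo function.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: round `start` down to the most recent multiple of
/// `alignment` since the Unix epoch, at millisecond precision (an assumption).
fn align_start(start: SystemTime, alignment: Duration) -> SystemTime {
    let start_ms = start
        .duration_since(UNIX_EPOCH)
        .expect("start time before the Unix epoch")
        .as_millis();
    let align_ms = alignment.as_millis();
    assert!(align_ms > 0, "alignment must be at least 1 ms");
    // Drop the remainder so the result lands exactly on a boundary.
    let aligned_ms = start_ms - start_ms % align_ms;
    UNIX_EPOCH + Duration::from_millis(aligned_ms as u64)
}

fn main() {
    let start = UNIX_EPOCH + Duration::from_millis(1_700_000_123_456);
    let aligned = align_start(start, Duration::from_secs(60));
    println!("{:?}", aligned.duration_since(UNIX_EPOCH).unwrap());
}
```

Under this assumption, any two runs started within the same alignment window get the same start time, so their deterministic timestamps match.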
Import
```rust
use arroyo_connectors::impulse::operator::{ImpulseSourceFunc, ImpulseSourceState, ImpulseSpec};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
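| spec | ImpulseSpec | Yes | Delay(Duration): fixed delay between events; EventsPerSecond(f32): total rate, divided across parallel tasks |
| limit | usize | Yes | Maximum events to generate (usize::MAX for unbounded) |
| interval | Option<Duration> | No | Duration between event timestamps (None for wall-clock time) |
| start_time | SystemTime | Yes | Base time for deterministic timestamps |
| alignment_duration | Duration | new_aligned only | Boundary to which new_aligned aligns start_time |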
Outputs
| Name | Type | Description |
|---|---|---|
| counter | Int64 | Monotonically increasing counter per task |
| subtask_index | Int64 | Task index scalar (constant per parallel instance) |
| _timestamp | TimestampNanosecond | Event timestamp (deterministic or wall-clock) |
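The `_timestamp` column follows the two timestamp modes from the Description. A sketch of the per-event computation, assuming a `u32` counter to keep the `Duration` multiplication simple; `event_timestamp` and `to_nanos` are hypothetical helpers, not Arroyo APIs.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: timestamp for the event numbered `counter`.
fn event_timestamp(interval: Option<Duration>, start_time: SystemTime, counter: u32) -> SystemTime {
    match interval {
        // Deterministic: offset from the start time by interval * counter.
        Some(d) => start_time + d * counter,
        // No interval: stamp the event with the current wall-clock time.
        None => SystemTime::now(),
    }
}

/// Hypothetical helper: nanoseconds since the Unix epoch, the unit of a
/// TimestampNanosecond column.
fn to_nanos(t: SystemTime) -> i64 {
    t.duration_since(UNIX_EPOCH)
        .expect("timestamp before the Unix epoch")
        .as_nanos() as i64
}

fn main() {
    let start = UNIX_EPOCH + Duration::from_secs(1_000);
    for counter in 0..3 {
        let ts = event_timestamp(Some(Duration::from_secs(1)), start, counter);
        println!("counter {counter}: {} ns", to_nanos(ts));
    }
}
```

Because the deterministic mode depends only on start_time, interval, and counter, a source restored from a checkpoint reproduces the same timestamps it would have produced without the interruption.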
Usage Examples
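```rust
use std::time::{Duration, SystemTime};

use arroyo_connectors::impulse::operator::{ImpulseSourceFunc, ImpulseSpec};

// Event timestamps advance by 1 s per event, independently of the emission rate;
// 100 events/s in total across all parallel tasks; event limit of 1,000,000.
let source = ImpulseSourceFunc::new(
    Some(Duration::from_secs(1)),
    ImpulseSpec::EventsPerSecond(100.0),
    1_000_000,
    SystemTime::now(),
);
```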
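A source configured with a finite limit stops once its counter reaches that limit. The sketch below simulates that loop without any Arroyo types: counters are grouped into fixed-size batches, a trailing partial batch is flushed when the limit is hit, and a counter restored from a checkpoint resumes where it left off. The local `Finish` enum stands in for `SourceFinishType::Final`, `generate` is a hypothetical name, and flushing the partial batch is an assumption of this sketch. The real operator additionally handles control messages and pacing.

```rust
/// Local stand-in for SourceFinishType::Final (hypothetical).
#[derive(Debug, PartialEq)]
enum Finish {
    Final,
}

/// Hypothetical loop: emit counters from `*counter` up to `limit` in batches
/// of `batch_size`, flushing any partial batch once the limit is reached.
fn generate<F: FnMut(Vec<i64>)>(
    counter: &mut usize,
    limit: usize,
    batch_size: usize,
    mut emit: F,
) -> Finish {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut batch = Vec::with_capacity(batch_size);
    while *counter < limit {
        batch.push(*counter as i64);
        *counter += 1;
        if batch.len() == batch_size {
            // Hand off the full batch and start a fresh one.
            emit(std::mem::take(&mut batch));
        }
    }
    if !batch.is_empty() {
        emit(batch);
    }
    Finish::Final
}

fn main() {
    let mut batches = Vec::new();
    let mut counter = 0;
    let finish = generate(&mut counter, 10, 4, |b| batches.push(b));
    println!("{:?} {:?}", finish, batches);

    // Resume from a counter value restored from a checkpoint.
    let mut restored = 7;
    let mut resumed = Vec::new();
    generate(&mut restored, 10, 4, |b| resumed.push(b));
    println!("{:?}", resumed);
}
```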