Implementation:ArroyoSystems Arroyo Impulse Operator

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

ImpulseSourceFunc is the Arroyo source operator that generates synthetic events at a configurable rate with checkpointing support and parallelism-aware event counting.

Description

The Impulse source operator produces Arrow record batches with three columns: counter (Int64), subtask_index (Int64, emitted as a scalar), and _timestamp (TimestampNanosecond).

ImpulseSpec selects one of two generation modes: Delay, a fixed duration between events, and EventsPerSecond, a target rate that is divided across the parallel tasks. Events are emitted in batches for efficiency. The batch size is computed from the delay so that each batch accumulates approximately 100 ms of events, capped at 8192.

When an interval is configured, event timestamps are deterministic: the timestamp advances by interval * counter. Otherwise the operator uses wall-clock time. The new_aligned constructor aligns the start time to a configurable duration boundary, which makes test runs reproducible.

State is held in ImpulseSourceState, which contains the counter and start_time and is persisted to global keyed state during checkpoints. Between event batches the operator processes control messages (Checkpoint, Stop, Commit, LoadCompacted). It returns SourceFinishType::Final when the event limit is reached.
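The rate splitting and batch sizing described above can be sketched in standalone Rust. This is a minimal illustration under stated assumptions, not Arroyo's actual code: the function names per_task_delay and batch_size, the local copy of ImpulseSpec, the f64 arithmetic, the handling of a zero delay, and the floor of one event per batch are all hypothetical choices made for the example.

```rust
use std::time::Duration;

/// Local stand-in for the ImpulseSpec enum, so the sketch compiles on its own.
#[derive(Debug, Clone, Copy)]
pub enum ImpulseSpec {
    Delay(Duration),
    EventsPerSecond(f32),
}

/// Delay between consecutive events on a single parallel task.
/// Assumption: a rate is split evenly, so each of `parallelism` tasks
/// emits `eps / parallelism` events per second. Requires `eps > 0`.
pub fn per_task_delay(spec: ImpulseSpec, parallelism: usize) -> Duration {
    match spec {
        ImpulseSpec::Delay(d) => d,
        ImpulseSpec::EventsPerSecond(eps) => {
            Duration::from_secs_f64(parallelism as f64 / eps as f64)
        }
    }
}

/// Events per batch: roughly 100 ms worth of events, capped at 8192.
/// Assumption: a zero delay yields the cap, and a batch never holds
/// fewer than one event.
pub fn batch_size(delay: Duration) -> usize {
    const MAX_BATCH: u128 = 8192;
    let delay_micros = delay.as_micros();
    if delay_micros == 0 {
        return MAX_BATCH as usize;
    }
    let target_micros = Duration::from_millis(100).as_micros();
    (target_micros / delay_micros).clamp(1, MAX_BATCH) as usize
}
```

Under these assumptions, EventsPerSecond(100.0) with a parallelism of 4 gives each task a 40 ms delay, so each batch holds 2 events. A delay of 1 microsecond would hit the 8192 cap.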

Usage

Use ImpulseSourceFunc as the runtime operator backing the Impulse connector for generating synthetic data in Arroyo pipelines.

Code Reference

Source Location

Signature

#[derive(Encode, Decode, Debug, Copy, Clone, Eq, PartialEq)]
pub struct ImpulseSourceState {
    pub counter: usize,
    pub start_time: SystemTime,
}

#[derive(Debug, Clone, Copy)]
pub enum ImpulseSpec {
    Delay(Duration),
    EventsPerSecond(f32),
}

#[derive(Debug)]
pub struct ImpulseSourceFunc {
    pub interval: Option<Duration>,
    pub spec: ImpulseSpec,
    pub limit: usize,
    pub state: ImpulseSourceState,
}

impl ImpulseSourceFunc {
    pub fn new(interval: Option<Duration>, spec: ImpulseSpec,
        limit: usize, start_time: SystemTime) -> Self;
    pub fn new_aligned(interval: Duration, spec: ImpulseSpec,
        alignment_duration: Duration, limit: usize, start_time: SystemTime) -> Self;
}

#[async_trait]
impl SourceOperator for ImpulseSourceFunc {
    fn name(&self) -> String; // "impulse-source"
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn on_start(&mut self, ctx: &mut SourceContext) -> DataflowResult<()>;
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
}

Import

use arroyo_connectors::impulse::operator::{ImpulseSourceFunc, ImpulseSourceState, ImpulseSpec};

I/O Contract

Inputs

Name | Type | Required | Description
spec | ImpulseSpec | Yes | Delay(Duration) or EventsPerSecond(f32)
limit | usize | Yes | Maximum events to generate (usize::MAX for unbounded)
interval | Option<Duration> | No | Duration between event timestamps (None for wall-clock time)

Outputs

Name | Type | Description
counter | Int64 | Monotonically increasing counter per task
subtask_index | Int64 | Task index scalar (constant per parallel instance)
_timestamp | TimestampNanosecond | Event timestamp (deterministic or wall-clock)
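
How the _timestamp value might be chosen for each event can be sketched as follows. This is a hedged illustration only: the helper name event_timestamp is hypothetical, and the sketch assumes the deterministic timestamp is start_time plus interval * counter and that the counter fits in a u32 (std's Duration multiplies only by u32).

```rust
use std::time::{Duration, SystemTime};

/// Timestamp for the event numbered `counter`.
/// With an interval the result depends only on the inputs;
/// without one it falls back to the wall clock.
pub fn event_timestamp(
    start_time: SystemTime,
    interval: Option<Duration>,
    counter: usize,
) -> SystemTime {
    match interval {
        Some(interval) => {
            // Assumption for this sketch: counter stays within u32 range.
            debug_assert!(counter <= u32::MAX as usize);
            start_time + interval * (counter as u32)
        }
        None => SystemTime::now(),
    }
}
```

With a 1 second interval and a start time at the Unix epoch, event 5 is stamped exactly 5 seconds after the epoch on every run, which is what makes replays after a checkpoint restore comparable.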

Usage Examples

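use std::time::{Duration, SystemTime};
use arroyo_connectors::impulse::operator::{ImpulseSourceFunc, ImpulseSpec};

// Bounded source with deterministic event timestamps.
let source = ImpulseSourceFunc::new(
    Some(Duration::from_secs(1)),        // interval between event timestamps
    ImpulseSpec::EventsPerSecond(100.0), // rate, divided across parallel tasks
    1_000_000,                           // limit: maximum events to generate
    SystemTime::now(),                   // start_time
);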
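
For tests, new_aligned takes an extra alignment_duration and aligns the start time to that boundary. The page does not say how the alignment is computed, so the sketch below assumes it rounds the start time down to the nearest multiple of the alignment, measured in milliseconds since the Unix epoch. The helper name align_start_time is hypothetical.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Rounds `start_time` down to a multiple of `alignment` since the Unix epoch.
/// Assumptions: millisecond granularity, a non-zero alignment, and a
/// start time that is not before the epoch.
pub fn align_start_time(start_time: SystemTime, alignment: Duration) -> SystemTime {
    let start_millis = start_time
        .duration_since(UNIX_EPOCH)
        .expect("start_time precedes the Unix epoch")
        .as_millis();
    let align_millis = alignment.as_millis();
    assert!(align_millis > 0, "alignment must be at least 1 ms");
    // Drop the remainder so the result sits exactly on a boundary.
    let aligned_millis = start_millis - start_millis % align_millis;
    UNIX_EPOCH + Duration::from_millis(aligned_millis as u64)
}
```

Under this assumption, a start time 12.345 seconds after the epoch with a 1 second alignment becomes exactly 12 seconds after the epoch, so two runs started within the same second produce identical deterministic timestamps.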
