Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ArroyoSystems Arroyo Impulse Connector

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

ImpulseConnector is a built-in source-only connector that generates periodic events at a configurable rate, producing a fixed schema of counter and subtask_index fields for testing and development.

Description

The Impulse connector generates synthetic events with a two-column schema: counter (Int64, monotonically increasing) and subtask_index (Int64, the parallel task ID). It supports configurable event_rate (events per second), optional event_time_interval (in nanoseconds, to control how event timestamps advance), and optional message_count (to bound the total number of events). When message_count is specified, the description is prefixed with "Bounded". The connector does not support custom schemas; if a schema is provided, it must match the impulse schema exactly. The make_operator method constructs an ImpulseSourceFunc with ImpulseSpec::EventsPerSecond and the configured limit.

Usage

Use ImpulseConnector for generating synthetic test data streams when developing or benchmarking Arroyo pipelines without requiring external data sources.

Code Reference

Source Location

Signature

pub struct ImpulseConnector {}

pub fn impulse_schema() -> ConnectionSchema;

impl Connector for ImpulseConnector {
    type ProfileT = EmptyConfig;
    type TableT = ImpulseTable;

    fn name(&self) -> &'static str; // returns "impulse"
    fn table_type(&self, _: EmptyConfig, _: ImpulseTable) -> ConnectionType; // always Source
    fn get_schema(&self, _: EmptyConfig, _: ImpulseTable,
        _: Option<&ConnectionSchema>) -> Option<ConnectionSchema>;
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: ImpulseTable, _: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: ImpulseTable,
        _: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

Import

use arroyo_connectors::impulse::ImpulseConnector;
use arroyo_connectors::impulse::impulse_schema;

I/O Contract

Inputs

Name Type Required Description
event_rate f64 Yes Target events per second
event_time_interval Option<i64> No Nanosecond interval between event timestamps
message_count Option<i64> No Total number of events to generate (unbounded if None)

Outputs

Name Type Description
counter Int64 Monotonically increasing event counter
subtask_index Int64 Parallel task index generating the event

Usage Examples

CREATE TABLE impulse_source (
    counter BIGINT,
    subtask_index BIGINT
) WITH (
    connector = 'impulse',
    event_rate = '1000',
    message_count = '10000'
);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment