Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ArroyoSystems Arroyo Sse Connector

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

SSEConnector implements the Arroyo Connector trait for Server-Sent Events (SSE/EventSource), providing a source-only connector that streams events from an SSE endpoint with optional event type filtering and custom headers.

Description

The SSE connector uses the eventsource-client crate to establish SSE connections. It supports custom HTTP headers (as colon-separated key-value pairs with environment variable substitution via VarStr) and optional events filtering to subscribe only to specific SSE event types. The test method validates connectivity by constructing a client, connecting to the endpoint, and waiting for at least one message within a 30-second timeout. The SseTester struct encapsulates the test logic, sending progress messages ("Constructed SSE client", "Received message from SSE server") to the caller. The connector constructs a SSESourceFunc operator via SSESourceFunc::new_operator with the table configuration and operator config.

Usage

Use SSEConnector when you need to consume real-time data from Server-Sent Events endpoints (such as live news feeds, stock tickers, or event streams) in an Arroyo pipeline.

Code Reference

Source Location

Signature

pub struct SSEConnector {}

impl Connector for SSEConnector {
    type ProfileT = EmptyConfig;
    type TableT = SseTable;

    fn name(&self) -> &'static str; // returns "sse"
    fn table_type(&self, _: EmptyConfig, _: SseTable) -> ConnectionType; // always Source
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: SseTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: SseTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

struct SseTester {
    config: SseTable,
    tx: Sender<TestSourceMessage>,
}

Import

use arroyo_connectors::sse::SSEConnector;

I/O Contract

Inputs

Name Type Required Description
endpoint String Yes SSE/EventSource server URL
headers Option<VarStr> No Comma-separated colon-delimited header pairs
events Option<String> No SSE event types to filter on
format Format Yes Deserialization format for event data

Outputs

Name Type Description
records RecordBatch Deserialized Arrow record batches from SSE event data

Usage Examples

CREATE TABLE sse_source (
    value TEXT
) WITH (
    connector = 'sse',
    endpoint = 'https://stream.example.com/events',
    headers = 'Authorization:Bearer mytoken',
    events = 'message',
    format = 'json'
);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment