Implementation:ArroyoSystems Arroyo Sse Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
SSEConnector implements the Arroyo Connector trait for Server-Sent Events (SSE/EventSource), providing a source-only connector that streams events from an SSE endpoint with optional event type filtering and custom headers.
Description
The SSE connector uses the eventsource-client crate to establish SSE connections. It supports custom HTTP headers, given as a comma-separated list of colon-delimited name:value pairs with environment variable substitution via VarStr, and an optional events filter that restricts the stream to specific SSE event types. The test method validates connectivity by constructing a client, connecting to the endpoint, and waiting up to 30 seconds for at least one message. The SseTester struct encapsulates this test logic and sends progress messages ("Constructed SSE client", "Received message from SSE server") to the caller. The connector's make_operator method constructs an SSESourceFunc operator via SSESourceFunc::new_operator from the table configuration and operator config.
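The header format described above can be illustrated with a small standalone sketch. It is not the connector's actual parsing code: `parse_headers` is a hypothetical helper, it uses only the standard library, and it assumes that any VarStr substitution has already happened, so the input is a plain string.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of Arroyo's API): splits a header string such as
/// "Authorization:Bearer mytoken,X-Env:prod" into name/value pairs.
fn parse_headers(raw: &str) -> Result<HashMap<String, String>, String> {
    let mut headers = HashMap::new();
    for pair in raw.split(',') {
        let pair = pair.trim();
        if pair.is_empty() {
            // Tolerate empty input and trailing commas.
            continue;
        }
        // Split on the first colon only, so values may themselves contain colons.
        let (name, value) = pair
            .split_once(':')
            .ok_or_else(|| format!("invalid header pair '{pair}': expected 'name:value'"))?;
        headers.insert(name.trim().to_string(), value.trim().to_string());
    }
    Ok(headers)
}
```

Under these assumptions, `parse_headers("Authorization:Bearer mytoken")` yields a single entry mapping `Authorization` to `Bearer mytoken`. A comma-separated format like this cannot represent header values that contain commas, which is a limitation of the format rather than of the sketch.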
Usage
Use SSEConnector when you need to consume real-time data from Server-Sent Events endpoints (such as live news feeds, stock tickers, or event streams) in an Arroyo pipeline.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/sse/mod.rs
Signature
pub struct SSEConnector {}

impl Connector for SSEConnector {
    type ProfileT = EmptyConfig;
    type TableT = SseTable;

    fn name(&self) -> &'static str; // returns "sse"
    fn table_type(&self, _: EmptyConfig, _: SseTable) -> ConnectionType; // always Source
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
                   table: SseTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: SseTable,
                     config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

struct SseTester {
    config: SseTable,
    tx: Sender<TestSourceMessage>,
}
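The connectivity test that SseTester performs waits for a first message under a deadline. The sketch below shows only that waiting pattern, using a synchronous standard-library channel as a stand-in for the SSE stream. `TestOutcome` and `wait_for_first_message` are hypothetical names, and the real connector's channel and runtime types are not reproduced here.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical result type for the connectivity check (not an Arroyo type).
#[derive(Debug, PartialEq)]
enum TestOutcome {
    /// A message arrived before the deadline.
    Received(String),
    /// No message arrived within the timeout.
    TimedOut,
    /// The sending side was dropped before any message arrived.
    Disconnected,
}

/// Waits for a single message on `rx`, giving up after `timeout`.
/// The channel stands in for the stream of events from the SSE server.
fn wait_for_first_message(rx: &Receiver<String>, timeout: Duration) -> TestOutcome {
    match rx.recv_timeout(timeout) {
        Ok(msg) => TestOutcome::Received(msg),
        Err(RecvTimeoutError::Timeout) => TestOutcome::TimedOut,
        Err(RecvTimeoutError::Disconnected) => TestOutcome::Disconnected,
    }
}
```

With a 30-second limit as described above, a caller would pass `Duration::from_secs(30)` and report `TimedOut` or `Disconnected` as a failed test.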
Import
use arroyo_connectors::sse::SSEConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| endpoint | String | Yes | SSE/EventSource server URL |
| headers | Option<VarStr> | No | Comma-separated list of colon-delimited name:value header pairs |
| events | Option<String> | No | SSE event types to filter on |
| format | Format | Yes | Deserialization format for event data |
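To make the events option concrete, here is a minimal sketch of event-type filtering. It rests on two assumptions that the table does not confirm: that the option holds a comma-separated list of event types, and that omitting it lets every event through. `parse_event_filter` and `should_emit` are hypothetical helpers, not functions from the connector.

```rust
use std::collections::HashSet;

/// Hypothetical helper: turns the `events` option into a set of allowed types.
/// Assumes a comma-separated list; returns None when no filter is configured.
fn parse_event_filter(events: Option<&str>) -> Option<HashSet<String>> {
    let set: HashSet<String> = events?
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect();
    if set.is_empty() {
        None
    } else {
        Some(set)
    }
}

/// Returns true when an event of `event_type` should be passed downstream.
/// With no filter configured, every event type is accepted (an assumption).
fn should_emit(filter: Option<&HashSet<String>>, event_type: &str) -> bool {
    filter.map_or(true, |allowed| allowed.contains(event_type))
}
```

Under these assumptions, a filter built from `events = 'message'` would keep events whose type is `message` and drop all others.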
Outputs
| Name | Type | Description |
|---|---|---|
| records | RecordBatch | Deserialized Arrow record batches from SSE event data |
Usage Examples
CREATE TABLE sse_source (
    value TEXT
) WITH (
    connector = 'sse',
    endpoint = 'https://stream.example.com/events',
    headers = 'Authorization:Bearer mytoken',
    events = 'message',
    format = 'json'
);