Implementation:ArroyoSystems Arroyo Polling Http Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
PollingHTTPConnector implements the Arroyo Connector trait for periodically polling an HTTP endpoint to produce streaming events, with configurable method, headers, body, interval, and emit behavior.
Description
The Polling HTTP connector is a source-only connector that periodically sends an HTTP request to a configured endpoint and deserializes each response body into records. It supports the GET, POST, PUT, and PATCH methods, an optional request body, and custom headers, which are given as a comma-separated list of colon-delimited key-value pairs with environment variable substitution via VarStr. The poll_interval_ms parameter controls the polling frequency (default: 1000 ms, i.e. one second). The emit_behavior parameter is either All (emit every response) or Changed (emit only when the response body differs from the previous one). Headers are parsed into reqwest::HeaderMap entries for the PollingHttpSourceFunc operator. The connector also provides a test method that validates the endpoint by sending a single request.
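The header format described above can be illustrated with a small standalone sketch. This is not the connector's actual parsing code: `parse_headers` is a hypothetical helper, it uses only the standard library (a plain `HashMap` instead of `reqwest::HeaderMap`), and it skips VarStr environment variable substitution. It assumes each pair is split on its first colon, so values may themselves contain colons.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of Arroyo): parses a string such as
/// "Authorization: Bearer abc, Accept: application/json" into a map.
fn parse_headers(raw: &str) -> Result<HashMap<String, String>, String> {
    let mut headers = HashMap::new();
    for pair in raw.split(',') {
        let pair = pair.trim();
        if pair.is_empty() {
            // Tolerate empty input and trailing commas.
            continue;
        }
        // Split on the first colon only (assumption), so a value such as
        // "http://host:8080" keeps its own colons.
        let (key, value) = pair
            .split_once(':')
            .ok_or_else(|| format!("invalid header pair '{}': expected 'key:value'", pair))?;
        let key = key.trim();
        if key.is_empty() {
            return Err(format!("invalid header pair '{}': empty header name", pair));
        }
        headers.insert(key.to_string(), value.trim().to_string());
    }
    Ok(headers)
}
```

One consequence of a comma-separated format is that a header value containing a comma cannot be expressed in this sketch.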
Usage
Use PollingHTTPConnector when you need to ingest data from a REST API or HTTP endpoint at regular intervals into an Arroyo streaming pipeline.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/polling_http/mod.rs
Signature
pub struct PollingHTTPConnector {}

impl Connector for PollingHTTPConnector {
    type ProfileT = EmptyConfig;
    type TableT = PollingHttpTable;

    fn name(&self) -> &'static str; // returns "polling_http"
    fn table_type(&self, _: EmptyConfig, _: PollingHttpTable) -> ConnectionType; // always Source
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: PollingHttpTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: PollingHttpTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
Import
use arroyo_connectors::polling_http::PollingHTTPConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| endpoint | String | Yes | HTTP URL to poll |
| method | Option<Method> | No | GET (default), POST, PUT, or PATCH |
| headers | Option<VarStr> | No | Comma-separated list of colon-delimited key:value header pairs, with environment variable substitution |
| body | Option<String> | No | Request body for POST/PUT/PATCH requests |
| poll_interval_ms | Option<i64> | No | Polling interval in milliseconds (default: 1000) |
| emit_behavior | Option<EmitBehavior> | No | All (default) or Changed (skip duplicate responses) |
| format | Format | Yes | Deserialization format for response body |
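The defaults and the `Changed` semantics in the table above can be sketched as follows. This is a minimal illustration, not the `PollingHttpSourceFunc` implementation: the `Emitter` type, the `poll_interval` function, and the `DEFAULT_POLL_INTERVAL_MS` constant are hypothetical names, the enum is redefined locally, and clamping a negative interval to zero is an assumption made for the sketch.

```rust
use std::time::Duration;

/// Local stand-in for the connector's emit_behavior option.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmitBehavior {
    All,
    Changed,
}

/// Default from the table above: 1000 ms.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1000;

/// Resolves the optional poll_interval_ms into a Duration.
/// Negative values are clamped to zero (assumption for this sketch).
fn poll_interval(poll_interval_ms: Option<i64>) -> Duration {
    let ms = poll_interval_ms
        .map(|ms| ms.max(0) as u64)
        .unwrap_or(DEFAULT_POLL_INTERVAL_MS);
    Duration::from_millis(ms)
}

/// Hypothetical helper that decides whether a response body is emitted.
struct Emitter {
    behavior: EmitBehavior,
    last_body: Option<Vec<u8>>,
}

impl Emitter {
    fn new(behavior: Option<EmitBehavior>) -> Self {
        Emitter {
            // All is the documented default.
            behavior: behavior.unwrap_or(EmitBehavior::All),
            last_body: None,
        }
    }

    /// Returns true if this body should be turned into records.
    fn should_emit(&mut self, body: &[u8]) -> bool {
        match self.behavior {
            EmitBehavior::All => true,
            EmitBehavior::Changed => {
                if self.last_body.as_deref() == Some(body) {
                    // Identical to the previous response: skip it.
                    false
                } else {
                    self.last_body = Some(body.to_vec());
                    true
                }
            }
        }
    }
}
```

With `Changed`, the first response is always emitted because there is no previous body to compare against; a byte-for-byte comparison also means that a response differing only in whitespace or field order counts as changed.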
Outputs
| Name | Type | Description |
|---|---|---|
| records | RecordBatch | Deserialized Arrow record batches from HTTP responses |
Usage Examples
CREATE TABLE http_source (
    value TEXT
) WITH (
    connector = 'polling_http',
    endpoint = 'https://api.example.com/data',
    method = 'GET',
    poll_interval_ms = '5000',
    emit_behavior = 'changed',
    format = 'json'
);