

Implementation:ArroyoSystems Arroyo Polling Http Connector

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

PollingHTTPConnector implements Arroyo's Connector trait for a source that periodically polls an HTTP endpoint and turns each response into streaming records. The request method, headers, body, polling interval, and emit behavior are all configurable.
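The poll-and-emit cycle can be sketched in std-only Rust. This is a minimal illustration, not the connector's code: poll_loop and scripted are hypothetical names, the fetch closure stands in for sending the configured HTTP request, and the loop runs a fixed number of polls instead of indefinitely.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical mirror of the connector's emit_behavior option.
#[derive(Clone, Copy, Debug, PartialEq)]
enum EmitBehavior {
    All,
    Changed,
}

/// Calls `fetch` `polls` times, sleeping `interval` between calls, and returns
/// the response bodies that would be emitted downstream.
/// `fetch` is a stand-in for sending the configured HTTP request.
fn poll_loop<F>(mut fetch: F, interval: Duration, emit: EmitBehavior, polls: usize) -> Vec<String>
where
    F: FnMut() -> String,
{
    let mut emitted = Vec::new();
    let mut previous: Option<String> = None;
    for i in 0..polls {
        if i > 0 {
            thread::sleep(interval); // wait poll_interval_ms between requests
        }
        let body = fetch();
        // Changed: drop a body identical to the immediately preceding response.
        let unchanged = previous.as_deref() == Some(body.as_str());
        if emit == EmitBehavior::All || !unchanged {
            emitted.push(body.clone());
        }
        previous = Some(body);
    }
    emitted
}

/// Hypothetical fake fetcher that replays `responses` in order, cycling when exhausted.
fn scripted(responses: Vec<&'static str>) -> impl FnMut() -> String {
    let mut i = 0;
    move || {
        let body = responses[i % responses.len()].to_string();
        i += 1;
        body
    }
}

fn main() {
    let script = vec!["a", "a", "b", "b", "a"];
    let interval = Duration::from_millis(10);
    let all = poll_loop(scripted(script.clone()), interval, EmitBehavior::All, 5);
    let changed = poll_loop(scripted(script), interval, EmitBehavior::Changed, 5);
    println!("all:     {all:?}");
    println!("changed: {changed:?}");
}
```

With this script, All emits all five bodies while Changed emits a, b, a: the final a is emitted again because Changed compares each body only with the immediately preceding response, not with every earlier one.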

Description

The Polling HTTP connector is a source-only connector: it periodically sends an HTTP request to a configured endpoint and deserializes each response body into records using the table's format. It supports the GET, POST, PUT, and PATCH methods, custom headers, and an optional request body. Headers are given as a comma-separated list of colon-delimited name:value pairs, may reference environment variables (substituted through VarStr), and are parsed into reqwest::HeaderMap entries for the PollingHttpSourceFunc operator.

The poll_interval_ms parameter sets the polling interval (default: 1000 ms). The emit_behavior parameter is either All, which emits every response, or Changed, which emits a response only when its body differs from the previous response. The connector also implements a test method that validates the endpoint by sending a single request.
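The header format can be illustrated with a small parser. This is a sketch under stated assumptions, not the connector's implementation: parse_headers is a hypothetical helper, it splits pairs on ',' and each pair on its first ':' (so values such as URLs keep their own colons), and it returns plain string pairs instead of a reqwest::HeaderMap. VarStr environment-variable substitution is omitted.

```rust
/// Hypothetical helper: parses "Name:value,Name2:value2" into (name, value) pairs.
/// Pairs are split on ',' and each pair on its first ':'; whitespace is trimmed.
/// Assumption: values themselves contain no commas.
fn parse_headers(raw: &str) -> Result<Vec<(String, String)>, String> {
    raw.split(',')
        .filter(|pair| !pair.trim().is_empty()) // tolerate a trailing comma
        .map(|pair| -> Result<(String, String), String> {
            let (name, value) = pair
                .split_once(':')
                .ok_or_else(|| format!("invalid header pair: {pair:?}"))?;
            Ok((name.trim().to_string(), value.trim().to_string()))
        })
        .collect()
}

fn main() {
    let headers = parse_headers("Authorization:Bearer abc123, Accept:application/json")
        .expect("well-formed header string");
    for (name, value) in &headers {
        println!("{name}: {value}");
    }
}
```

Splitting on the first colon only is a deliberate choice in this sketch, since header values such as URLs or timestamps often contain colons themselves.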

Usage

Use PollingHTTPConnector when you need to ingest data from a REST API or HTTP endpoint at regular intervals into an Arroyo streaming pipeline.

Code Reference

Source Location

Signature

pub struct PollingHTTPConnector {}

impl Connector for PollingHTTPConnector {
    type ProfileT = EmptyConfig;
    type TableT = PollingHttpTable;

    fn name(&self) -> &'static str; // returns "polling_http"
    fn table_type(&self, _: EmptyConfig, _: PollingHttpTable) -> ConnectionType; // always Source
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: PollingHttpTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: PollingHttpTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}

Import

use arroyo_connectors::polling_http::PollingHTTPConnector;

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| endpoint | String | Yes | HTTP URL to poll |
| method | Option<Method> | No | GET (default), POST, PUT, or PATCH |
| headers | Option<VarStr> | No | Comma-separated list of colon-delimited name:value header pairs; environment variables are substituted via VarStr |
| body | Option<String> | No | Request body for POST, PUT, or PATCH requests |
| poll_interval_ms | Option<i64> | No | Polling interval in milliseconds (default: 1000) |
| emit_behavior | Option<EmitBehavior> | No | All (default) emits every response; Changed emits only when the body differs from the previous response |
| format | Format | Yes | Deserialization format for the response body |
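The defaults for the optional inputs can be summarized as a resolution step. This is a hedged sketch: TableOptions, Resolved, and resolve are hypothetical stand-ins rather than Arroyo's PollingHttpTable, and rejecting a non-positive poll_interval_ms is an assumption of this sketch, not documented connector behavior.

```rust
use std::time::Duration;

// Hypothetical mirrors of the option types; not Arroyo's actual definitions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Method {
    Get,
    Post,
    Put,
    Patch,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum EmitBehavior {
    All,
    Changed,
}

/// Raw table options as a user might supply them (hypothetical).
struct TableOptions {
    endpoint: String,
    method: Option<Method>,
    body: Option<String>,
    poll_interval_ms: Option<i64>,
    emit_behavior: Option<EmitBehavior>,
}

/// Options after applying the documented defaults.
#[derive(Debug)]
struct Resolved {
    endpoint: String,
    method: Method,
    body: Option<String>,
    interval: Duration,
    emit: EmitBehavior,
}

fn resolve(opts: TableOptions) -> Result<Resolved, String> {
    // Documented default interval: 1000 ms.
    let raw_ms = opts.poll_interval_ms.unwrap_or(1000);
    // Assumption: non-positive intervals are rejected.
    let ms = u64::try_from(raw_ms)
        .ok()
        .filter(|&v| v > 0)
        .ok_or_else(|| format!("poll_interval_ms must be positive, got {raw_ms}"))?;
    Ok(Resolved {
        endpoint: opts.endpoint,
        method: opts.method.unwrap_or(Method::Get), // documented default: GET
        body: opts.body,
        interval: Duration::from_millis(ms),
        emit: opts.emit_behavior.unwrap_or(EmitBehavior::All), // documented default: All
    })
}

fn main() {
    let resolved = resolve(TableOptions {
        endpoint: "https://api.example.com/data".to_string(),
        method: None,
        body: None,
        poll_interval_ms: Some(5000),
        emit_behavior: Some(EmitBehavior::Changed),
    })
    .expect("valid options");
    println!("{resolved:?}");
}
```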

Outputs

| Name | Type | Description |
|---|---|---|
| records | RecordBatch | Deserialized Arrow record batches from HTTP responses |

Usage Examples

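CREATE TABLE http_source (
    value TEXT
) WITH (
    connector = 'polling_http',
    endpoint = 'https://api.example.com/data',
    method = 'GET',              -- optional; GET is the default
    poll_interval_ms = '5000',   -- poll every 5 seconds instead of the 1-second default
    emit_behavior = 'changed',   -- emit only when the body differs from the previous response
    format = 'json'              -- deserialize each response body as JSON
);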
