
Workflow:ArroyoSystems Arroyo Connection Setup

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Data_Engineering, Connectors
Last Updated 2026-02-08 09:00 GMT

Overview

End-to-end process for configuring external data sources and sinks in Arroyo by creating connection profiles (reusable credentials) and connection tables (schema-bound data endpoints) that can be referenced in SQL pipelines.

Description

This workflow covers the creation and configuration of connections to external systems in Arroyo. Arroyo supports 16 connector types including Kafka, NATS, MQTT, Kinesis, Redis, S3/GCS filesystems, WebSocket, SSE, HTTP polling, and webhook endpoints. The connection setup follows a two-tier model: connection profiles store reusable credentials and server addresses, while connection tables define specific data endpoints with schemas. The WebUI provides a guided wizard for this process, and the REST API supports programmatic configuration. The resulting connection tables become available as SQL tables in pipeline queries.

Key aspects:

  • Two-tier model separates reusable credentials (profiles) from endpoint-specific configuration (tables)
  • Schema support includes JSON, Avro (with Confluent Schema Registry), Protobuf, and raw formats
  • Connection testing validates connectivity before persisting configuration
  • Connector metadata fields (e.g., Kafka offset, partition, timestamp) can be projected into schemas

Usage

Execute this workflow when you need to connect Arroyo to an external data system before creating a SQL pipeline. You must configure at least one source connection table and may optionally configure sink connection tables (stdout is the default sink). This workflow must be completed before the SQL Pipeline Lifecycle workflow.

Execution Steps

Step 1: Select Connector Type

Choose the appropriate connector type from the available options. Each connector type defines a JSON Schema-driven configuration form for both its profile (server-level settings) and table (endpoint-level settings). The system supports source connectors (Kafka, NATS, MQTT, Kinesis, SSE, WebSocket, HTTP polling, filesystem, Fluvio, RabbitMQ), sink connectors (Kafka, NATS, Redis, filesystem/Delta/Iceberg, webhook, stdout, blackhole), and dual-mode connectors that can serve as both source and sink.

Key considerations:

  • Some connectors require profiles (Kafka, NATS, MQTT, Redis, Confluent, RabbitMQ) while others configure inline (SSE, WebSocket, HTTP polling)
  • Filesystem connectors support multiple output formats: Parquet, JSON, Delta Lake, and Apache Iceberg
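A minimal sketch of discovering the available connector types programmatically. The `/api/v1` base path, the port, and the response shape are assumptions about the Arroyo REST API; check your deployment's API reference for the exact route:

```python
import json
from urllib.request import urlopen

# NOTE: base URL and endpoint path are assumptions; adjust for your deployment.
API = "http://localhost:5115/api/v1"

def list_connectors(api_base: str = API) -> list[dict]:
    """Fetch connector metadata (id, source/sink flags, config schemas)."""
    with urlopen(f"{api_base}/connectors") as resp:
        return json.load(resp)["data"]

def sources_only(connectors: list[dict]) -> list[str]:
    """Filter to connectors usable as sources."""
    return [c["id"] for c in connectors if c.get("source")]

# Illustrative response shape (dual-mode connectors set both flags):
sample = [{"id": "kafka",  "source": True,  "sink": True},
          {"id": "stdout", "source": False, "sink": True}]
```

Filtering on the source/sink flags is how a client can decide up front whether a connector fits the intended role before rendering its configuration form.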

Step 2: Create Connection Profile

Create a connection profile containing the reusable server connection details and authentication credentials for the selected connector type. The profile is validated by testing the actual connection to the external system before being persisted. Profile configuration varies by connector: Kafka requires bootstrap servers and optional SASL/MSK IAM auth; Redis requires a connection address and auth; NATS requires server URLs and optional credentials.

Key considerations:

  • Connection testing is performed before persisting to ensure valid credentials
  • Profiles can be reused across multiple connection tables
  • Profile deletion is blocked if any connection tables reference it
  • Autocomplete features (e.g., listing available Kafka topics) are available after profile creation
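A sketch of creating a Kafka profile over the REST API. The payload field names are illustrative: the real form is generated from the connector's JSON Schema, so names may differ by Arroyo version, and the endpoint path is an assumption:

```python
import json
from urllib.request import Request, urlopen

API = "http://localhost:5115/api/v1"  # assumed default; adjust for your deployment

# Illustrative profile payload; field names follow the connector's JSON Schema.
profile = {
    "name": "prod-kafka",
    "connector": "kafka",
    "config": {
        "bootstrapServers": "broker-1:9092,broker-2:9092",
        "authentication": {"protocol": "SASL_SSL",
                           "mechanism": "SCRAM-SHA-256",
                           "username": "arroyo",
                           "password": "***"},
    },
}

def create_profile(payload: dict, api_base: str = API) -> dict:
    """POST the profile; the server tests live connectivity before persisting."""
    req = Request(f"{api_base}/connection_profiles",
                  data=json.dumps(payload).encode(),
                  headers={"Content-Type": "application/json"})
    with urlopen(req) as resp:
        return json.load(resp)
```

Because the server validates against the external system before persisting, a bad password or unreachable broker fails here rather than later at pipeline start.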

Step 3: Define Schema

Define the data schema for the connection table. The schema describes the structure of records flowing through the connection. Multiple schema definition methods are supported: JSON Schema (inline or inferred), Avro (with optional Confluent Schema Registry integration), Protobuf (with file descriptor sets), and raw formats (unstructured string or bytes). The schema is expanded and converted to Arrow field types for use in the query engine.

Key considerations:

  • JSON Schema inference can derive the schema from sample messages
  • Avro schemas can be pulled from Confluent Schema Registry by subject
  • Protobuf schemas require compiled file descriptor sets
  • The schema determines what columns are available in SQL queries against this table
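As a concrete sketch, an inline JSON Schema definition for a connection table might look like the following. The wrapper keys (`format`, `definition`) are illustrative of the schema-bundle shape, not a guaranteed API contract; the inner document is standard JSON Schema:

```python
import json

# Illustrative schema bundle: a format family plus an inline JSON Schema.
# Arroyo expands definitions like this into Arrow field types for the
# query engine, so each property becomes a SQL column.
order_schema = {
    "format": {"json": {}},          # format family: json | avro | protobuf | raw
    "definition": {
        "json_schema": json.dumps({  # inline JSON Schema (could also be inferred)
            "type": "object",
            "properties": {
                "order_id": {"type": "string"},
                "amount":   {"type": "number"},
                "ts":       {"type": "string", "format": "date-time"},
            },
            "required": ["order_id", "amount"],
        })
    },
}
```

Under this definition, SQL queries against the table would see `order_id`, `amount`, and `ts` columns; anything not declared here is simply not addressable.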

Step 4: Configure Connection Table

Create the connection table by combining the connector type, connection profile, schema, and endpoint-specific settings. For Kafka, this includes the topic name, source/sink mode, and consumer offset configuration. For filesystem connectors, this includes the storage path, file format, and partitioning strategy. The table configuration is validated against the connector's JSON Schema definition.

Key considerations:

  • Source tables define WHERE data comes from (topic, stream, URL, path)
  • Sink tables define WHERE data goes and HOW it is written (format, commit mode)
  • Watermark configuration can be set to control time-based windowing behavior
  • Kafka sinks support at-least-once or exactly-once commit modes
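Putting the pieces together, a Kafka source table payload might look like this. All field names and the profile id are illustrative (the real shape comes from the connector's JSON Schema and the profile created in Step 2):

```python
# Illustrative connection-table payload combining connector type, profile
# reference, endpoint settings, and schema; exact field names vary by
# Arroyo version and connector.
orders_table = {
    "name": "orders",                        # becomes the SQL table name
    "connector": "kafka",
    "connectionProfileId": "prof_abc123",    # hypothetical id from Step 2
    "config": {
        "topic": "orders",                           # WHERE data comes from
        "type": {"source": {"offset": "earliest"}},  # or a sink block with a
                                                     # commit mode for HOW it
                                                     # is written
    },
    "schema": {"format": {"json": {}}},      # schema bundle from Step 3
}
```

A sink table would swap the `type` block for a sink variant carrying the commit mode (at-least-once or exactly-once for Kafka), leaving the rest of the shape the same.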

Step 5: Test Connection

Validate the complete connection table configuration by creating a temporary connector instance and attempting to read or write test messages. For source connectors, this streams a sample of messages back to the client via Server-Sent Events (SSE) to verify both connectivity and schema compatibility. Connection test results show whether the format deserialization succeeds against the configured schema.

Key considerations:

  • Testing validates the full pipeline: connectivity, authentication, topic/stream existence, and schema compatibility
  • For sources, sample messages are deserialized using the configured format to verify schema correctness
  • Test failures provide diagnostic information about which stage failed (connection, authentication, format parsing)

Step 6: Use in SQL Pipeline

The connection table is now available as a named SQL table. Source tables appear as queryable tables in SELECT statements, and sink tables can be targeted with INSERT INTO statements. The table's schema defines the available columns, and connector metadata fields (if configured) are accessible as additional columns.

Key considerations:

  • Connection tables are loaded into the schema provider during SQL compilation
  • Multiple pipelines can reference the same connection table
  • SQL CREATE TABLE statements can also define inline connection tables within pipeline queries
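The SQL shapes involved can be sketched as follows. The table and column names are the hypothetical ones from the earlier steps, and the inline `CREATE TABLE ... WITH (...)` option keys are illustrative of the DDL form rather than an exhaustive reference:

```python
# Querying a source connection table by name (columns come from Step 3):
source_query = """
SELECT order_id, amount
FROM orders                 -- the connection table from Step 4
WHERE amount > 100
"""

# Writing to a sink connection table:
sink_statement = """
INSERT INTO order_alerts    -- a hypothetical sink connection table
SELECT order_id, amount FROM orders WHERE amount > 100
"""

# Inline alternative: define the table directly in the pipeline query,
# skipping the WebUI wizard (option names are illustrative):
inline_ddl = """
CREATE TABLE orders (
    order_id TEXT,
    amount   DOUBLE,
    ts       TIMESTAMP
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'broker-1:9092',
    topic = 'orders',
    format = 'json',
    type = 'source'
);
"""
```

The inline form trades reusability for locality: the table definition travels with the pipeline query instead of living in the shared connection-table catalog.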

Execution Diagram

GitHub URL

Workflow Repository