Implementation:ArroyoSystems Arroyo Validate Query
| Field | Value |
|---|---|
| Sources | ArroyoSystems/arroyo |
| Domains | Stream_Processing, SQL |
| Last Updated | 2026-02-08 |
Overview
validate_query is the REST API endpoint handler for validating SQL queries in the Arroyo streaming engine. It accepts a SQL query string and optional UDF definitions, runs them through the SQL planner's validation pipeline, and returns a structured result containing either a pipeline graph preview on success or a list of error messages on failure.
Description
The validate_query function is an asynchronous Axum route handler that provides the POST /v1/pipelines/validate endpoint. It extracts the application state, authenticates the request via bearer token, deserializes the JSON request body into a ValidateQueryPost struct, and delegates to the SQL compilation and planning infrastructure for validation. The function does not persist any state or create a pipeline; it serves purely as a validation and preview mechanism.
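Authentication relies on a token sent in an Authorization header using the Bearer scheme, as in the curl examples under Usage Examples. The sketch below only illustrates that header format. It assumes the standard "Bearer <token>" layout, and extract_bearer_token is a hypothetical helper, not Arroyo's BearerAuth extractor, which performs this step (and the actual authorization check) in the real handler.

```rust
/// Hypothetical helper: pull the token out of an Authorization header value.
/// In Arroyo the BearerAuth extractor does this; this sketch only shows the
/// header shape the endpoint expects.
fn extract_bearer_token(header_value: &str) -> Option<&str> {
    // Split "Bearer <token>" into scheme and credentials.
    let (scheme, token) = header_value.trim().split_once(' ')?;
    // Auth scheme names are case-insensitive, so compare loosely.
    if !scheme.eq_ignore_ascii_case("bearer") {
        return None;
    }
    let token = token.trim();
    if token.is_empty() {
        None
    } else {
        Some(token)
    }
}
```

A header using another scheme (for example Basic) or carrying an empty token yields None, which in a real handler would correspond to an authentication failure.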
The validation flow proceeds as follows:
- Extract the SQL query text and optional UDF definitions from the request body.
- Build a schema provider populated with the user's registered connections and sources.
- If UDFs are provided, compile and register them with the schema provider.
- Parse the SQL query and generate a logical plan using the Arroyo SQL planner.
- If planning succeeds, convert the logical program into a PipelineGraph preview and return it.
- If planning fails, collect the error messages and return them in the errors field.
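The flow above can be sketched as a single function. This is a toy model, not Arroyo's planner: the "schema provider" is just a set of registered table names, "compiling" a UDF only rejects empty definitions, and "planning" only looks up the table named after FROM. The Udf and ValidationResult types here are simplified, hypothetical stand-ins for the real request and result types.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for one entry of req.udfs.
struct Udf {
    definition: String,
}

/// Simplified mirror of the result: a graph preview or a list of errors.
/// The "graph" is reduced to a list of operator labels.
#[derive(Debug, PartialEq)]
struct ValidationResult {
    graph: Option<Vec<String>>,
    errors: Vec<String>,
}

/// Toy version of the validation flow described above.
fn validate(query: &str, udfs: Option<Vec<Udf>>, registered_tables: &[&str]) -> ValidationResult {
    // Schema provider: the tables the user has registered.
    let schema: HashSet<&str> = registered_tables.iter().copied().collect();

    // "Compile" UDFs; in this toy, an empty definition is a compile error.
    let mut errors = Vec::new();
    for (i, udf) in udfs.unwrap_or_default().iter().enumerate() {
        if udf.definition.trim().is_empty() {
            errors.push(format!("UDF #{i} has an empty definition"));
        }
    }
    if !errors.is_empty() {
        return ValidationResult { graph: None, errors };
    }

    // Toy "planning": find the first token after FROM.
    let tokens: Vec<&str> = query.split_whitespace().collect();
    let table = tokens
        .iter()
        .position(|t| t.eq_ignore_ascii_case("FROM"))
        .and_then(|i| tokens.get(i + 1))
        .copied();

    // Success yields a graph preview; failure yields error messages.
    match table {
        Some(t) if schema.contains(t) => ValidationResult {
            graph: Some(vec![format!("source({t})"), "projection".to_string()]),
            errors: Vec::new(),
        },
        Some(t) => ValidationResult {
            graph: None,
            errors: vec![format!("Table '{t}' not found")],
        },
        None => ValidationResult {
            graph: None,
            errors: vec!["query has no FROM clause".to_string()],
        },
    }
}
```

The point of the shape is that planning problems never become a Rust error here: they are folded into the errors field, which matches how the endpoint reports validation failures inside a normal response.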
Usage
Import this function when registering API routes in the Arroyo API server or when writing integration tests for SQL query validation.
Code Reference
Source Location
- Repository: ArroyoSystems/arroyo (GitHub)
- File: crates/arroyo-api/src/pipelines.rs
- Lines: L540-L570
Signature
pub async fn validate_query(
State(state): State<AppState>,
bearer_auth: BearerAuth,
Json(req): Json<ValidateQueryPost>,
) -> Result<Json<QueryValidationResult>, ErrorResp>
Import
use arroyo_api::pipelines::validate_query;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| State(state) | State<AppState> | Yes | Axum application state containing database pool, configuration, and shared services |
| bearer_auth | BearerAuth | Yes | Bearer token for request authentication and authorization |
| req.query | String | Yes | The SQL query text to validate |
| req.udfs | Option<Vec<Udf>> | No | Optional list of user-defined function definitions to register before validation |
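Because req.udfs is optional, a handler has to treat an omitted field and an empty list the same way. The sketch below uses simplified, hypothetical mirrors of ValidateQueryPost and Udf (the real types live in Arroyo's crates, and language is a plain String here only for brevity). Rejecting a blank query before planning is an added assumption for illustration, not documented endpoint behavior.

```rust
/// Simplified mirror of a UDF entry (hypothetical; language is a String here).
struct Udf {
    language: String,
    definition: String,
}

/// Simplified mirror of the request body, following the table above.
struct ValidateQueryPost {
    query: String,
    udfs: Option<Vec<Udf>>,
}

/// Treat an omitted udfs field the same as an empty list.
fn udfs_to_register(req: &ValidateQueryPost) -> &[Udf] {
    req.udfs.as_deref().unwrap_or(&[])
}

/// Assumption for this sketch: a blank query is rejected before planning.
fn query_text(req: &ValidateQueryPost) -> Option<&str> {
    let q = req.query.trim();
    if q.is_empty() {
        None
    } else {
        Some(q)
    }
}
```

Using as_deref keeps the borrow cheap: no Vec is allocated when the client leaves udfs out.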
Outputs
| Condition | Type | Description |
|---|---|---|
| Success | Json<QueryValidationResult> | Contains graph: Option<PipelineGraph> with the pipeline topology preview |
| Validation failure | Json<QueryValidationResult> | Contains errors: Vec<String> with descriptive error messages |
| Server error | ErrorResp | HTTP error response for authentication failures or internal errors |
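Both the success and the validation-failure cases travel in the Ok branch of the handler's Result, and Axum's Json response defaults to 200 OK, so a client should expect a 2xx status in both cases and tell them apart by the body. ErrorResp presumably carries a non-2xx status. The sketch below classifies a response on that basis; PipelineGraph is reduced to a list of node ids, and all types here are hypothetical stand-ins.

```rust
/// Minimal stand-ins for the response types (hypothetical shapes).
struct PipelineGraph {
    node_ids: Vec<String>,
}

struct QueryValidationResult {
    graph: Option<PipelineGraph>,
    errors: Vec<String>,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    /// 2xx with a graph and no errors: the query planned successfully.
    Valid { node_count: usize },
    /// 2xx with errors: the request was accepted, the SQL was not.
    Invalid(Vec<String>),
    /// Non-2xx: the ErrorResp branch (auth failure or internal error).
    ServerError(u16),
    /// 2xx with neither a graph nor errors: not expected from the endpoint.
    Unexpected,
}

fn classify(status: u16, body: Option<QueryValidationResult>) -> Outcome {
    // Anything outside 2xx came from the ErrorResp branch.
    if !(200..300).contains(&status) {
        return Outcome::ServerError(status);
    }
    match body {
        Some(QueryValidationResult { graph: Some(g), errors }) if errors.is_empty() => {
            Outcome::Valid { node_count: g.node_ids.len() }
        }
        Some(QueryValidationResult { errors, .. }) if !errors.is_empty() => Outcome::Invalid(errors),
        _ => Outcome::Unexpected,
    }
}
```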
Usage Examples
Validate a Simple Query via HTTP
curl -X POST http://localhost:5115/v1/pipelines/validate \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"query": "SELECT * FROM source WHERE count > 10"
}'
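The curl command above can also be reproduced without an HTTP client library by writing the raw HTTP/1.1 request yourself. This is a minimal sketch: build_validate_request and json_escape are hypothetical helpers, and the escaping covers only what a SQL string typically needs (quotes, backslashes, control characters).

```rust
/// Escape a string for embedding inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build the same request the curl command sends, as raw HTTP/1.1 text.
fn build_validate_request(host: &str, token: &str, query: &str) -> String {
    let body = format!("{{\"query\":\"{}\"}}", json_escape(query));
    // Content-Length counts bytes, which is what String::len returns.
    let content_length = body.len();
    format!(
        "POST /v1/pipelines/validate HTTP/1.1\r\n\
         Host: {host}\r\n\
         Authorization: Bearer {token}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {content_length}\r\n\
         Connection: close\r\n\
         \r\n\
         {body}"
    )
}
```

The resulting string could be written to a std::net::TcpStream connected to localhost:5115 with write_all; the response would then need to be read and parsed separately.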
Successful Response
{
"graph": {
"nodes": [
{"id": "source_0", "operator": "KafkaSource", "parallelism": 4},
{"id": "filter_1", "operator": "Filter(count > 10)", "parallelism": 4}
],
"edges": [
{"from": "source_0", "to": "filter_1"}
]
},
"errors": []
}
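A client receiving this preview may want to walk it in dataflow order, for example to display operators from source to sink. The sketch below applies Kahn's topological sort to nodes and edges shaped like the example response. The field names (id, operator, parallelism, from, to) follow that example and are an assumption; Arroyo's actual PipelineGraph serialization may name them differently.

```rust
use std::collections::{HashMap, VecDeque};

/// Simplified mirrors of the nodes and edges in the example response.
struct Node {
    id: String,
    operator: String,
    parallelism: u32,
}

struct Edge {
    from: String,
    to: String,
}

/// Return node ids in dataflow order (Kahn's algorithm), or an error if an
/// edge names an unknown node or the graph contains a cycle.
fn topo_order(nodes: &[Node], edges: &[Edge]) -> Result<Vec<String>, String> {
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|n| (n.id.as_str(), 0)).collect();
    let mut downstream: HashMap<&str, Vec<&str>> = HashMap::new();
    for e in edges {
        if !indegree.contains_key(e.from.as_str()) {
            return Err(format!("edge references unknown node '{}'", e.from));
        }
        match indegree.get_mut(e.to.as_str()) {
            Some(d) => *d += 1,
            None => return Err(format!("edge references unknown node '{}'", e.to)),
        }
        downstream.entry(e.from.as_str()).or_default().push(e.to.as_str());
    }
    // Seed with nodes that have no incoming edges, keeping input order.
    let mut ready: VecDeque<&str> = nodes
        .iter()
        .map(|n| n.id.as_str())
        .filter(|id| indegree[id] == 0)
        .collect();
    let mut order = Vec::with_capacity(nodes.len());
    while let Some(id) = ready.pop_front() {
        order.push(id.to_string());
        for &next in downstream.get(id).map(Vec::as_slice).unwrap_or(&[]) {
            let d = indegree.get_mut(next).expect("edge targets were validated above");
            *d -= 1;
            if *d == 0 {
                ready.push_back(next);
            }
        }
    }
    // Nodes left unvisited can only be explained by a cycle.
    if order.len() == nodes.len() {
        Ok(order)
    } else {
        Err("graph contains a cycle".to_string())
    }
}
```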
Validation Failure Response
{
"graph": null,
"errors": [
"Table 'nonexistent_source' not found. Available tables: clicks, pageviews"
]
}
Validate with UDFs
curl -X POST http://localhost:5115/v1/pipelines/validate \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"query": "SELECT my_udf(name) FROM source",
"udfs": [
{
"language": "rust",
"definition": "fn my_udf(s: &str) -> String { s.to_uppercase() }"
}
]
}'
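Before sending a UDF for validation, its logic can be checked locally by compiling the definition as ordinary Rust. The sketch below does exactly that for my_udf from the request above; it says nothing about whether Arroyo's UDF compiler accepts the definition exactly as written. apply_to_column is a hypothetical helper that mimics, row by row, what SELECT my_udf(name) FROM source computes.

```rust
/// The UDF body from the request above, compiled as plain Rust.
fn my_udf(s: &str) -> String {
    s.to_uppercase()
}

/// Hypothetical helper: apply the UDF to each value of a name column.
fn apply_to_column(names: &[&str]) -> Vec<String> {
    names.iter().map(|n| my_udf(n)).collect()
}
```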