Implementation:ArroyoSystems Arroyo Wait For State
Summary
The Wait For State implementation realizes the Pipeline State Monitoring principle through two cooperating functions: get_state for retrieving the current job state from the API, and wait_for_state for polling until a target state is reached. Together, these functions provide the CLI with the ability to synchronously wait for asynchronous pipeline lifecycle transitions.
Code Reference
- File: crates/arroyo/src/run.rs (L30-L65)
- Functions: get_state, wait_for_state
Function Signatures
get_state
async fn get_state(client: &Client, pipeline_id: &str) -> String
Retrieves the current state of a pipeline's most recent job by calling the get_pipeline_jobs API endpoint.
wait_for_state
async fn wait_for_state(
    client: &Client,
    pipeline_id: &str,
    expected_states: &[&str],
) -> anyhow::Result<()>
Polls get_state at 100ms intervals until the pipeline's job state matches one of the expected states, returning an error if the job is observed in the "Failed" state while polling.
Parameters
| Parameter | Type | Description |
|---|---|---|
| client | &Client | Reference to the OpenAPI client for API calls |
| pipeline_id | &str | The pipeline's unique identifier |
| expected_states | &[&str] | Slice of target state strings to wait for (e.g., ["Running"] or ["Stopped", "Failed"]) |
Implementation Detail
get_state
async fn get_state(client: &Client, pipeline_id: &str) -> String {
    let jobs = retry!(
        client.get_pipeline_jobs().id(pipeline_id).send().await,
        10,
        Duration::from_millis(100),
        Duration::from_secs(2),
        |e| { warn!("Failed to get job state from API: {}", e) }
    )
    .unwrap()
    .into_inner();
    jobs.data.into_iter().next().unwrap().state
}
The function uses the retry! macro with the following parameters:
| Parameter | Value | Description |
|---|---|---|
| Max retries | 10 | Maximum number of retry attempts |
| Initial backoff | 100ms | Wait time before the first retry |
| Max backoff | 2 seconds | Maximum wait time between retries |
| Error handler | warn! logging | Logs each failed attempt as a warning |
After successfully retrieving the pipeline jobs response, it extracts the state string from the first (most recent) job in the response. The function panics (via .unwrap()) if all retries are exhausted or if no jobs exist for the pipeline, which is appropriate since these conditions indicate a fundamental system failure.
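To make the retry parameters concrete, here is a minimal synchronous sketch of a retry loop with capped backoff. It is not the retry! macro itself: the helper names backoff_delay and retry_with_backoff are hypothetical, and both the doubling rule and the reading of "10" as retries after the first attempt are assumptions, since the source only gives the initial and maximum backoff.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Delay before retry number `attempt` (0-based).
/// ASSUMPTION: the delay doubles on every retry and is capped at `max`.
fn backoff_delay(initial: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    initial.saturating_mul(factor).min(max)
}

/// Hypothetical stand-in for `retry!`: calls `op` until it succeeds or
/// `max_retries` retries have been used, passing each error to `on_error`.
fn retry_with_backoff<T, E>(
    max_retries: u32,
    initial: Duration,
    max: Duration,
    mut op: impl FnMut() -> Result<T, E>,
    mut on_error: impl FnMut(&E),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) => {
                // Mirrors the warn! handler: every failure is reported.
                on_error(&e);
                if attempt >= max_retries {
                    // Retries exhausted: hand the last error to the caller.
                    return Err(e);
                }
                sleep(backoff_delay(initial, max, attempt));
                attempt += 1;
            }
        }
    }
}
```

Under the doubling assumption, the values 100ms and 2 seconds give delays of 100ms, 200ms, 400ms, 800ms, and 1600ms, after which every further retry waits the 2-second cap. Returning the final error instead of unwrapping it would let a caller decide whether exhausted retries should panic.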
wait_for_state
async fn wait_for_state(
    client: &Client,
    pipeline_id: &str,
    expected_states: &[&str],
) -> anyhow::Result<()> {
    let mut last_state: String = get_state(client, pipeline_id).await;
    while !expected_states.contains(&last_state.as_str()) {
        let state = get_state(client, pipeline_id).await;
        if last_state != state {
            info!("Job transitioned to {}", state);
            last_state = state;
        }
        if last_state == "Failed" {
            bail!("Job transitioned to failed");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    Ok(())
}
The polling loop operates as follows:
- Retrieves the initial state via get_state.
- Enters a loop that continues while the current state does not match any of the expected states.
- On each iteration, queries the current state.
- If the state has changed since the last observation, logs the transition with info! and updates the tracking variable.
- If the state is "Failed", immediately returns an error via bail!, regardless of the expected states.
- Sleeps for 100ms before the next poll.
Early Exit on Failure
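The "Failed" state is treated as a universal terminal error state. The order of the two checks matters: the loop condition tests expected_states before each iteration, while the "Failed" check runs inside the loop body, immediately after a fresh poll. This means:
- If expected_states contains "Failed" (as the shutdown handler's ["Stopped", "Failed"] does) and the job is already "Failed" on the initial get_state call, the loop condition is false on its first evaluation and the function returns Ok(()) without reaching the bail!.
- If the job transitions to "Failed" while the loop is polling, the bail! fires before the loop condition is re-evaluated, so the function returns an error even when expected_states contains "Failed".
- If expected_states does not contain "Failed", the bail! triggers an error return as soon as "Failed" is observed.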
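The interaction between the loop condition and the in-body "Failed" check is easiest to see by replaying the same control flow over a fixed sequence of observed states. The sketch below is an illustration only: replay and Outcome are hypothetical names, and the trace stands in for successive get_state results. It shows that "Failed" counts as success only when it is the very first state observed; a transition to "Failed" seen inside the loop body reaches the bail before the condition is checked again.

```rust
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The loop condition saw an expected state and the wait succeeded.
    Reached(String),
    /// The in-body check saw "Failed" and bailed out with an error.
    Bailed,
    /// The trace ended before either of the above happened.
    TraceExhausted,
}

/// Replays the control flow of wait_for_state over a fixed trace of states.
/// `observed[0]` plays the role of the initial get_state call; each later
/// element is the result of one poll inside the loop.
fn replay(observed: &[&str], expected: &[&str]) -> Outcome {
    let mut polls = observed.iter().copied();
    let mut last_state = match polls.next() {
        Some(state) => state,
        None => return Outcome::TraceExhausted,
    };
    // Loop condition: checked before every iteration, including the first.
    while !expected.iter().any(|e| *e == last_state) {
        match polls.next() {
            Some(state) => last_state = state,
            None => return Outcome::TraceExhausted,
        }
        // In-body check: runs right after the poll, before the condition.
        if last_state == "Failed" {
            return Outcome::Bailed;
        }
    }
    Outcome::Reached(last_state.to_string())
}
```

For example, replaying the trace "Running", "Failed" against the expected states "Stopped" and "Failed" yields Outcome::Bailed, while a trace consisting only of "Failed" yields Outcome::Reached.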
Usage Contexts
Pipeline Submission
wait_for_state(&client, &id, &["Running"]).await?;
After creating or resuming a pipeline, waits for the job to reach the "Running" state before reporting success to the user.
Graceful Shutdown
wait_for_state(&self.client, &pipeline_id, &["Stopped", "Failed"]).await
During shutdown, waits for the pipeline to reach either "Stopped" (clean checkpoint completed) or "Failed" (something went wrong during shutdown). This is wrapped in a 120-second timeout.
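The source states only that the shutdown wait is bounded by a 120-second timeout, not how the bound is applied. In async code this is presumably done with something like tokio::time::timeout, but that is an assumption. The synchronous sketch below shows the same idea with a deadline computed from std::time::Instant. The names wait_with_deadline and TimedOut are hypothetical, and the early bail on "Failed" is omitted to keep the focus on the deadline.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Returned when the deadline passes before an expected state is seen.
#[derive(Debug, PartialEq)]
struct TimedOut;

/// Polls `get_state` until it returns one of `expected` or `timeout` elapses.
/// `get_state` is a caller-supplied closure standing in for the API call.
fn wait_with_deadline(
    mut get_state: impl FnMut() -> String,
    expected: &[&str],
    poll_interval: Duration,
    timeout: Duration,
) -> Result<String, TimedOut> {
    let deadline = Instant::now() + timeout;
    let mut last_state = get_state();
    while !expected.iter().any(|e| *e == last_state.as_str()) {
        // Give up once the deadline has passed instead of polling forever.
        if Instant::now() >= deadline {
            return Err(TimedOut);
        }
        sleep(poll_interval);
        last_state = get_state();
    }
    Ok(last_state)
}
```

With a poll interval of 100ms and a timeout of 120 seconds, a call shaped like this would give up after roughly two minutes if the job never reports "Stopped" or "Failed".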
I/O
- Input: API client reference, pipeline ID string, and a slice of target state strings.
- Output: get_state returns the current state as a String. wait_for_state returns Ok(()) when a target state is reached, or Err with "Job transitioned to failed" if the job is observed in the "Failed" state while polling.