

Implementation:ArroyoSystems Arroyo Wait For State

From Leeroopedia
Revision as of 14:29, 16 February 2026 by Admin (auto-imported from implementations/ArroyoSystems_Arroyo_Wait_For_State.md)



Summary

The Wait For State implementation realizes the Pipeline State Monitoring principle through two cooperating functions: get_state for retrieving the current job state from the API, and wait_for_state for polling until a target state is reached. Together, these functions provide the CLI with the ability to synchronously wait for asynchronous pipeline lifecycle transitions.

Code Reference

  • File: crates/arroyo/src/run.rs (L30-L65)
  • Functions: get_state, wait_for_state

Function Signatures

get_state

async fn get_state(client: &Client, pipeline_id: &str) -> String

Retrieves the state of the pipeline's most recent job (the first entry returned by the get_pipeline_jobs API endpoint), retrying failed API calls with backoff.

wait_for_state

async fn wait_for_state(
    client: &Client,
    pipeline_id: &str,
    expected_states: &[&str],
) -> anyhow::Result<()>

Polls get_state, sleeping 100ms between polls, until the pipeline's job state matches one of the expected states (returning Ok(())). It returns an error instead if a poll inside the loop observes the "Failed" state.

Parameters

Parameter         Type      Description
client            &Client   Reference to the OpenAPI client used for API calls
pipeline_id       &str      The pipeline's unique identifier
expected_states   &[&str]   Slice of target state strings to wait for (e.g., ["Running"] or ["Stopped", "Failed"])

Implementation Detail

get_state

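async fn get_state(client: &Client, pipeline_id: &str) -> String {
    let jobs = retry!(
        client.get_pipeline_jobs().id(pipeline_id).send().await,
        10,                         // max retries
        Duration::from_millis(100), // initial backoff
        Duration::from_secs(2),     // max backoff
        |e| { warn!("Failed to get job state from API: {}", e) } // log each failure
    )
    .unwrap() // panics if every retry failed
    .into_inner();

    // The first job in the response is treated as the most recent one;
    // panics if the pipeline has no jobs.
    jobs.data.into_iter().next().unwrap().state
}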

The function uses the retry! macro with the following parameters:

Parameter         Value           Description
Max retries       10              Maximum number of retry attempts
Initial backoff   100ms           Wait time before the first retry
Max backoff       2 seconds       Maximum wait time between retries
Error handler     warn! logging   Logs each failed attempt as a warning

After a successful response, the function takes the state of the first job in the list, which it treats as the most recent one. It panics (via .unwrap()) in two cases: if every retry fails, or if the pipeline has no jobs. For a CLI this is an acceptable simplification, since either condition indicates a fundamental system failure rather than a transient error.
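The internals of the retry! macro are not shown on this page. As a rough, std-only sketch of what the parameters in the table mean, the functions below implement capped exponential backoff: the delay starts at the base (100ms in get_state), doubles after each failure, and never exceeds the cap (2 seconds). The doubling schedule, the names retry_with_backoff and backoff_delay, and the blocking std::thread::sleep are assumptions for illustration; the real macro runs in async code and may use a different schedule or add jitter.

```rust
use std::thread;
use std::time::Duration;

/// Delay before retry number `retry` (0-based): base * 2^retry, capped at max_delay.
/// Hypothetical schedule for illustration; the real retry! macro may differ.
fn backoff_delay(base: Duration, max_delay: Duration, retry: u32) -> Duration {
    // Saturate instead of overflowing for very large retry counts.
    let factor = 2u32.checked_pow(retry).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max_delay).min(max_delay)
}

/// Calls `op` until it succeeds or `max_retries` retries are used up, so there
/// are at most `max_retries + 1` attempts in total. `on_error` sees each failure
/// that is about to be retried (the role warn! plays in get_state).
fn retry_with_backoff<T, E, F, H>(
    max_retries: u32,
    base: Duration,
    max_delay: Duration,
    mut op: F,
    mut on_error: H,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
    H: FnMut(&E),
{
    let mut retries = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if retries < max_retries => {
                on_error(&e);
                // The first retry waits `base`; later retries wait longer, up to the cap.
                thread::sleep(backoff_delay(base, max_delay, retries));
                retries += 1;
            }
            // Out of retries: hand the last error back to the caller.
            Err(e) => return Err(e),
        }
    }
}
```

With get_state's values (10 retries, 100ms base, 2-second cap), this sketch waits 100ms, 200ms, 400ms, 800ms, 1.6s and then 2s for every remaining retry. The error handler fires for each of the first 10 failures, and if the 11th attempt also fails the error is returned, which get_state would then unwrap.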

wait_for_state

async fn wait_for_state(
    client: &Client,
    pipeline_id: &str,
    expected_states: &[&str],
) -> anyhow::Result<()> {
    // Initial observation, taken before entering the loop.
    let mut last_state: String = get_state(client, pipeline_id).await;
    while !expected_states.contains(&last_state.as_str()) {
        let state = get_state(client, pipeline_id).await;
        // Log only actual transitions.
        if last_state != state {
            info!("Job transitioned to {}", state);
            last_state = state;
        }

        // Runs before the loop condition is re-evaluated, so a "Failed" state
        // observed here is an error even if it is listed in expected_states.
        if last_state == "Failed" {
            bail!("Job transitioned to failed");
        }

        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    Ok(())
}

The polling loop operates as follows:

  1. Retrieves the initial state via get_state.
  2. Enters a loop that continues while the current state does not match any of the expected states.
  3. On each iteration, queries the current state.
  4. If the state has changed since the last observation, logs the transition with info! and updates the tracking variable.
  5. If the state is "Failed", immediately returns an error via bail!, regardless of the expected states, because this check runs before the loop condition is evaluated again.
  6. Sleeps for 100ms before the next poll.
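The steps above can be modeled without an API client or async runtime. The sketch below is a synchronous, std-only rendering of the same loop: a caller-supplied closure stands in for get_state, println! stands in for info!, and a String error stands in for bail!. The name wait_for_state_model and the configurable poll_interval are illustrative assumptions, not part of run.rs.

```rust
use std::thread;
use std::time::Duration;

/// Synchronous model of wait_for_state (hypothetical; not the run.rs code).
/// `get_state` replaces the API-backed async call.
fn wait_for_state_model<F>(
    mut get_state: F,
    expected_states: &[&str],
    poll_interval: Duration,
) -> Result<(), String>
where
    F: FnMut() -> String,
{
    // Step 1: initial observation, before any sleep.
    let mut last_state = get_state();
    // Step 2: keep polling while the state is not one of the targets.
    while !expected_states.contains(&last_state.as_str()) {
        // Step 3: fresh observation.
        let state = get_state();
        // Step 4: report only actual transitions.
        if last_state != state {
            println!("Job transitioned to {}", state);
            last_state = state;
        }
        // Step 5: checked inside the body, before the loop condition runs again.
        if last_state == "Failed" {
            return Err("Job transitioned to failed".to_string());
        }
        // Step 6: wait before the next poll.
        thread::sleep(poll_interval);
    }
    Ok(())
}
```

Feeding the model a scripted state sequence makes the check order visible. For the sequence "Stopping", "Failed" with expected states ["Stopped", "Failed"], it returns Err("Job transitioned to failed"), because step 5 runs before the loop condition is evaluated again.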

Early Exit on Failure

The "Failed" state is treated as a universal terminal error state. The loop condition checks the expected states, but the "Failed" check sits inside the loop body and runs right after each poll, before the condition is re-evaluated. This means:

  • If the first get_state call (made before the loop) already returns "Failed" and expected_states contains "Failed", the loop body never runs and the function returns Ok(()).
  • If any poll inside the loop observes "Failed", bail! returns an error, whether or not expected_states contains "Failed". This includes the shutdown handler's ["Stopped", "Failed"]: a transition to "Failed" while waiting surfaces as an error with the message "Job transitioned to failed".
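A caller that wanted "Failed" in expected_states to count as success in every case would need the expected-state check to run before the failure check after each poll. The sketch below is a hypothetical, std-only variant (not the ordering used in run.rs) that does exactly that; the name wait_for_state_targets_first is an assumption, and transition logging is omitted for brevity.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical variant: after every poll, check the targets first and only
/// then treat "Failed" as an error. This is NOT the ordering in run.rs.
fn wait_for_state_targets_first<F>(
    mut get_state: F,
    expected_states: &[&str],
    poll_interval: Duration,
) -> Result<(), String>
where
    F: FnMut() -> String,
{
    loop {
        let state = get_state();
        // Targets win, so "Failed" in expected_states always means success.
        if expected_states.contains(&state.as_str()) {
            return Ok(());
        }
        // Only an unexpected "Failed" is an error.
        if state == "Failed" {
            return Err("Job transitioned to failed".to_string());
        }
        thread::sleep(poll_interval);
    }
}
```

With this ordering, the sequence "Stopping", "Failed" against ["Stopped", "Failed"] returns Ok(()), whereas the loop in run.rs returns an error for the same sequence.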

Usage Contexts

Pipeline Submission

wait_for_state(&client, &id, &["Running"]).await?;

After creating or resuming a pipeline, waits for the job to reach the "Running" state before reporting success to the user.

Graceful Shutdown

wait_for_state(&self.client, &pipeline_id, &["Stopped", "Failed"]).await

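During shutdown, waits for the pipeline to reach either "Stopped" (clean checkpoint completed) or "Failed" (something went wrong during shutdown). Because of the early-exit check described above, "Failed" only produces Ok(()) if it is already the state on the first poll; a transition to "Failed" while waiting returns an error instead. Since wait_for_state never gives up on its own, this call is wrapped in a 120-second timeout.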
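This page does not show how the 120-second timeout is applied; in async Rust a wait like this is commonly bounded with tokio::time::timeout. The std-only sketch below illustrates the same idea with an explicit deadline. The name wait_with_deadline is hypothetical, and the "Failed" handling is left out to keep the focus on the deadline.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical deadline-bounded poll: returns the matching state, or an error
/// once `timeout` has elapsed without reaching one of `expected_states`.
fn wait_with_deadline<F>(
    mut get_state: F,
    expected_states: &[&str],
    poll_interval: Duration,
    timeout: Duration,
) -> Result<String, String>
where
    F: FnMut() -> String,
{
    let deadline = Instant::now() + timeout;
    loop {
        let state = get_state();
        if expected_states.contains(&state.as_str()) {
            return Ok(state);
        }
        // The deadline is only checked between polls.
        if Instant::now() >= deadline {
            return Err(format!("timed out after {:?} in state {}", timeout, state));
        }
        thread::sleep(poll_interval);
    }
}
```

Because the deadline is only checked between polls, this sketch can overrun it by up to one poll interval plus one get_state call. A future-level timeout such as tokio::time::timeout instead drops the inner future once the deadline passes, which can abandon the wait even while a request is in flight.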

I/O

  • Input: API client reference, pipeline ID string, and a slice of target state strings.
  • Output: get_state returns the current state as a String. wait_for_state returns Ok(()) once the job reaches one of the expected states, or an Err with the message "Job transitioned to failed" if a poll inside the loop observes the "Failed" state (even when "Failed" is among the expected states).
