

Implementation:ArroyoSystems Arroyo Aws Credential Provider

From Leeroopedia


Knowledge Sources
Domains: Streaming, Storage, AWS
Last Updated: 2026-02-08 08:00 GMT

Overview

Implements ArroyoCredentialProvider, a caching and auto-refreshing AWS credential provider that bridges the AWS SDK credential chain to the object_store crate's CredentialProvider interface for S3 access.

Description

This module provides AWS credential management for Arroyo's S3-based checkpoint and state storage:

  • ArroyoCredentialProvider -- a singleton credential provider (initialized via OnceCell) that caches AWS credentials and proactively refreshes them before expiration. Implements object_store::CredentialProvider<Credential = AwsCredential>.
  • Credential caching -- stores the current (AwsCredential, expiration, last_refreshed) tuple in an Arc<Mutex<...>>. The cache is checked on every get_credential call.
  • Refresh strategy:
    • If the token has expired or is about to (less than 100ms remaining), performs a synchronous refresh before returning, so the caller waits for the new credential.
    • If the token is near expiration (less than EXPIRATION_BUFFER, 5 minutes, remaining) and no refresh was attempted in the last 100ms, spawns a background refresh task so the caller is not blocked.
    • If a background refresh is already in progress, returns the current (still-valid) cached token.
  • AWS SDK integration -- uses aws_config::defaults with a timeout configuration (60s operation timeout, 5s per-attempt timeout) to load credentials from the standard AWS credential chain.
  • default_region -- exposes the AWS region from the SDK config for S3 bucket operations.
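The refresh strategy above can be read as a small decision function. The following is a minimal sketch using only the standard library, not the Arroyo source: the RefreshAction enum, the decide function, and the MIN_INTERVAL constant name are hypothetical, and only the 5-minute EXPIRATION_BUFFER and the 100ms thresholds come from the description.

```rust
use std::time::Duration;

/// Remaining lifetime below which a background refresh is started (5 minutes, per the description).
const EXPIRATION_BUFFER: Duration = Duration::from_secs(5 * 60);
/// Hypothetical name for the 100ms threshold used for both "expired" and "recently refreshed".
const MIN_INTERVAL: Duration = Duration::from_millis(100);

/// Hypothetical enum naming the outcomes described in the refresh strategy.
#[derive(Debug, PartialEq, Eq)]
enum RefreshAction {
    /// Token is expired: refresh now and make the caller wait.
    BlockingRefresh,
    /// Token is still valid but close to expiry: refresh in the background.
    SpawnBackgroundRefresh,
    /// Return the cached token unchanged.
    UseCached,
}

/// Chooses an action from the token's remaining lifetime, the time since the
/// last refresh attempt, and whether a background refresh is already running.
fn decide(
    remaining: Duration,
    since_last_refresh: Duration,
    refresh_in_progress: bool,
) -> RefreshAction {
    if remaining < MIN_INTERVAL {
        RefreshAction::BlockingRefresh
    } else if remaining < EXPIRATION_BUFFER
        && since_last_refresh > MIN_INTERVAL
        && !refresh_in_progress
    {
        RefreshAction::SpawnBackgroundRefresh
    } else {
        RefreshAction::UseCached
    }
}
```

In this sketch the blocking case is checked first, so an expired token is never returned even when a background task is still running. Whether the real provider orders its checks the same way is an assumption.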

Usage

Used internally by the Arroyo storage layer when constructing S3 object store clients. The singleton pattern ensures only one credential refresh cycle runs across the entire process.
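To illustrate the singleton behaviour, here is a minimal synchronous sketch built on std::sync::OnceLock. It is a stand-in for the OnceCell mentioned in the description, not the actual Arroyo code: DemoProvider, shared_provider, and INIT_COUNT are hypothetical names, and the real try_new is async, which this sketch does not model.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Counts how many times the provider is actually constructed.
static INIT_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for the real credential provider.
#[derive(Debug)]
struct DemoProvider {
    id: usize,
}

/// Process-wide cell holding the single shared provider instance.
static PROVIDER: OnceLock<Arc<DemoProvider>> = OnceLock::new();

/// Returns the shared provider, constructing it on the first call only.
fn shared_provider() -> Arc<DemoProvider> {
    PROVIDER
        .get_or_init(|| {
            let id = INIT_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
            Arc::new(DemoProvider { id })
        })
        .clone()
}
```

Every caller receives a clone of the same Arc, so any cache or refresh task owned by the instance exists once per process, which is the property the singleton is meant to provide.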

Code Reference

Source Location

Signature

#[derive(Clone)]
pub struct ArroyoCredentialProvider {
    cache: Arc<Mutex<TemporaryToken>>,
    provider: SharedCredentialsProvider,
    refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}

impl ArroyoCredentialProvider {
    pub async fn try_new() -> Result<Self, StorageError>;
    pub async fn default_region() -> Option<String>;
}

#[async_trait]
impl CredentialProvider for ArroyoCredentialProvider {
    type Credential = AwsCredential;
    async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>>;
}

Import

use arroyo_storage::aws::ArroyoCredentialProvider;

I/O Contract

Inputs

Name | Type | Required | Description
AWS environment | env vars / config | Yes | Standard AWS credential chain (env vars, config files, IAM roles, etc.)

Outputs

Name | Type | Description
Arc<AwsCredential> | Arc<AwsCredential> | Cached AWS credential with key_id, secret_key, and optional session token
Option<String> | Option<String> | Default AWS region from the SDK configuration

Usage Examples

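use arroyo_storage::aws::ArroyoCredentialProvider;
// The trait must be in scope to call get_credential on the provider.
use object_store::CredentialProvider;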

// Initialize the credential provider (singleton)
let provider = ArroyoCredentialProvider::try_new().await?;

// Get current credentials (may trigger refresh)
let creds = provider.get_credential().await?;
println!("Using key: {}", creds.key_id);

// Get the configured AWS region
let region = ArroyoCredentialProvider::default_region().await;
