Implementation:ArroyoSystems Arroyo Aws Credential Provider
Appearance
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Storage, AWS |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements ArroyoCredentialProvider, a caching and auto-refreshing AWS credential provider that bridges the AWS SDK credential chain to the object_store crate's CredentialProvider interface for S3 access.
Description
This module provides AWS credential management for Arroyo's S3-based checkpoint and state storage:
- ArroyoCredentialProvider -- a singleton credential provider (initialized via OnceCell) that caches AWS credentials and proactively refreshes them before expiration. Implements object_store::CredentialProvider<Credential = AwsCredential>.
- Credential caching -- stores the current (AwsCredential, expiration, last_refreshed) tuple in an Arc<Mutex<...>>. The cache is checked on every get_credential call.
- Refresh strategy:
- If the token has expired (< 100ms remaining), performs an immediate synchronous refresh before returning.
- If the token is near expiration (< EXPIRATION_BUFFER of 5 minutes) and was not recently refreshed (> 100ms since last attempt), spawns a background refresh task to avoid blocking the caller.
- If a background refresh is already in progress, returns the current (still-valid) cached token.
- AWS SDK integration -- uses aws_config::defaults with a timeout configuration (60s operation timeout, 5s per-attempt timeout) to load credentials from the standard AWS credential chain.
- default_region -- exposes the AWS region from the SDK config for S3 bucket operations.
Usage
Used internally by the Arroyo storage layer when constructing S3 object store clients. The singleton pattern ensures only one credential refresh cycle runs across the entire process.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-storage/src/aws.rs
Signature
#[derive(Clone)]
pub struct ArroyoCredentialProvider {
cache: Arc<Mutex<TemporaryToken>>,
provider: SharedCredentialsProvider,
refresh_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl ArroyoCredentialProvider {
pub async fn try_new() -> Result<Self, StorageError>;
pub async fn default_region() -> Option<String>;
}
#[async_trait]
impl CredentialProvider for ArroyoCredentialProvider {
type Credential = AwsCredential;
async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>>;
}
Import
use arroyo_storage::aws::ArroyoCredentialProvider;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| AWS environment | env vars / config | Yes | Standard AWS credential chain (env vars, config files, IAM roles, etc.) |
Outputs
| Name | Type | Description |
|---|---|---|
| Arc<AwsCredential> | Arc<AwsCredential> | Cached AWS credential with key_id, secret_key, and optional session token |
| Option<String> | Option<String> | Default AWS region from the SDK configuration |
Usage Examples
use arroyo_storage::aws::ArroyoCredentialProvider;
// Initialize the credential provider (singleton)
let provider = ArroyoCredentialProvider::try_new().await?;
// Get current credentials (may trigger refresh)
let creds = provider.get_credential().await?;
println!("Using key: {}", creds.key_id);
// Get the configured AWS region
let region = ArroyoCredentialProvider::default_region().await;
Related Pages
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment