Overview
Cloud_Stream_Inference is an asynchronous streaming inference client. It reads tar archives from S3/MinIO object storage, extracts the CSV files they contain, batches the rows, and sends each batch as an inference request to a TorchServe endpoint. The module uses asyncio, aiohttp, and s3fs for concurrent, non-blocking I/O in high-throughput inference workloads. It defines two key async functions: read_from_s3() for data extraction and send_inference_request() for dispatching predictions.
Description
This module demonstrates how to build a high-throughput inference client that streams data directly from cloud object storage (S3 or MinIO) to TorchServe without downloading the entire dataset to disk first. It uses Python's asyncio event loop to overlap data reading and inference request submission, maximizing throughput.
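As an illustration of the overlap described above, the following minimal sketch runs a reader and a sender concurrently through an asyncio.Queue. It is not taken from the source module, which may arrange the overlap differently; the names produce_batches, consume_batches, fake_inference, and run_overlapped are hypothetical, and the network call is replaced by a stand-in coroutine.

```python
import asyncio


async def produce_batches(queue, batches):
    """Stand-in for the storage reader: enqueue batches as they become available."""
    for batch in batches:
        await queue.put(batch)
    await queue.put(None)  # sentinel: no more data


async def consume_batches(queue, handle):
    """Stand-in for the request sender: handle batches while the reader keeps going."""
    results = []
    while True:
        batch = await queue.get()
        if batch is None:
            break
        results.append(await handle(batch))
    return results


async def fake_inference(batch):
    """Hypothetical replacement for a network call; returns the batch size."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return len(batch)


async def run_overlapped(batches):
    queue = asyncio.Queue(maxsize=4)  # small buffer keeps memory use bounded
    _, results = await asyncio.gather(
        produce_batches(queue, batches),
        consume_batches(queue, fake_inference),
    )
    return results
```

Because the queue is bounded, the reader pauses when the sender falls behind, so only a few batches are held in memory at a time.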
Key Functions
| Function | Lines | Description |
|---|---|---|
| read_from_s3() | 25-79 | Async function that connects to MinIO/S3 via s3fs, opens a tar archive, extracts CSV files, reads rows, and batches them for inference |
| send_inference_request() | 82-93 | Async function that sends a batch of data to the TorchServe prediction endpoint via aiohttp and returns the response |
Constants
| Constant | Value | Description |
|---|---|---|
| MINIO_URL | MinIO endpoint URL | S3-compatible object storage endpoint |
| MODEL_URL | TorchServe prediction URL | Target inference endpoint (e.g., http://localhost:8080/predictions/model_name) |
| BATCH_TOTAL_TASKS | 1000 | Maximum number of concurrent inference tasks to submit |
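How the source module enforces the BATCH_TOTAL_TASKS limit is not shown on this page. One plausible approach, sketched here under that assumption, is to submit requests in chunks of at most BATCH_TOTAL_TASKS and await each chunk with asyncio.gather(). The helper gather_in_chunks and the stand-in coroutine echo are hypothetical names.

```python
import asyncio

BATCH_TOTAL_TASKS = 1000  # value from the constants table


async def gather_in_chunks(make_request, payloads, limit=BATCH_TOTAL_TASKS):
    """Run make_request over payloads with at most `limit` requests in flight."""
    results = []
    for start in range(0, len(payloads), limit):
        chunk = payloads[start:start + limit]
        # gather() returns results in the same order as its arguments
        results.extend(await asyncio.gather(*(make_request(p) for p in chunk)))
    return results


async def echo(payload):
    """Hypothetical stand-in for an inference call."""
    await asyncio.sleep(0)
    return payload * 2
```

With this pattern a slow request delays the start of the next chunk. An asyncio.Semaphore is a common alternative when a steady number of in-flight requests matters more than simplicity.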
Architecture
The streaming pipeline works as follows:
- Connect to MinIO/S3 using s3fs.S3FileSystem with endpoint configuration
- Open a tar archive from the S3 bucket
- Extract CSV files from within the tar archive
- Read CSV rows and accumulate them into batches
- For each batch, call send_inference_request() asynchronously
- Collect responses using asyncio.gather() with up to BATCH_TOTAL_TASKS concurrent requests
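The extraction and batching steps above can be sketched with the standard library alone. This is an illustrative example, not the source implementation: iter_csv_batches and build_demo_archive are hypothetical helpers, the archive is built in memory instead of being read from a bucket, and the choice to flush a partial batch at the end of each CSV file is an assumption.

```python
import csv
import io
import tarfile


def iter_csv_batches(fileobj, batch_size):
    """Yield lists of CSV rows from every .csv member of a tar archive."""
    with tarfile.open(fileobj=fileobj, mode="r:*") as archive:
        for member in archive:
            if not (member.isfile() and member.name.endswith(".csv")):
                continue
            extracted = archive.extractfile(member)
            text = io.TextIOWrapper(extracted, encoding="utf-8", newline="")
            batch = []
            for row in csv.reader(text):
                batch.append(row)
                if len(batch) == batch_size:
                    yield batch
                    batch = []
            if batch:  # flush the final partial batch of this file
                yield batch


def build_demo_archive():
    """Create a small in-memory tar archive holding one CSV file."""
    payload = b"1,2\n3,4\n5,6\n"
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as archive:
        info = tarfile.TarInfo(name="part-0.csv")
        info.size = len(payload)
        archive.addfile(info, io.BytesIO(payload))
    buffer.seek(0)
    return buffer
```

Any seekable binary file object can be passed as fileobj, so a file handle opened through s3fs should work in place of the in-memory buffer.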
Dependencies
| Dependency | Purpose |
|---|---|
| asyncio | Event loop and coroutine management |
| aiohttp | Async HTTP client for inference requests |
| s3fs | S3-compatible filesystem access (MinIO/AWS S3) |
| tarfile | Extracting files from tar archives in S3 |
| csv | Parsing CSV data from extracted files |
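For the s3fs dependency, a MinIO endpoint is typically supplied through client_kwargs. The sketch below shows one way to assemble those arguments; minio_storage_options and open_filesystem are hypothetical helpers, the credentials are placeholders, and the exact options used by the source module are not shown on this page.

```python
MINIO_URL = "http://localhost:9000"  # value shown in the Signature section


def minio_storage_options(endpoint_url, access_key, secret_key):
    """Build keyword arguments for s3fs.S3FileSystem aimed at a MinIO endpoint."""
    return {
        "key": access_key,
        "secret": secret_key,
        "client_kwargs": {"endpoint_url": endpoint_url},
    }


def open_filesystem(options):
    """Create the filesystem object; requires the s3fs package."""
    import s3fs  # imported lazily so the helper above works without s3fs installed

    return s3fs.S3FileSystem(**options)


# Placeholder credentials, for illustration only
options = minio_storage_options(MINIO_URL, "ACCESS_KEY", "SECRET_KEY")
```

A file in a bucket could then be opened with open_filesystem(options).open(path, "rb"), where path is a bucket-relative name chosen for your deployment.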
Code Reference
Source Location
| File | Lines | Repository |
|---|---|---|
| examples/cloud_storage_stream_inference/stream_inference.py | L1-97 | pytorch/serve |
Signature
import asyncio
import aiohttp
import s3fs
import tarfile
import csv

MINIO_URL = "http://localhost:9000"
MODEL_URL = "http://localhost:8080/predictions/my_model"
BATCH_TOTAL_TASKS = 1000


async def read_from_s3():
    """
    Read data from an S3/MinIO tar archive containing CSV files.

    Connects to the configured MinIO endpoint, opens a tar archive,
    extracts CSV files, and batches rows for inference. Dispatches
    batched inference requests asynchronously.

    Returns:
        list: Aggregated inference results from all batches.
    """
    ...


async def send_inference_request(session, data):
    """
    Send a single inference request to TorchServe.

    Posts the data payload to the configured MODEL_URL using the
    provided aiohttp session.

    Args:
        session (aiohttp.ClientSession): Reusable HTTP session.
        data: Preprocessed input data for the model.

    Returns:
        dict: JSON response from the TorchServe prediction endpoint.
    """
    ...
I/O Contract
read_from_s3()
| Input | Type | Description |
|---|---|---|
| S3/MinIO tar archive | Remote file | Tar archive containing CSV files at the configured MINIO_URL |

| Output | Type | Description |
|---|---|---|
| Inference results | list | Aggregated prediction results from all batched requests |
send_inference_request()
| Parameter | Type | Description |
|---|---|---|
| session | aiohttp.ClientSession | Reusable async HTTP session |
| data | varies | Preprocessed input data for the model |

| Output | Type | Description |
|---|---|---|
| response | dict | JSON prediction response from TorchServe |
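A function matching this contract could look roughly like the sketch below. It is an assumption about the shape of the call, not the source implementation: post_batch is a hypothetical name, the url parameter is added for illustration, and FakeSession is a stand-in that mimics only the parts of aiohttp.ClientSession used here so the example runs without a server.

```python
import asyncio

MODEL_URL = "http://localhost:8080/predictions/my_model"  # from the Signature section


async def post_batch(session, data, url=MODEL_URL):
    """POST one payload and return the decoded JSON body."""
    async with session.post(url, data=data) as response:
        response.raise_for_status()  # surface HTTP errors instead of parsing them
        return await response.json()


class _FakeResponse:
    """Minimal async context manager imitating an aiohttp response."""

    def __init__(self, payload):
        self._payload = payload

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        return False

    def raise_for_status(self):
        pass

    async def json(self):
        return self._payload


class FakeSession:
    """Stand-in for aiohttp.ClientSession; echoes what it was asked to send."""

    def post(self, url, data=None):
        return _FakeResponse({"url": url, "received": data})


result = asyncio.run(post_batch(FakeSession(), "1,2\n3,4"))
```

With a real aiohttp.ClientSession the same function body applies, since session.post() returns an async context manager and response.json() is a coroutine.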
Usage Examples
Example 1: Running the streaming inference client
import asyncio
from stream_inference import read_from_s3
# Run the async streaming pipeline
results = asyncio.run(read_from_s3())
print(f"Completed {len(results)} inference requests")
Example 2: Custom configuration
# Configure the constants before running:
# MINIO_URL - point to your S3-compatible endpoint
# MODEL_URL - point to your TorchServe prediction endpoint
# BATCH_TOTAL_TASKS - control concurrency level
# For AWS S3 instead of MinIO:
# MINIO_URL = "https://s3.amazonaws.com"
# Ensure AWS credentials are configured via environment or ~/.aws/credentials
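If editing the module constants is inconvenient, the same three settings could be read from the environment. This is a sketch of one option, not something the source module is known to do; load_settings is a hypothetical helper, and the environment variable names simply mirror the constant names.

```python
import os


def load_settings(env=os.environ):
    """Read the three settings from a mapping, falling back to the documented defaults."""
    return {
        "MINIO_URL": env.get("MINIO_URL", "http://localhost:9000"),
        "MODEL_URL": env.get("MODEL_URL", "http://localhost:8080/predictions/my_model"),
        # Environment values are strings, so convert the task limit to int
        "BATCH_TOTAL_TASKS": int(env.get("BATCH_TOTAL_TASKS", "1000")),
    }


# Example: override only the storage endpoint
settings = load_settings({"MINIO_URL": "https://s3.amazonaws.com"})
```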
Example 3: Async inference request pattern
import asyncio
import aiohttp

from stream_inference import BATCH_TOTAL_TASKS, send_inference_request


async def batch_inference_example(data_batch):
    """Demonstrate batched async inference over a list of payloads."""
    async with aiohttp.ClientSession() as session:
        # Create at most BATCH_TOTAL_TASKS request coroutines
        tasks = [
            send_inference_request(session, data)
            for data in data_batch[:BATCH_TOTAL_TASKS]
        ]
        # Execute all requests concurrently; results keep the input order
        results = await asyncio.gather(*tasks)
    return results

# Run with: asyncio.run(batch_inference_example(data_batch))