Implementation:Pytorch Serve Cloud Stream Inference

From Leeroopedia

Overview

Cloud_Stream_Inference is an asynchronous streaming inference client. It reads data from S3/MinIO cloud storage, extracts CSV files from tar archives, groups their rows into batches, and sends the batches as inference requests to a TorchServe endpoint. The module uses asyncio, aiohttp, and s3fs for concurrent, non-blocking I/O in high-throughput inference workloads. It defines two key async functions: read_from_s3() for data extraction and batching, and send_inference_request() for dispatching predictions.

| Field | Value |
|---|---|
| Implementation Name | Cloud_Stream_Inference |
| Type | Client Utility |
| Workflow | Streaming_Cloud_Inference |
| Domains | Cloud_Infrastructure, Model_Serving |
| Knowledge Sources | Pytorch_Serve |
| Last Updated | 2026-02-13 18:52 GMT |

Description

This module demonstrates how to build a high-throughput inference client that streams data directly from cloud object storage (S3 or MinIO) to TorchServe without first downloading the entire dataset to disk. It uses Python's asyncio event loop to overlap data reading with inference request submission. Many requests can therefore be in flight at once instead of each one waiting for the previous response, which raises throughput.
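A minimal standard-library sketch makes the "stream without downloading to disk" idea concrete. It assumes two things: the archive arrives as a readable binary stream (for example, a file object returned by s3fs), and the CSV members are UTF-8 encoded. `iter_csv_rows` and `build_demo_tar` are hypothetical helpers, not functions from stream_inference.py. The sketch opens the archive in tarfile's sequential stream mode (`"r|*"`), which reads members strictly in order. The code therefore never seeks backwards and never writes the archive to local disk.

```python
import csv
import io
import tarfile
from typing import BinaryIO, Dict, Iterator, List


def iter_csv_rows(fileobj: BinaryIO) -> Iterator[List[str]]:
    """Yield CSV rows from every .csv member of a tar stream, in archive order."""
    # "r|*" = sequential stream mode with transparent compression detection;
    # it only moves forward, so it also works on non-seekable remote streams.
    with tarfile.open(fileobj=fileobj, mode="r|*") as tar:
        for member in tar:
            if not (member.isfile() and member.name.endswith(".csv")):
                continue  # skip directories and non-CSV members
            with tar.extractfile(member) as raw:
                # Decode line by line; UTF-8 never contains b"\n" inside a
                # multi-byte character, so per-line decoding is safe.
                yield from csv.reader(line.decode("utf-8") for line in raw)


def build_demo_tar(files: Dict[str, str]) -> bytes:
    """Hypothetical helper: build an uncompressed tar archive in memory."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, content in files.items():
            data = content.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


if __name__ == "__main__":
    archive = build_demo_tar({"part1.csv": "1,2\n3,4\n", "README": "not a CSV"})
    for row in iter_csv_rows(io.BytesIO(archive)):
        print(row)
```

Rows are produced lazily, one member at a time. The trade-off of stream mode is that members must be consumed in archive order.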

Key Functions

| Function | Lines | Description |
|---|---|---|
| read_from_s3() | 25-79 | Async function that connects to MinIO/S3 via s3fs, opens a tar archive, extracts CSV files, reads rows, and batches them for inference |
| send_inference_request() | 82-93 | Async function that sends a batch of data to the TorchServe prediction endpoint via aiohttp and returns the response |

Constants

| Constant | Value | Description |
|---|---|---|
| MINIO_URL | MinIO endpoint URL | S3-compatible object storage endpoint |
| MODEL_URL | TorchServe prediction URL | Target inference endpoint (e.g., http://localhost:8080/predictions/model_name) |
| BATCH_TOTAL_TASKS | 1000 | Maximum number of concurrent inference tasks to submit |
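The table describes BATCH_TOTAL_TASKS as a cap on concurrent inference tasks. A semaphore is one common way to enforce such a cap in asyncio, and it is sketched below. This page does not say whether the source uses a semaphore or instead submits at most BATCH_TOTAL_TASKS tasks per asyncio.gather() call. Treat `gather_bounded` as a hypothetical helper, not part of stream_inference.py.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

BATCH_TOTAL_TASKS = 1000  # value from the constants table


async def gather_bounded(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    limit: int = BATCH_TOTAL_TASKS,
) -> List[R]:
    """Run worker(item) for every item, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        # Waits here while `limit` calls are already running.
        async with semaphore:
            return await worker(item)

    # gather() returns results in the same order as the input items.
    return await asyncio.gather(*(run_one(item) for item in items))
```

With a semaphore, the cap holds even when the number of batches far exceeds BATCH_TOTAL_TASKS. The event loop simply parks the excess coroutines until a slot frees up.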

Architecture

The streaming pipeline works as follows:

  1. Connect to MinIO/S3 using s3fs.S3FileSystem with endpoint configuration
  2. Open a tar archive from the S3 bucket
  3. Extract CSV files from within the tar archive
  4. Read CSV rows and accumulate them into batches
  5. For each batch, call send_inference_request() asynchronously
  6. Collect responses using asyncio.gather() with up to BATCH_TOTAL_TASKS concurrent requests
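Steps 4 through 6 can be sketched without any network or S3 access by passing the request function in as a parameter. This is a minimal illustration under three assumptions:

- rows are lists of strings;
- `batch_size` is a hypothetical parameter, since this page does not state the batch size the source uses;
- `send` stands in for send_inference_request() bound to a session.

`batched` and `dispatch` are illustrative names, not functions from the source.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator, List

Row = List[str]


def batched(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Step 4: group rows into lists of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    batch: List[Row] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly shorter, batch
        yield batch


async def dispatch(
    rows: Iterable[Row],
    send: Callable[[List[Row]], Awaitable[dict]],
    batch_size: int,
) -> List[dict]:
    """Steps 5-6: one request coroutine per batch, awaited together."""
    tasks = [send(batch) for batch in batched(rows, batch_size)]
    return await asyncio.gather(*tasks)
```

Every batch becomes a coroutine before gather() runs, so this version holds all batches in memory at once. For large archives, cap the number of in-flight requests (for example with an asyncio.Semaphore) or process the stream in fixed-size groups.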

Dependencies

| Dependency | Purpose |
|---|---|
| asyncio | Event loop and coroutine management |
| aiohttp | Async HTTP client for inference requests |
| s3fs | S3-compatible filesystem access (MinIO/AWS S3) |
| tarfile | Extracting files from tar archives stored in S3 |
| csv | Parsing CSV data from extracted files |

Code Reference

Source Location

| File | Lines | Repository |
|---|---|---|
| examples/cloud_storage_stream_inference/stream_inference.py | 1-97 | pytorch/serve |

Signature

import asyncio
import aiohttp
import s3fs
import tarfile
import csv

MINIO_URL = "http://localhost:9000"
MODEL_URL = "http://localhost:8080/predictions/my_model"
BATCH_TOTAL_TASKS = 1000


async def read_from_s3():
    """
    Read data from an S3/MinIO tar archive containing CSV files.

    Connects to the configured MinIO endpoint, opens a tar archive,
    extracts CSV files, and batches rows for inference. Dispatches
    batched inference requests asynchronously.

    Returns:
        list: Aggregated inference results from all batches.
    """
    ...


async def send_inference_request(session, data):
    """
    Send a single inference request to TorchServe.

    Posts the data payload to the configured MODEL_URL using the
    provided aiohttp session.

    Args:
        session (aiohttp.ClientSession): Reusable HTTP session.
        data: Preprocessed input data for the model.

    Returns:
        dict: JSON response from the TorchServe prediction endpoint.
    """
    ...
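The signature above elides the body of send_inference_request(). A plausible body using the aiohttp client API is sketched below. Two parts of it are assumptions: the JSON encoding of the batch (`json=data`) and the expectation of a JSON reply. TorchServe accepts whatever payload the model's handler understands. The function only relies on `session.post(...)` behaving like aiohttp.ClientSession.post, which also lets it be exercised with a stub session.

```python
import asyncio
from typing import Any, List

MODEL_URL = "http://localhost:8080/predictions/my_model"  # from the signature above


async def send_inference_request(session: Any, data: List[List[str]]) -> Any:
    """Sketch: POST one batch to TorchServe and return the decoded JSON reply.

    Assumes `session` behaves like aiohttp.ClientSession and that the model
    handler accepts a JSON-encoded list of CSV rows (an assumption).
    """
    async with session.post(MODEL_URL, json=data) as response:
        response.raise_for_status()  # surface 4xx/5xx errors from TorchServe
        return await response.json()


if __name__ == "__main__":
    import aiohttp  # third-party; needs a running TorchServe at MODEL_URL

    async def main() -> None:
        async with aiohttp.ClientSession() as session:
            print(await send_inference_request(session, [["1.0", "2.0"]]))

    asyncio.run(main())
```

aiohttp's `response.json()` raises an error when the reply's Content-Type is not JSON. A handler that returns plain text would need `await response.text()` instead.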

I/O Contract

read_from_s3()

| Input | Type | Description |
|---|---|---|
| S3/MinIO tar archive | Remote file | Tar archive containing CSV files, served from the endpoint at MINIO_URL |

| Output | Type | Description |
|---|---|---|
| Inference results | list | Aggregated prediction results from all batched requests |

send_inference_request()

| Parameter | Type | Description |
|---|---|---|
| session | aiohttp.ClientSession | Reusable async HTTP session |
| data | varies | Preprocessed input data for the model |

| Output | Type | Description |
|---|---|---|
| response | dict | JSON prediction response from TorchServe |

Usage Examples

Example 1: Running the streaming inference client

import asyncio
from stream_inference import read_from_s3

# Run the async streaming pipeline
results = asyncio.run(read_from_s3())
print(f"Completed {len(results)} inference requests")

Example 2: Custom configuration

# Edit these module-level constants in stream_inference.py before running:
# MINIO_URL - point to your S3-compatible endpoint
# MODEL_URL - point to your TorchServe prediction endpoint
# BATCH_TOTAL_TASKS - control concurrency level

# For AWS S3 instead of MinIO:
# MINIO_URL = "https://s3.amazonaws.com"
# Ensure AWS credentials are configured via environment or ~/.aws/credentials
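The MinIO/AWS switch in this example can be handled by building the s3fs connection options in one place. The sketch assumes credentials are either passed explicitly or picked up from the standard AWS credential chain. `s3fs_options` and `open_remote` are hypothetical helpers, not part of stream_inference.py. For AWS S3 the sketch leaves the endpoint unset (endpoint_url=None) so that botocore chooses the regional endpoint. This is an alternative to pointing MINIO_URL at https://s3.amazonaws.com.

```python
from typing import Any, Dict, Optional

MINIO_URL = "http://localhost:9000"  # from the signature above


def s3fs_options(
    endpoint_url: Optional[str] = MINIO_URL,
    key: Optional[str] = None,
    secret: Optional[str] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for s3fs.S3FileSystem.

    endpoint_url=None targets AWS S3; key/secret left as None fall back to
    the standard AWS credential chain (environment, ~/.aws/credentials).
    """
    options: Dict[str, Any] = {}
    if endpoint_url is not None:
        # client_kwargs is forwarded to the underlying botocore client.
        options["client_kwargs"] = {"endpoint_url": endpoint_url}
    if key is not None and secret is not None:
        options["key"] = key
        options["secret"] = secret
    return options


def open_remote(path: str, **kwargs: Any):
    """Open "bucket/key" on MinIO or S3 as a binary stream (requires s3fs)."""
    import s3fs  # imported lazily so s3fs_options() works without s3fs

    fs = s3fs.S3FileSystem(**s3fs_options(**kwargs))
    return fs.open(path, "rb")
```

For example, `open_remote("my-bucket/data.tar", endpoint_url=MINIO_URL)` would return a binary stream that tarfile can read in stream mode. The bucket and key names are placeholders.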

Example 3: Async inference request pattern

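import asyncio
import aiohttp
from stream_inference import BATCH_TOTAL_TASKS, send_inference_request

async def batch_inference_example(data_batches):
    """Demonstrate batched async inference.

    data_batches: a list of preprocessed batches prepared by the caller.
    """
    async with aiohttp.ClientSession() as session:
        # Submit at most BATCH_TOTAL_TASKS requests; slicing also avoids an
        # IndexError when fewer batches are available.
        tasks = [
            send_inference_request(session, batch)
            for batch in data_batches[:BATCH_TOTAL_TASKS]
        ]

        # Execute all requests concurrently; results follow the input order
        results = await asyncio.gather(*tasks)
        return results

# Run with: asyncio.run(batch_inference_example(my_batches))
# where my_batches is your (hypothetical) list of preprocessed batches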
