Overview
Concrete tool, provided by the KServe sample code, that handles S3 object creation events delivered through Kafka: it downloads each MNIST digit image, preprocesses it, sends it for classification, and re-uploads it.
Description
This module implements an event-driven image classification pipeline with two main components:
- image_transform() -- A standalone function that reads a grayscale image with OpenCV, inverts and resizes it to 28x28 pixels, flattens it, and normalizes pixel values to [0, 1].
- ImageTransformer -- A KServe Model subclass that acts as a transformer in the KServe inference pipeline. Its async preprocess() method extracts the S3 bucket and key from a Kafka CloudEvent (s3:ObjectCreated:Put), downloads the image from MinIO using boto3, and applies image_transform() to prepare the input. Its async postprocess() method reads the predicted digit class from the model response and uploads the original image to a digit-specific folder in the "digits" S3 bucket.
The module also initializes a module-level boto3 S3 client configured for MinIO.
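The bucket-and-key extraction described for preprocess() can be sketched as follows. This is a minimal illustration, not the module's actual code: it assumes the payload carries a top-level "EventName" field and a "Records" list laid out like the standard S3 event message, and the helper name extract_s3_location is hypothetical.

```python
from typing import Any, Dict, Tuple


def extract_s3_location(event: Dict[str, Any]) -> Tuple[str, str]:
    """Return (bucket, key) from an S3-style object-created notification.

    Assumption: the event has a top-level "EventName" and a "Records" list
    whose entries follow the S3 event message structure.
    """
    event_name = event.get("EventName")
    if event_name != "s3:ObjectCreated:Put":
        raise ValueError(f"unsupported event: {event_name!r}")
    record = event["Records"][0]  # only the first record is inspected
    return record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]


# Hypothetical sample payload, trimmed to the fields used above.
sample_event = {
    "EventName": "s3:ObjectCreated:Put",
    "Records": [
        {"s3": {"bucket": {"name": "mnist"}, "object": {"key": "0.png"}}}
    ],
}
bucket, key = extract_s3_location(sample_event)
```

Rejecting any other event name up front keeps the download and transform steps from running on events the pipeline was not designed for.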
Usage
Use this class when building an event-driven KServe inference pipeline where Kafka events trigger image downloads from S3/MinIO, preprocessing for MNIST-style classification, and result-based file organization.
Code Reference
Source Location
Signature
def image_transform(image):
    ...

class ImageTransformer(kserve.Model):
    def __init__(self, name: str, predictor_host: str):
        ...

    async def preprocess(
        self, inputs: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        ...

    async def postprocess(
        self,
        response: Union[Dict, InferResponse, ModelInferResponse],
        headers: Dict[str, str] = None,
    ) -> Union[Dict, ModelInferResponse]:
        ...
Import
from image_transformer import ImageTransformer, image_transform
I/O Contract
Inputs
image_transform()
| Name | Type | Required | Description |
| image | str | Yes | File path to the image to be preprocessed |
ImageTransformer Constructor
| Name | Type | Required | Description |
| name | str | Yes | Model name for KServe routing |
| predictor_host | str | Yes | Hostname of the predictor service |
preprocess()
| Name | Type | Required | Description |
| inputs | Union[Dict, InferRequest] | Yes | Kafka CloudEvent payload with S3 object creation data containing bucket name and object key |
| headers | Dict[str, str] | No | Optional HTTP headers |
postprocess()
| Name | Type | Required | Description |
| response | Union[Dict, InferResponse, ModelInferResponse] | Yes | Model prediction response containing predicted digit class |
| headers | Dict[str, str] | No | Optional HTTP headers |
Outputs
image_transform()
| Name | Type | Description |
| pixels | list | Flattened list of 784 normalized float pixel values (28x28 grayscale) |
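To make the shape of this output concrete, here is a small pure-Python sketch of the invert, flatten, and normalize steps. It is an illustration under stated assumptions: the input is already a 28x28 grid of 8-bit grayscale values, so the OpenCV read and resize that the module performs are omitted, and the helper name invert_flatten_normalize is hypothetical.

```python
from typing import List, Sequence


def invert_flatten_normalize(gray: Sequence[Sequence[int]]) -> List[float]:
    """Invert 8-bit grayscale rows, flatten them row by row, scale to [0, 1]."""
    return [(255 - value) / 255.0 for row in gray for value in row]


# An all-white 28x28 image: after inversion every pixel becomes 0.0.
blank = [[255] * 28 for _ in range(28)]
pixels = invert_flatten_normalize(blank)
```

Inversion matters because MNIST digits are light strokes on a dark background, whereas a scanned or drawn digit is usually dark on light.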
preprocess()
| Name | Type | Description |
| request | Dict | Dictionary with "instances" key containing the preprocessed image array |
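A possible shape for this request is sketched below. It assumes one image per request, wrapped in a list under "instances" in the style of the KServe V1 prediction protocol; the helper name build_request and the length check are illustrative additions, not part of the module.

```python
from typing import Dict, List


def build_request(pixels: List[float]) -> Dict[str, List[List[float]]]:
    """Wrap one flattened 28x28 image in a V1-style prediction request."""
    if len(pixels) != 28 * 28:
        raise ValueError(f"expected 784 pixel values, got {len(pixels)}")
    return {"instances": [pixels]}  # a batch containing a single image


request = build_request([0.0] * 784)
```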
postprocess()
| Name | Type | Description |
| response | Union[Dict, ModelInferResponse] | The original model response (pass-through after S3 upload side effect) |
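The upload side effect needs a destination key derived from the prediction. The sketch below shows one way to compute it, assuming a V1-style dictionary response with a "predictions" list. The helper name digit_upload_key and the "digit-N/" folder naming are assumptions for illustration; the source only states that the folder is digit-specific.

```python
from typing import Any, Dict


def digit_upload_key(response: Dict[str, Any], key: str, prefix: str = "digit") -> str:
    """Build an object key that files the image under its predicted digit.

    Assumption: response["predictions"][0] holds the predicted class, and
    folders are named "<prefix>-<digit>/" (hypothetical naming scheme).
    """
    digit = int(response["predictions"][0])
    return f"{prefix}-{digit}/{key}"


upload_key = digit_upload_key({"predictions": [7]}, "sample.png")
```

The resulting key would then be passed to the S3 client's upload call together with the "digits" bucket name, and the response returned unchanged.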
Usage Examples
Basic Usage
import kserve

from image_transformer import ImageTransformer

# Create the transformer
transformer = ImageTransformer(
    name="mnist-transformer",
    predictor_host="mnist-predictor.default.svc.cluster.local",
)

# The transformer is typically started as part of a KServe model server
kserve.ModelServer().start([transformer])

# Kafka events are handled automatically:
# 1. S3 ObjectCreated event arrives via Kafka
# 2. preprocess() downloads and transforms the image
# 3. Prediction is sent to the predictor service
# 4. postprocess() uploads the image to a digit-classified folder