
Implementation:Kserve Kserve Kafka ImageTransformer

From Leeroopedia
Knowledge Sources
Domains: Event-Driven Inference, Image Processing
Last Updated: 2026-02-13 00:00 GMT

Overview

A concrete tool, provided by the KServe sample code, that processes Kafka S3 object-creation events to download, preprocess, classify, and re-upload MNIST digit images.

Description

This module implements an event-driven image classification pipeline with two main components:

  • image_transform() -- A standalone function that reads a grayscale image with OpenCV, inverts and resizes it to 28x28 pixels, flattens it, and normalizes pixel values to [0, 1].
  • ImageTransformer -- A KServe Model subclass that acts as a transformer in the KServe inference pipeline. Its async preprocess() method extracts the S3 bucket and key from a Kafka CloudEvent (s3:ObjectCreated:Put), downloads the image from MinIO using boto3, and applies image_transform() to prepare the input. Its async postprocess() method reads the predicted digit class from the model response and uploads the original image to a digit-specific folder in the "digits" S3 bucket.

The module also creates a boto3 S3 client, configured for MinIO, once at import time. Both preprocess() and postprocess() share this module-level client.
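A minimal sketch of how such a MinIO-targeted client might be configured. The helper names, the environment variable names, and the default endpoint are all illustrative assumptions and are not taken from the sample code; only the boto3.client() keyword arguments are standard boto3.

```python
import os


def minio_client_kwargs(env=os.environ):
    """Collect keyword arguments for a boto3 S3 client that targets MinIO.

    The variable names and the default endpoint below are hypothetical.
    Missing credentials are left as None so boto3 can fall back to its
    default credential lookup.
    """
    return {
        "endpoint_url": env.get("MINIO_ENDPOINT", "http://localhost:9000"),
        "aws_access_key_id": env.get("MINIO_ACCESS_KEY"),
        "aws_secret_access_key": env.get("MINIO_SECRET_KEY"),
    }


def make_s3_client(env=os.environ):
    """Build the S3 client; call once and reuse the result."""
    # Imported lazily so minio_client_kwargs() works without boto3 installed.
    import boto3

    return boto3.client("s3", **minio_client_kwargs(env))
```

Creating the client once and reusing it avoids rebuilding the connection setup for every Kafka event.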

Usage

Use this class when building an event-driven KServe inference pipeline where Kafka events trigger image downloads from S3/MinIO, preprocessing for MNIST-style classification, and result-based file organization.

Code Reference

Source Location

Signature

def image_transform(image):
    ...


class ImageTransformer(kserve.Model):
    def __init__(self, name: str, predictor_host: str):
        ...

    async def preprocess(
        self, inputs: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        ...

    async def postprocess(
        self,
        response: Union[Dict, InferResponse, ModelInferResponse],
        headers: Dict[str, str] = None,
    ) -> Union[Dict, ModelInferResponse]:
        ...

Import

from image_transformer import ImageTransformer, image_transform

I/O Contract

Inputs

image_transform()

Name | Type | Required | Description
image | str | Yes | File path to the image to be preprocessed

ImageTransformer Constructor

Name | Type | Required | Description
name | str | Yes | Model name for KServe routing
predictor_host | str | Yes | Hostname of the predictor service

preprocess()

Name | Type | Required | Description
inputs | Union[Dict, InferRequest] | Yes | Kafka CloudEvent payload with S3 object creation data containing bucket name and object key
headers | Dict[str, str] | No | Optional HTTP headers
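The bucket and key lookup that preprocess() performs can be sketched as follows. This assumes the standard S3 notification layout, with the object described under Records[0]["s3"]; any outer wrapper that the Kafka event source adds around the record is not handled, and the function name and sample values are hypothetical.

```python
from urllib.parse import unquote_plus


def extract_bucket_and_key(event):
    """Return (bucket, key) from an S3-style notification dict.

    Assumes the usual Records[0]["s3"] layout. Keys in S3-style
    notifications are commonly URL-encoded, so the key is decoded here.
    """
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = unquote_plus(record["s3"]["object"]["key"])
    return bucket, key


# Hypothetical event, trimmed to the fields the lookup needs.
sample_event = {
    "Records": [
        {
            "eventName": "s3:ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "mnist"},
                "object": {"key": "0.png"},
            },
        }
    ]
}

print(extract_bucket_and_key(sample_event))  # ('mnist', '0.png')
```

With the bucket and key in hand, the image can be fetched with the boto3 client's download_file(bucket, key, local_path) method before it is passed to image_transform().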

postprocess()

Name | Type | Required | Description
response | Union[Dict, InferResponse, ModelInferResponse] | Yes | Model prediction response containing predicted digit class
headers | Dict[str, str] | No | Optional HTTP headers
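A sketch of how the predicted class might be turned into a destination object key for the upload. Both the response layout ({"predictions": [{"classes": ...}]}) and the "digit-<n>/" folder naming are assumptions made for illustration; the actual response shape depends on the predictor.

```python
import posixpath


def digit_upload_key(response, key):
    """Build a digit-specific object key from a prediction response.

    Assumes a dict response shaped like {"predictions": [{"classes": 7}]}
    and a hypothetical "digit-<n>/" folder naming scheme.
    """
    digit = int(response["predictions"][0]["classes"])
    if not 0 <= digit <= 9:
        raise ValueError(f"expected a digit class in 0..9, got {digit}")
    # Keep only the file name so nested source prefixes are not carried over.
    return f"digit-{digit}/{posixpath.basename(key)}"


print(digit_upload_key({"predictions": [{"classes": 7}]}, "uploads/seven.png"))
# digit-7/seven.png
```

The resulting key could then be passed to the boto3 client's upload_file(local_path, "digits", upload_key) method, after which postprocess() returns the response unchanged.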

Outputs

image_transform()

Name | Type | Description
pixels | list | Flattened list of 784 normalized float pixel values (28x28 grayscale)
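The arithmetic behind this output can be illustrated without OpenCV. The sketch below covers only the invert, flatten, and normalize steps on a grid that is already 28x28; reading the file and resizing it are left out, and the function name is hypothetical. Inversion matters because MNIST digits are light strokes on a dark background, while scanned or drawn digits are typically dark on light.

```python
def invert_and_normalize(gray_rows):
    """Invert, flatten, and scale a 2-D grid of 0-255 grayscale values.

    Each value v becomes (255 - v) / 255.0, so white (255) maps to 0.0
    and black (0) maps to 1.0. Rows are concatenated in order.
    """
    return [(255 - value) / 255.0 for row in gray_rows for value in row]


# An all-white 28x28 page: every pixel inverts to 0.0.
blank_page = [[255] * 28 for _ in range(28)]
pixels = invert_and_normalize(blank_page)
print(len(pixels), min(pixels), max(pixels))  # 784 0.0 0.0
```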

preprocess()

Name | Type | Description
request | Dict | Dictionary with "instances" key containing the preprocessed image array
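A small sketch of wrapping the flattened pixels into this request shape. It assumes the image is sent as a single-element batch under "instances", as in the V1 inference protocol; the function name and the length check are illustrative additions.

```python
def build_v1_request(pixels):
    """Wrap one flattened 28x28 image in a V1-style request body.

    Assumes a batch of exactly one image. The 784-value check is an
    illustrative safeguard, not something the source describes.
    """
    if len(pixels) != 28 * 28:
        raise ValueError(f"expected 784 pixel values, got {len(pixels)}")
    return {"instances": [list(pixels)]}


request = build_v1_request([0.0] * 784)
print(len(request["instances"]), len(request["instances"][0]))  # 1 784
```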

postprocess()

Name | Type | Description
response | Union[Dict, ModelInferResponse] | The original model response (pass-through after S3 upload side effect)

Usage Examples

Basic Usage

from image_transformer import ImageTransformer
import kserve

# Create the transformer
transformer = ImageTransformer(
    name="mnist-transformer",
    predictor_host="mnist-predictor.default.svc.cluster.local",
)

# The transformer is typically started as part of a KServe model server
kserve.ModelServer().start([transformer])

# Kafka events are handled automatically:
# 1. S3 ObjectCreated event arrives via Kafka
# 2. preprocess() downloads and transforms the image
# 3. Prediction is sent to the predictor service
# 4. postprocess() uploads the image to a digit-classified folder
