Implementation:Haifengl Smile Streaming Prediction API

From Leeroopedia


Overview

The Streaming Prediction API is a reactive REST endpoint (POST /api/v1/models/{id}/stream) that accepts line-delimited input (JSON or CSV), processes each line as an individual prediction, and streams results back to the client one line at a time using Mutiny's Multi reactive type. This avoids buffering entire batches in memory and provides results incrementally as they are computed.

This is a Wrapper Doc -- it documents how Smile wraps the SmallRye Mutiny reactive streams library and RESTEasy Reactive to implement streaming batch inference.

Source Location

Class -- Location
InferenceResource.csv() (streaming endpoint) -- serve/src/main/java/smile/serve/InferenceResource.java, lines 72-95
InferenceModel.predict(String) (CSV prediction) -- serve/src/main/java/smile/serve/InferenceModel.java, lines 93-95
InferenceResponse.toString() (text output) -- serve/src/main/java/smile/serve/InferenceResponse.java, lines 53-59

Import Statements

N/A -- this is a REST endpoint consumed via HTTP.

For server-side development:

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import org.jboss.resteasy.reactive.RestStreamElementType;

External Dependencies

Dependency -- Class/Annotation -- Purpose
SmallRye Mutiny -- io.smallrye.mutiny.Multi -- Reactive stream type representing 0..N asynchronous items
SmallRye Mutiny -- Infrastructure.getDefaultWorkerPool() -- Thread pool for offloading blocking work from the event loop
RESTEasy Reactive -- @RestStreamElementType(MediaType.TEXT_PLAIN) -- Instructs RESTEasy to stream each Multi element individually instead of buffering
JAX-RS -- @POST, @Path, @Consumes, @Produces -- REST endpoint annotations
JAX-RS -- @HeaderParam("Content-Type") -- Reads the Content-Type header to determine the input format
Java I/O -- java.io.InputStream, java.io.BufferedReader -- Streaming input consumption

Type

Wrapper Doc (wraps SmallRye Mutiny reactive streams and RESTEasy Reactive streaming)

REST Endpoint

POST /api/v1/models/{id}/stream

Property -- Value
Method -- POST
Path -- /api/v1/models/{id}/stream
Consumes -- application/json or text/plain (CSV)
Produces -- text/plain (streamed, one result per line)
Path parameter -- id: the model identifier
Header -- Content-Type: determines input parsing (JSON vs. CSV)
Request body -- line-delimited input records (streamed)
Response -- line-delimited prediction results (streamed)
Errors -- 404 Not Found if the model ID is unknown; the stream terminates early on parse or prediction errors
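Because the endpoint is plain HTTP with chunked output, a client in any language can consume results incrementally. Below is a hypothetical Python client using only the standard library; the base URL and model id are assumptions, and `iter_result_lines` / `stream_predictions` are illustrative names, not part of Smile:

```python
import io
import urllib.request

def iter_result_lines(fp):
    """Yield non-blank result lines from a streamed (chunked) response body."""
    for raw in fp:                      # an HTTPResponse iterates line by line
        line = raw.decode().strip()
        if line:
            yield line

def stream_predictions(base_url, model_id, records, content_type="text/plain"):
    """POST line-delimited records; yield one prediction line per record."""
    body = ("\n".join(records) + "\n").encode()
    req = urllib.request.Request(
        f"{base_url}/api/v1/models/{model_id}/stream",
        data=body,
        headers={"Content-Type": content_type},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:   # requires a running server
        yield from iter_result_lines(resp)

# The line iterator itself works on any file-like byte stream:
canned = io.BytesIO(b"0 0.970 0.020 0.010\n\n1 0.050 0.920 0.030\n")
print(list(iter_result_lines(canned)))  # ['0 0.970 0.020 0.010', '1 0.050 0.920 0.030']
```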

How Smile Uses Mutiny

Multi: The Reactive Stream Type

Mutiny's Multi<T> represents a stream of 0 to N items emitted asynchronously. In this implementation, each emitted item is a String representing one prediction result. When used as a JAX-RS return type with @RestStreamElementType, Quarkus/RESTEasy automatically:

  1. Sets the response to chunked transfer encoding.
  2. Writes each emitted String to the HTTP response as it becomes available.
  3. Closes the response when the Multi completes or fails.

Emitter Pattern

The implementation uses Multi.createFrom().emitter() to bridge between imperative (blocking) and reactive (non-blocking) code:

@POST
@Path("/{id}/stream")
@Consumes({MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN})
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> csv(@HeaderParam("Content-Type") String contentType,
                         @PathParam("id") String id,
                         InputStream input) {
    var model = service.getModel(id);
    boolean json = MediaType.APPLICATION_JSON.equals(contentType);
    return Multi.createFrom().emitter(emitter -> {
        Infrastructure.getDefaultWorkerPool().submit(() -> {
            try (var reader = new BufferedReader(new InputStreamReader(input))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isBlank()) {
                        var response = json
                            ? model.predict(new JsonObject(line))
                            : model.predict(line);
                        emitter.emit(response.toString());
                    }
                }
                emitter.complete();
            } catch (Exception ex) {
                emitter.fail(ex);
            }
        });
    });
}

Key Mutiny/Quarkus Patterns Used

Pattern -- Code -- Purpose
Emitter creation -- Multi.createFrom().emitter(emitter -> ...) -- Creates a Multi from an imperative emission callback
Worker pool offloading -- Infrastructure.getDefaultWorkerPool().submit(() -> ...) -- Moves the blocking I/O loop off the Vert.x event loop thread
Item emission -- emitter.emit(response.toString()) -- Pushes one prediction result into the stream
Stream completion -- emitter.complete() -- Signals end-of-stream after all lines are processed
Error propagation -- emitter.fail(ex) -- Terminates the stream with an error
Element-level streaming -- @RestStreamElementType(MediaType.TEXT_PLAIN) -- Tells RESTEasy to flush each element individually instead of buffering the whole response

Why Worker Pool Offloading?

Quarkus uses the Vert.x event loop for I/O. The event loop thread must never be blocked. The prediction loop reads from an InputStream (blocking I/O) and executes model predictions (CPU-intensive). Both would block the event loop. By submitting the entire loop to Infrastructure.getDefaultWorkerPool(), the work runs on a separate thread while the event loop remains free to handle other connections.
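The offloading pattern is language-agnostic: a blocking producer runs on a worker thread and hands results to a consumer as they are produced. A minimal Python sketch of the same emit/complete/fail protocol follows; the names are illustrative and assume nothing about Smile's or Mutiny's actual API:

```python
import io
import queue
import threading

def stream(source, predict):
    """Run a blocking read/predict loop on a worker thread; yield results as
    they become available, mirroring emitter.emit/complete/fail."""
    q = queue.Queue()
    done = object()                        # sentinel: end of stream

    def worker():                          # the Runnable given to the worker pool
        try:
            for line in source:
                line = line.strip()
                if line:
                    q.put(predict(line))   # emitter.emit(...)
            q.put(done)                    # emitter.complete()
        except Exception as ex:
            q.put(ex)                      # emitter.fail(ex)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()                     # consumer drains results in order
        if item is done:
            return
        if isinstance(item, Exception):
            raise item
        yield item

source = io.StringIO("5.1,3.5,1.4,0.2\n\n7.0,3.2,4.7,1.4\n")
print(list(stream(source, lambda s: f"pred({s})")))
# ['pred(5.1,3.5,1.4,0.2)', 'pred(7.0,3.2,4.7,1.4)']
```

The real implementation gets back-pressure and cancellation from Mutiny for free; this sketch only shows the thread hand-off.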

Content-Type Dispatching

The Content-Type header determines how each line is parsed:

boolean json = MediaType.APPLICATION_JSON.equals(contentType);

// Inside the loop:
var response = json
    ? model.predict(new JsonObject(line))  // JSON: parse as JsonObject
    : model.predict(line);                 // CSV: parse as comma-separated string
Content-Type -- Parsing -- Example input line
application/json -- new JsonObject(line) -> InferenceModel.predict(JsonObject) -- {"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}
text/plain (or any other) -- InferenceModel.predict(String), CSV split -- 5.1,3.5,1.4,0.2
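To illustrate, a client could produce both encodings of the same record as follows (a Python sketch; for CSV input, the column order must match the schema the model was trained with):

```python
import json

record = {"sepal_length": 5.1, "sepal_width": 3.5,
          "petal_length": 1.4, "petal_width": 0.2}

# Content-Type: application/json -- one JSON object per line
json_line = json.dumps(record, separators=(",", ":"))

# Content-Type: text/plain -- comma-separated values in schema order
csv_line = ",".join(str(v) for v in record.values())

print(json_line)  # {"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}
print(csv_line)   # 5.1,3.5,1.4,0.2
```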

Output Format

Each prediction is emitted as a text line via InferenceResponse.toString():

@Override
public String toString() {
    String s = prediction.toString();
    if (probabilities != null) {
        s += Arrays.stream(probabilities)
            .mapToObj(p -> String.format("%.3f", p))
            .collect(Collectors.joining(" ", " ", ""));
    }
    return s;
}

Classification output: 1 0.100 0.900 (prediction followed by space-separated probabilities)

Regression output: 245000.0 (prediction only)
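On the client side, each streamed line can be split back into a prediction and its optional probabilities. A small Python sketch (the helper name is illustrative; the label is kept as a string, since regression predictions are plain numbers):

```python
def parse_result(line):
    """Split 'prediction [p0 p1 ...]' into (prediction, probabilities)."""
    parts = line.split()
    return parts[0], [float(p) for p in parts[1:]]

print(parse_result("1 0.100 0.900"))  # ('1', [0.1, 0.9])
print(parse_result("245000.0"))       # ('245000.0', [])
```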

HTTP Examples

Streaming CSV Batch Prediction

curl -X POST http://localhost:8080/api/v1/models/iris-classifier-2/stream \
  -H "Content-Type: text/plain" \
  --data-binary @- << 'EOF'
5.1,3.5,1.4,0.2
7.0,3.2,4.7,1.4
6.3,3.3,6.0,2.5
EOF

Streamed response (one line per prediction):

0 0.970 0.020 0.010
1 0.050 0.920 0.030
2 0.010 0.040 0.950

Streaming JSON Batch Prediction

curl -X POST http://localhost:8080/api/v1/models/iris-classifier-2/stream \
  -H "Content-Type: application/json" \
  --data-binary @- << 'EOF'
{"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}
{"sepal_length":7.0,"sepal_width":3.2,"petal_length":4.7,"petal_width":1.4}
{"sepal_length":6.3,"sepal_width":3.3,"petal_length":6.0,"petal_width":2.5}
EOF

Streamed response:

0 0.970 0.020 0.010
1 0.050 0.920 0.030
2 0.010 0.040 0.950

Piping from a File

# Score a large CSV file and save results
curl -X POST http://localhost:8080/api/v1/models/iris-classifier-2/stream \
  -H "Content-Type: text/plain" \
  --data-binary @input.csv > predictions.txt

Error Behavior

If any line fails to parse or predict, the stream terminates with an error:

} catch (Exception ex) {
    emitter.fail(ex);  // terminates the Multi stream
}

Results that were already emitted before the failure are still received by the client; the HTTP connection is then closed. Because no in-band error marker is written to the stream, clients should treat premature termination (fewer result lines than input lines) as an error.
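Since truncation is the only mid-stream error signal, a client can compare the number of results against the number of inputs. A minimal sketch (the function name is illustrative):

```python
def check_complete(n_inputs, results):
    """Raise if the stream ended before producing one result per input line."""
    if len(results) < n_inputs:
        raise RuntimeError(
            f"stream terminated early: {len(results)} of {n_inputs} results")
    return results

check_complete(2, ["0 0.970 0.030", "1 0.200 0.800"])  # complete: no error
try:
    check_complete(3, ["0 0.970 0.030"])
except RuntimeError as ex:
    print(ex)
```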

Quarkus Configuration

The streaming endpoint works with the default Quarkus configuration. Relevant settings that can be tuned in application.properties:

# REST path prefix (affects all endpoints)
quarkus.rest.path=/api/v1

# Worker pool size (affects throughput for concurrent streaming requests)
quarkus.vertx.worker-pool-size=20

# HTTP port
quarkus.http.port=8080
%dev.quarkus.http.port=8888
