Implementation: Haifengl Smile Streaming Prediction API
Overview
The Streaming Prediction API is a reactive REST endpoint (POST /api/v1/models/{id}/stream) that accepts line-delimited input (JSON or CSV), processes each line as an individual prediction, and streams results back to the client one line at a time using Mutiny's Multi reactive type. This avoids buffering entire batches in memory and provides results incrementally as they are computed.
This is a Wrapper Doc -- it documents how Smile wraps the SmallRye Mutiny reactive streams library and RESTEasy Reactive to implement streaming batch inference.
Source Location
| Class | Location |
|---|---|
| `InferenceResource.csv()` (streaming endpoint) | `serve/src/main/java/smile/serve/InferenceResource.java` (lines 72-95) |
| `InferenceModel.predict(String)` (CSV prediction) | `serve/src/main/java/smile/serve/InferenceModel.java` (lines 93-95) |
| `InferenceResponse.toString()` (text output) | `serve/src/main/java/smile/serve/InferenceResponse.java` (lines 53-59) |
Import Statements
N/A -- this is a REST endpoint consumed via HTTP.
For server-side development:

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import org.jboss.resteasy.reactive.RestStreamElementType;
```
External Dependencies
| Dependency | Class/Annotation | Purpose |
|---|---|---|
| SmallRye Mutiny | `io.smallrye.mutiny.Multi` | Reactive stream type representing 0..N asynchronous items |
| SmallRye Mutiny | `Infrastructure.getDefaultWorkerPool()` | Thread pool for offloading blocking work from the event loop |
| RESTEasy Reactive | `@RestStreamElementType(MediaType.TEXT_PLAIN)` | Instructs RESTEasy to stream each `Multi` element individually instead of buffering |
| JAX-RS | `@POST`, `@Path`, `@Consumes`, `@Produces` | REST endpoint annotations |
| JAX-RS | `@HeaderParam("Content-Type")` | Reads the Content-Type header to determine input format |
| Java I/O | `java.io.InputStream`, `java.io.BufferedReader` | Streaming input consumption |
Type
Wrapper Doc (wraps SmallRye Mutiny reactive streams and RESTEasy Reactive streaming)
REST Endpoint
POST /api/v1/models/{id}/stream
| Property | Value |
|---|---|
| Method | `POST` |
| Path | `/api/v1/models/{id}/stream` |
| Consumes | `application/json` or `text/plain` (CSV) |
| Produces | `text/plain` (streamed, one result per line) |
| Path parameter | `id` -- model identifier |
| Header | `Content-Type` -- determines input parsing (JSON vs. CSV) |
| Request body | Line-delimited input records (streamed) |
| Response | Line-delimited prediction results (streamed) |
| Errors | 404 Not Found if the model ID is invalid; stream terminates on parse/prediction errors |
How Smile Uses Mutiny
Multi: The Reactive Stream Type
Mutiny's Multi<T> represents a stream of 0 to N items emitted asynchronously. In this implementation, each emitted item is a String representing one prediction result. When used as a JAX-RS return type with @RestStreamElementType, Quarkus/RESTEasy automatically:
- Sets the response to chunked transfer encoding.
- Writes each emitted `String` to the HTTP response as it becomes available.
- Closes the response when the `Multi` completes or fails.
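To see what element-level streaming looks like on the wire without Quarkus, here is a self-contained sketch using only the JDK: a toy stand-in server flushes one line per chunk, and the client consumes the body lazily with `HttpClient` and `BodyHandlers.ofLines()`. The server, port, and result lines are illustrative stand-ins, not part of Smile.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class ChunkedLinesDemo {

    // Starts a toy server, reads its chunked text/plain body line by line,
    // and returns the lines in arrival order.
    public static List<String> streamAndCollect() throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/stream", exchange -> {
            exchange.getResponseHeaders().set("Content-Type", "text/plain");
            // Response length 0 means "unknown", so the JDK server uses chunked transfer encoding.
            exchange.sendResponseHeaders(200, 0);
            try (OutputStream out = exchange.getResponseBody()) {
                for (String line : List.of("0 0.970 0.020 0.010",
                                           "1 0.050 0.920 0.030",
                                           "2 0.010 0.040 0.950")) {
                    out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                    out.flush(); // each line leaves the server as soon as it is written
                }
            }
        });
        server.start();
        try {
            URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/stream");
            // BodyHandlers.ofLines() exposes the body as a lazy Stream<String>:
            // each line can be processed before the full response has arrived.
            HttpResponse<java.util.stream.Stream<String>> response =
                    HttpClient.newHttpClient().send(HttpRequest.newBuilder(uri).build(),
                                                    HttpResponse.BodyHandlers.ofLines());
            return response.body().collect(Collectors.toList());
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        streamAndCollect().forEach(System.out::println);
    }
}
```

A client of the Smile endpoint can use the same `ofLines()` handler to act on each prediction as soon as its line arrives.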
Emitter Pattern
The implementation uses Multi.createFrom().emitter() to bridge between imperative (blocking) and reactive (non-blocking) code:
```java
@POST
@Path("/{id}/stream")
@Consumes({MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN})
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> csv(@HeaderParam("Content-Type") String contentType,
                         @PathParam("id") String id,
                         InputStream input) {
    var model = service.getModel(id);
    boolean json = MediaType.APPLICATION_JSON.equals(contentType);
    return Multi.createFrom().emitter(emitter -> {
        Infrastructure.getDefaultWorkerPool().submit(() -> {
            try (var reader = new BufferedReader(new InputStreamReader(input))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isBlank()) {
                        var response = json
                                ? model.predict(new JsonObject(line))
                                : model.predict(line);
                        emitter.emit(response.toString());
                    }
                }
                emitter.complete();
            } catch (Exception ex) {
                emitter.fail(ex);
            }
        });
    });
}
```
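The emitter bridge above has a direct analogue in the JDK's own `java.util.concurrent.Flow` machinery. As a rough, Quarkus-free sketch of the same shape (the class name and the fake `predict` function are illustrative, not Smile code), a `SubmissionPublisher` can push results produced by a blocking loop running on a worker executor:

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class EmitterAnalogue {

    // Stand-in for model.predict(line): echoes the line with a fake label.
    static String predict(String line) {
        return "pred:" + line;
    }

    // Reads lines from the body, "predicts" each on a worker thread, and publishes
    // results as they are produced -- the same shape as the Mutiny emitter endpoint.
    public static List<String> streamPredictions(String body) throws InterruptedException {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        publisher.subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(String item) { results.add(item); } // like the chunk write
            public void onError(Throwable t) { done.countDown(); } // like emitter.fail(ex)
            public void onComplete() { done.countDown(); }         // like emitter.complete()
        });

        worker.submit(() -> {                                      // like getDefaultWorkerPool().submit
            try (BufferedReader reader = new BufferedReader(new StringReader(body))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isBlank()) {
                        publisher.submit(predict(line));           // like emitter.emit(...)
                    }
                }
                publisher.close();                                 // like emitter.complete()
            } catch (Exception ex) {
                publisher.closeExceptionally(ex);                  // like emitter.fail(ex)
            }
        });

        done.await();
        worker.shutdown();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(streamPredictions("a\nb\n\nc"));
    }
}
```

The key design point in both versions is the same: the blocking read/predict loop lives on a worker thread, and the reactive type is only the conduit that hands finished items to the subscriber.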
Key Mutiny/Quarkus Patterns Used
| Pattern | Code | Purpose |
|---|---|---|
| Emitter creation | `Multi.createFrom().emitter(emitter -> ...)` | Creates a `Multi` from an imperative emission callback |
| Worker pool offloading | `Infrastructure.getDefaultWorkerPool().submit(() -> ...)` | Moves the blocking I/O loop off the Vert.x event loop thread |
| Item emission | `emitter.emit(response.toString())` | Pushes one prediction result into the stream |
| Stream completion | `emitter.complete()` | Signals end-of-stream after all lines are processed |
| Error propagation | `emitter.fail(ex)` | Terminates the stream with an error |
| Element-level streaming | `@RestStreamElementType(MediaType.TEXT_PLAIN)` | Tells RESTEasy to flush each element individually (no buffering) |
Why Worker Pool Offloading?
Quarkus uses the Vert.x event loop for I/O. The event loop thread must never be blocked. The prediction loop reads from an InputStream (blocking I/O) and executes model predictions (CPU-intensive). Both would block the event loop. By submitting the entire loop to Infrastructure.getDefaultWorkerPool(), the work runs on a separate thread while the event loop remains free to handle other connections.
Content-Type Dispatching
The Content-Type header determines how each line is parsed:
```java
boolean json = MediaType.APPLICATION_JSON.equals(contentType);

// Inside the loop:
var response = json
        ? model.predict(new JsonObject(line))  // JSON: parse as JsonObject
        : model.predict(line);                 // CSV: parse as comma-separated string
```
| Content-Type | Parsing | Example Input Line |
|---|---|---|
| `application/json` | `new JsonObject(line)` -> `InferenceModel.predict(JsonObject)` | `{"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}` |
| `text/plain` (or other) | `InferenceModel.predict(String)` -> CSV split | `5.1,3.5,1.4,0.2` |
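The actual CSV handling lives in `InferenceModel.predict(String)`. As a rough illustration of what "CSV split" means for a numeric feature row (the splitting rules below are an assumption for this sketch; Smile's real parser may handle quoting, schemas, and non-numeric columns differently), a line can be turned into a feature vector like this:

```java
import java.util.Arrays;

public class CsvLineSketch {

    // Hypothetical CSV-to-features split: comma-separated numeric values only.
    public static double[] toFeatures(String line) {
        return Arrays.stream(line.split(","))
                     .map(String::trim)
                     .mapToDouble(Double::parseDouble)
                     .toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toFeatures("5.1,3.5,1.4,0.2")));
    }
}
```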
Output Format
Each prediction is emitted as a text line via InferenceResponse.toString():
```java
@Override
public String toString() {
    String s = prediction.toString();
    if (probabilities != null) {
        s += Arrays.stream(probabilities)
                .mapToObj(p -> String.format("%.3f", p))
                .collect(Collectors.joining(" ", " ", ""));
    }
    return s;
}
```
Classification output: 1 0.100 0.900 (prediction followed by space-separated probabilities)
Regression output: 245000.0 (prediction only)
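Because each result line is just the prediction followed by optional space-separated probabilities, a client can split it back apart. The following is a hypothetical client-side helper (not part of Smile) that inverts the format shown above:

```java
import java.util.Arrays;

public class ResultLineParser {

    public record Result(String prediction, double[] probabilities) {}

    // Splits "1 0.100 0.900" into prediction "1" and probabilities [0.1, 0.9].
    // A regression line like "245000.0" yields an empty probability array.
    public static Result parse(String line) {
        String[] parts = line.trim().split("\\s+");
        double[] probs = Arrays.stream(parts, 1, parts.length)
                               .mapToDouble(Double::parseDouble)
                               .toArray();
        return new Result(parts[0], probs);
    }

    public static void main(String[] args) {
        Result r = parse("1 0.100 0.900");
        System.out.println(r.prediction() + " " + Arrays.toString(r.probabilities()));
    }
}
```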
HTTP Examples
Streaming CSV Batch Prediction
```shell
curl -X POST http://localhost:8080/api/v1/models/iris-classifier-2/stream \
  -H "Content-Type: text/plain" \
  --data-binary @- << 'EOF'
5.1,3.5,1.4,0.2
7.0,3.2,4.7,1.4
6.3,3.3,6.0,2.5
EOF
```
Streamed response (one line per prediction):
```text
0 0.970 0.020 0.010
1 0.050 0.920 0.030
2 0.010 0.040 0.950
```
Streaming JSON Batch Prediction
```shell
curl -X POST http://localhost:8080/api/v1/models/iris-classifier-2/stream \
  -H "Content-Type: application/json" \
  --data-binary @- << 'EOF'
{"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2}
{"sepal_length":7.0,"sepal_width":3.2,"petal_length":4.7,"petal_width":1.4}
{"sepal_length":6.3,"sepal_width":3.3,"petal_length":6.0,"petal_width":2.5}
EOF
```
Streamed response:
```text
0 0.970 0.020 0.010
1 0.050 0.920 0.030
2 0.010 0.040 0.950
```
Piping from a File
```shell
# Score a large CSV file and save results
curl -X POST http://localhost:8080/api/v1/models/iris-classifier-2/stream \
  -H "Content-Type: text/plain" \
  --data-binary @input.csv > predictions.txt
```
Error Behavior
If any line fails to parse or predict, the stream terminates with an error:
```java
} catch (Exception ex) {
    emitter.fail(ex);  // terminates the Multi stream
}
```
Lines that have already been emitted are received by the client. The HTTP connection is closed after the error. Clients should check for premature stream termination (fewer result lines than input lines) to detect errors.
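Since a failure is visible to the client only as the connection closing early, one simple check is to compare the count of non-blank input lines against the count of result lines received. This is a hypothetical client-side helper, not part of Smile:

```java
public class StreamCompletionCheck {

    // Returns true when the server produced one result line per non-blank input
    // line, i.e. the stream was not cut short by emitter.fail(ex).
    public static boolean isComplete(String requestBody, String responseBody) {
        long sent = requestBody.lines().filter(l -> !l.isBlank()).count();
        long received = responseBody.lines().filter(l -> !l.isBlank()).count();
        return received == sent;
    }

    public static void main(String[] args) {
        // 3 inputs but only 2 results: the stream terminated early.
        System.out.println(isComplete("a\nb\nc", "0\n1"));  // prints false
    }
}
```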
Quarkus Configuration
The streaming endpoint works with the default Quarkus configuration. Relevant settings that can be tuned in application.properties:
```properties
# REST path prefix (affects all endpoints)
quarkus.rest.path=/api/v1

# Worker pool size (affects throughput for concurrent streaming requests)
quarkus.vertx.worker-pool-size=20

# HTTP port
quarkus.http.port=8080
%dev.quarkus.http.port=8888
```