

Workflow:Fede1024 Rust rdkafka Async Stream Processing

From Leeroopedia
Revision as of 11:03, 16 February 2026 by Admin (talk | contribs) (Auto-imported from workflows/Fede1024_Rust_rdkafka_Async_Stream_Processing.md)


Knowledge Sources
Domains: Messaging, Kafka, Stream_Processing, Async_Rust
Last Updated: 2026-02-07 19:30 GMT

Overview

End-to-end process for building a scalable async stream processing pipeline that consumes messages, offloads CPU-intensive work to a blocking thread pool, and produces transformed results to an output topic.

Description

This workflow demonstrates a high-throughput Kafka processing pattern that combines the StreamConsumer stream API with Tokio's task model. Each message is consumed from an async stream, detached from the consumer's borrow scope (converting BorrowedMessage to OwnedMessage), and dispatched to a blocking thread pool for CPU-bound computation; the result is then produced to an output topic via FutureProducer. Multiple worker instances, collected in a FuturesUnordered, can run concurrently for horizontal scaling within a single process.

Key outcomes:

  • Async stream-based consumption via consumer.stream().try_for_each()
  • Separation of IO-bound (Kafka produce/consume) and CPU-bound (computation) work
  • Horizontal scaling via multiple concurrent worker tasks

Usage

Use this workflow when a Kafka processing pipeline combines CPU-intensive transformations (e.g., parsing, aggregation, ML inference) with IO-bound Kafka operations. The pattern keeps CPU work from blocking the async runtime while maintaining high throughput via concurrent processing.

Execution Steps

Step 1: Configure Consumer and Producer

Build a StreamConsumer with enable.auto.commit set to false. The thread pool finishes messages out of order, so automatically committing the consumer's position could mark messages as consumed before they have been processed. Build a FutureProducer with a delivery timeout suited to the workload (for example via message.timeout.ms). Both clients connect to the same broker cluster.

Key considerations:

  • Auto-commit is disabled because, with out-of-order processing, a position-based commit could cover messages that are still in flight; after a crash or restart those messages would be skipped
  • The producer is cloned into each spawned task (FutureProducer is cheaply cloneable via Arc)
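
The reasoning behind disabling auto-commit can be made concrete with a small sketch. The workflow as described only turns auto-commit off; it does not prescribe how offsets are committed afterwards. The OffsetTracker below is a hypothetical, standard-library-only helper (not part of rdkafka) that records completions in whatever order they arrive and reports the offset that would be safe to commit for one partition.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper (not part of rdkafka): tracks which offsets of a single
/// partition have finished processing, in whatever order they complete.
pub struct OffsetTracker {
    /// Lowest offset that has not finished processing yet.
    next_uncommitted: i64,
    /// Offsets that finished while an earlier offset was still in flight.
    done_ahead: BTreeSet<i64>,
}

impl OffsetTracker {
    pub fn new(first_offset: i64) -> Self {
        OffsetTracker {
            next_uncommitted: first_offset,
            done_ahead: BTreeSet::new(),
        }
    }

    /// Records that `offset` finished and returns the offset that is safe to
    /// commit. A Kafka commit names the next offset to read, so the returned
    /// value is one past the last contiguously completed offset.
    pub fn complete(&mut self, offset: i64) -> i64 {
        if offset >= self.next_uncommitted {
            self.done_ahead.insert(offset);
        }
        // Advance over every offset that is now contiguously complete.
        while self.done_ahead.remove(&self.next_uncommitted) {
            self.next_uncommitted += 1;
        }
        self.next_uncommitted
    }
}
```

If offset 2 finishes before offsets 0 and 1, the tracker keeps returning 0 until the earlier offsets complete, whereas a position-based commit would already have moved past all three.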

Step 2: Subscribe and Create Message Stream

Subscribe the consumer to the input topic and obtain an async stream via consumer.stream(). The stream yields KafkaResult<BorrowedMessage> items (a Result whose error type is KafkaError), so it can be processed with futures::TryStreamExt combinators such as try_for_each.

Key considerations:

  • consumer.stream() returns a MessageStream that bridges librdkafka's polling to Rust's async waker system
  • The stream is lazy: messages are only fetched when the stream is polled
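
The two properties above (laziness and stopping at the first error) can be illustrated without a broker. The sketch below is a synchronous analogy built on the standard library's Iterator::try_for_each, not on the async stream itself; consume_until_error is a hypothetical name. One difference to keep in mind: with the async TryStreamExt::try_for_each the closure receives only the Ok value, while here the closure receives the whole Result.

```rust
use std::cell::Cell;

/// Feeds `items` through `try_for_each` and returns the overall result plus
/// the number of items that were actually pulled from the source.
pub fn consume_until_error(items: Vec<Result<u32, String>>) -> (Result<(), String>, usize) {
    let pulled = Cell::new(0usize);
    let result = items
        .into_iter()
        // `inspect` only runs when the consumer pulls an item (laziness).
        .inspect(|_| pulled.set(pulled.get() + 1))
        // Processing stops at the first `Err`; later items are never pulled.
        .try_for_each(|item| item.map(|_value| ()));
    (result, pulled.get())
}
```

With the input Ok(1), Err("broker down"), Ok(3), only two items are pulled and the error is returned. In the async pipeline the same short-circuit means a consumer error ends the stream processor.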

Step 3: Detach Messages for Cross-Task Transfer

For each BorrowedMessage received from the stream, call .detach() to convert it to an OwnedMessage. Borrowed messages are tied to the consumer's internal buffer lifetime and cannot be sent to other tasks. Detaching performs a heap allocation to copy the message data.

Key considerations:

  • BorrowedMessage has a lifetime tied to the consumer and cannot be sent across task boundaries
  • .detach() creates an OwnedMessage that owns its data and is Send + 'static
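  • Perform any sequential pre-processing (logging, filtering) before detaching: at that point messages are still seen in arrival order, and messages that are filtered out never pay for the copy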
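
The ownership rule behind detaching can be shown with plain Rust types. BorrowedMsg and OwnedMsg below are hypothetical stand-ins that mimic the shape of rdkafka's BorrowedMessage and OwnedMessage; they are not the library's types, and std::thread::spawn stands in for a spawned task because it imposes the same 'static requirement.

```rust
use std::thread;

/// Hypothetical stand-in for a borrowed message: it only references bytes
/// owned by someone else (in rdkafka, by the consumer).
pub struct BorrowedMsg<'a> {
    pub offset: i64,
    pub payload: &'a [u8],
}

/// Hypothetical stand-in for an owned message: it holds its own copy.
pub struct OwnedMsg {
    pub offset: i64,
    pub payload: Vec<u8>,
}

impl<'a> BorrowedMsg<'a> {
    /// Copies the payload to the heap so the result no longer borrows anything.
    pub fn detach(&self) -> OwnedMsg {
        OwnedMsg {
            offset: self.offset,
            payload: self.payload.to_vec(),
        }
    }
}

/// Processes one message on another thread and returns its payload length.
pub fn process_elsewhere(buffer: &[u8], offset: i64) -> usize {
    let borrowed = BorrowedMsg { offset, payload: buffer };
    // Sequential pre-processing can still use the cheap borrowed view.
    let _seen_offset = borrowed.offset;
    let owned = borrowed.detach();
    // `thread::spawn` requires `'static` data: moving `borrowed` into the
    // closure would not compile, while the detached copy is accepted.
    thread::spawn(move || owned.payload.len())
        .join()
        .expect("worker thread panicked")
}
```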

Step 4: Dispatch CPU Work to Blocking Thread Pool

Use tokio::spawn to create a new async task, and within it call tokio::task::spawn_blocking to run the CPU-intensive computation on Tokio's dedicated blocking thread pool. This keeps the computation off the runtime's async worker threads, which depend on tasks yielding cooperatively and would otherwise be stalled. Await the blocking task's result within the spawned async task.

Key considerations:

  • tokio::spawn handles the async orchestration (awaiting the blocking result, then producing)
  • tokio::task::spawn_blocking offloads synchronous CPU work to a separate thread pool
  • The outer tokio::spawn means processing of different messages proceeds concurrently

Step 5: Produce Results to Output Topic

After the CPU computation completes, construct a FutureRecord with the computation result as the payload and send it via the cloned FutureProducer. Await the delivery future to confirm the message was accepted by the broker.

Key considerations:

  • The producer clone is moved into the spawned task
  • Each produced message gets its own delivery confirmation future
  • In this pattern, produce errors are logged but do not stop the processing pipeline
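
Steps 1 through 5 can be sketched together as follows. This is a minimal sketch, assuming the rdkafka, tokio and futures crates are available and that their APIs match the calls shown (details can differ between crate versions). The function name run_async_processor, the placeholder expensive_computation, the record key "some key" and the timeout values are illustrative assumptions, not values prescribed by the workflow.

```rust
use std::time::Duration;

use futures::TryStreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::OwnedMessage;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;

// Placeholder for the CPU-bound step; the real transformation is application-specific.
fn expensive_computation(msg: OwnedMessage) -> String {
    match msg.payload_view::<str>() {
        Some(Ok(payload)) => format!("payload length: {}", payload.len()),
        Some(Err(_)) => "payload is not valid UTF-8".to_owned(),
        None => "no payload".to_owned(),
    }
}

pub async fn run_async_processor(
    brokers: String,
    group_id: String,
    input_topic: String,
    output_topic: String,
) {
    // Step 1: consumer with auto-commit disabled, plus a producer.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &brokers)
        .set("enable.auto.commit", "false")
        .create()
        .expect("consumer creation failed");
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("message.timeout.ms", "5000") // illustrative value
        .create()
        .expect("producer creation failed");

    // Step 2: subscribe and drive the message stream.
    consumer
        .subscribe(&[input_topic.as_str()])
        .expect("subscribe failed");

    let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
        let producer = producer.clone();
        let output_topic = output_topic.clone();
        async move {
            // Step 3: detach so the message can move into another task.
            let owned_message = borrowed_message.detach();
            tokio::spawn(async move {
                // Step 4: run the CPU-bound work on the blocking thread pool.
                let result =
                    tokio::task::spawn_blocking(move || expensive_computation(owned_message))
                        .await
                        .expect("blocking task panicked");
                // Step 5: produce the result and await the delivery report.
                // The zero duration means "do not wait if the producer queue is full".
                let delivery = producer
                    .send(
                        FutureRecord::to(&output_topic)
                            .key("some key") // placeholder key
                            .payload(&result),
                        Duration::from_secs(0),
                    )
                    .await;
                if let Err((e, _msg)) = delivery {
                    eprintln!("produce failed: {:?}", e);
                }
            });
            Ok(())
        }
    });

    stream_processor.await.expect("stream processing failed");
}
```

The closure passed to try_for_each returns as soon as the task is spawned, so the stream moves on to the next message while earlier ones are still being computed; this is what makes results complete out of order.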

Step 6: Scale with Multiple Workers (Optional)

Spawn several instances of the processing pipeline with tokio::spawn and collect the resulting join handles into a FuturesUnordered. Each worker creates its own consumer-producer pair and joins the same consumer group, so Kafka distributes partitions across workers automatically.

Key considerations:

  • All workers share the same group.id for partition assignment
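  • The number of workers should not exceed the number of partitions in the input topic: within a consumer group each partition is assigned to only one consumer, so any extra workers receive no partitions and sit idle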
  • FuturesUnordered allows all workers to make progress concurrently
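
The worker-count guideline follows from simple arithmetic, sketched below with standard-library Rust only. The round-robin rule here is an illustrative assumption; the actual assignment is decided by the consumer group's configured assignment strategy. What carries over is the invariant that each partition has exactly one owner in the group. Both function names are hypothetical.

```rust
/// Hypothetical illustration: spreads `partitions` partition ids over
/// `workers` consumers round-robin, one owner per partition.
pub fn assign_round_robin(partitions: usize, workers: usize) -> Vec<Vec<usize>> {
    let mut assignment = vec![Vec::new(); workers];
    if workers == 0 {
        return assignment;
    }
    for p in 0..partitions {
        assignment[p % workers].push(p);
    }
    assignment
}

/// Number of workers that received no partition and would sit idle.
pub fn idle_workers(assignment: &[Vec<usize>]) -> usize {
    assignment.iter().filter(|owned| owned.is_empty()).count()
}
```

With 3 partitions and 5 workers, two workers end up idle; with 6 partitions and 3 workers, every worker owns two partitions.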

Execution Diagram

GitHub URL

Workflow Repository