Implementation:Fede1024 Rust rdkafka Tokio Spawn Blocking
| Knowledge Sources | |
|---|---|
| Domains | Async_Programming, Concurrency |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete blocking task dispatch tool using tokio::task::spawn_blocking, used in rust-rdkafka's async processing examples.
Description
tokio::task::spawn_blocking runs a closure on Tokio's dedicated blocking thread pool and returns a JoinHandle that resolves when the closure completes. This is an external API from the tokio crate, not defined in rust-rdkafka, but used extensively in rdkafka's async processing examples for CPU-intensive message transformation.
In the context of rust-rdkafka, spawn_blocking receives an OwnedMessage (detached from the consumer) and performs the expensive processing. The FutureProducer reference is passed in or cloned for producing results.
Usage
Use spawn_blocking in your consume-transform-produce pipeline when the transform step is CPU-intensive. Requires that message data is owned (OwnedMessage via detach()) since the closure must be Send + 'static.
Code Reference
Source Location
- Repository: External (tokio crate, not in this repository)
Signature
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Import
use tokio::task::spawn_blocking;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| f | F: FnOnce() -> R + Send + 'static | Yes | Closure performing CPU-intensive work |
Outputs
| Name | Type | Description |
|---|---|---|
| returns | JoinHandle<R> | Future resolving to the closure's return value |
Usage Examples
CPU-Intensive Message Processing
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
let borrowed_msg = consumer.recv().await.expect("recv failed");
let owned_msg = borrowed_msg.detach();
let processed = tokio::task::spawn_blocking(move || {
// Expensive computation on blocking thread pool
let payload = owned_msg.payload().unwrap_or(&[]);
transform_payload(payload)
})
.await
.expect("Blocking task panicked");
// Back on async runtime, send result
producer
.send(
FutureRecord::to("output-topic").payload(&processed),
Duration::from_secs(0),
)
.await;