Implementation:Fede1024 Rust rdkafka Tokio Spawn Blocking

From Leeroopedia


Knowledge Sources
Domains Async_Programming, Concurrency
Last Updated 2026-02-07 19:00 GMT

Overview

A concrete tool for dispatching blocking tasks: tokio::task::spawn_blocking, as used in rust-rdkafka's async processing examples.

Description

tokio::task::spawn_blocking runs a closure on Tokio's dedicated blocking thread pool and returns a JoinHandle. Awaiting the handle yields a Result that holds the closure's return value, or a JoinError if the closure panicked. The function is an external API from the tokio crate, not defined in rust-rdkafka; rdkafka's async processing examples use it for CPU-intensive message transformation.

In the context of rust-rdkafka, the closure passed to spawn_blocking takes ownership of an OwnedMessage (detached from the consumer) and performs the expensive processing. The FutureProducer is passed in by reference or cloned so that the results can be produced afterwards.

Usage

Use spawn_blocking in a consume-transform-produce pipeline when the transform step is CPU-intensive, so that it does not stall the async worker threads. The message data must be owned (an OwnedMessage obtained via detach()) because the closure must be Send + 'static.
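The ownership requirement can be illustrated without Tokio or rdkafka. The sketch below is a stand-in, not the library's code: BorrowedMsg, OwnedMsg, byte_sum and process_detached are hypothetical names, and std::thread::spawn is used in place of spawn_blocking because it carries the same `FnOnce() -> R + Send + 'static` bound and therefore rejects borrowed data in the same way.

```rust
use std::thread;

// Hypothetical stand-in for rdkafka's BorrowedMessage: it borrows its payload,
// so it is not 'static and cannot be moved into a spawned closure.
struct BorrowedMsg<'a> {
    payload: &'a [u8],
}

// Hypothetical stand-in for rdkafka's OwnedMessage: it owns a copy of the payload.
struct OwnedMsg {
    payload: Vec<u8>,
}

impl<'a> BorrowedMsg<'a> {
    // Plays the role of detach(): copy the borrowed bytes into an owned value.
    fn detach(&self) -> OwnedMsg {
        OwnedMsg {
            payload: self.payload.to_vec(),
        }
    }
}

// Illustrative CPU-bound transform (an assumption, not taken from rdkafka).
fn byte_sum(payload: &[u8]) -> u64 {
    payload.iter().map(|&b| u64::from(b)).sum()
}

// Detach first, then move the owned message into the worker closure.
fn process_detached(buffer: &[u8]) -> u64 {
    let borrowed = BorrowedMsg { payload: buffer };
    let owned = borrowed.detach();
    // Moving `borrowed` into the closure instead would fail to compile,
    // because `buffer` is not guaranteed to live for 'static.
    let handle = thread::spawn(move || byte_sum(&owned.payload));
    handle.join().expect("worker thread panicked")
}
```

Calling process_detached(&[1, 2, 3]) runs the sum on a separate thread and returns it to the caller. With Tokio the shape is the same, except that the handle is awaited rather than joined.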

Code Reference

Source Location

  • Repository: External (tokio crate, not in this repository)

Signature

pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,

Import

use tokio::task::spawn_blocking;

I/O Contract

Inputs

Name Type Required Description
f F: FnOnce() -> R + Send + 'static Yes Closure performing CPU-intensive work

Outputs

Name Type Description
returns JoinHandle<R> Future resolving to Result<R, JoinError>: Ok with the closure's return value, or Err if the closure panicked

Usage Examples

CPU-Intensive Message Processing

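use std::time::Duration;

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};

// Assumes `consumer: StreamConsumer` and `producer: FutureProducer` are already
// configured, and that `transform_payload(&[u8]) -> Vec<u8>` is your own
// CPU-bound function (hypothetical; not part of rdkafka).
let borrowed_msg = consumer.recv().await.expect("recv failed");
// detach() copies the message into an OwnedMessage, which is Send + 'static.
let owned_msg = borrowed_msg.detach();

let processed = tokio::task::spawn_blocking(move || {
    // Expensive computation on the blocking thread pool
    let payload = owned_msg.payload().unwrap_or(&[]);
    transform_payload(payload)
})
.await
.expect("Blocking task panicked");

// Back on the async runtime: send the result. The record has no key, so the
// key type is given explicitly as `()`.
let delivery = producer
    .send(
        FutureRecord::<(), _>::to("output-topic").payload(&processed),
        Duration::from_secs(0),
    )
    .await;
// Report a failed delivery instead of silently dropping the result.
if let Err((err, _msg)) = delivery {
    eprintln!("delivery failed: {err}");
}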

Related Pages

Implements Principle

Requires Environment
