spawn_blocking
- This function is intended for non-async operations that eventually finish on their own.
- Tokio will spawn more blocking threads when they are requested through this
function until the upper limit configured on the
Builderis reached.
Here’s code path in top down stack:
task::blocking::spawn_blocking
runtime::blocking::pool::spawn_blocking
runtime::handle::Handle::spawn_blocking
runtime::blocking::pool::Spawner::spawn_blocking
runtime::blocking::pool::Spawner::spawn_blocking_inner
runtime::blocking::pool::Spawner::spawn_taskpool::spawn_blocking
fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
{
let rt = Handle::current();
rt.spawn_blocking(func)
}Use current runtime handle for spawn_blocking task.
Handle::spawn_blocking
impl Handle {
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
{
self.inner.blocking_spawner().spawn_blocking(self, func)
}
}Different schedulers could have different blocking Spawner, but tokio
use one Spawner.
Spawner::spawn_blocking
impl Spawner {
fn spawn_blocking<F, R>(&self, rt: &runtime::handle::Handle, func: F) -> JoinHandle<R>
{
let (join_handle, spawn_result) =
self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt)
match spawn_result {
Ok(()) => join_handle,
Err(SpawnError::ShuttingDown) => join_handle,
Err(SpawnError::NoThreads(e)) => {
panic!("OS can't spawn worker thread: {}", e)
}
}
}
}
fn spawn_blocking_inner<F, R>(
&self,
func: F,
is_mandatory: Mandatory,
name: Option<&str>,
rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
{
//: `BlockingTask` implements `Future`
let fut = BlockingTask::new(func);
let id = task::Id::next();
//: `task`: task::UnownedTask<BlockingSchedule>
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
let spawned = self.spawn_task(blocking::pool::Task::new(task, is_mandatory), rt);
(handle, spawned)
}BlockingTask implements Future. This future will get polled when [[spawn_task#|#run task]].
impl<T, R> Future for BlockingTask<T>
{
type Output = R;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
let me = &mut *self;
let func = me
.func
.take()
.expect("[internal exception] blocking task ran twice.");
//: comments removed from the original source code
crate::runtime::coop::stop();
Poll::Ready(func())
}
}define an UnownedTask
/// Converts a function to a future that completes on poll.
struct BlockingTask<T> {
func: Option<T>,
}
fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
{
let (task, notified, join) = new_task(task, scheduler, id);
let unowned = UnownedTask {
raw: task.raw,
_p: PhantomData,
};
//: takes ownership and do not invoke `task::Task` destructor
//: Task has `impl Drop for task::Task<S>`
std::mem::forget(task);
//: takes ownership and do not invoke `task::Task` destructor
std::mem::forget(notified);
(unowned, join)
}