spawn task
Thread Spawner definition
/// Handle to the thread pool; it holds a reference-counted pointer to the
/// pool's `Inner` state.
pub(crate) struct Spawner {
/// The pool state behind an `Arc`.
inner: Arc<Inner>,
}
/// Pool state reached through `Spawner::inner`; worker threads execute
/// `Inner::run` against it.
struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
/// Pool threads wait on this.
condvar: Condvar,
/// Maximum number of threads.
/// NOTE(review): not read by any code visible in this excerpt.
thread_cap: usize,
/// Metrics about the pool.
metrics: SpawnerMetrics,
}
/// Mutable pool state, kept behind the `Mutex` in `Inner::shared`.
struct Shared {
/// Pending tasks: `spawn_task` pushes to the back, workers pop from the front.
queue: VecDeque<blocking::pool::Task>,
/// Number of outstanding wake-ups. `spawn_task` increments it together with
/// `condvar.notify_one()`; a woken worker decrements it, and a worker that
/// wakes while it is zero treats the wakeup as spurious.
num_notify: u32,
/// While this is `false`, idle workers keep waiting on the condvar.
shutdown: bool,
/// Cloned and handed to each spawned worker thread, which drops its clone
/// once its run loop returns.
/// NOTE(review): presumably cleared to `None` when shutdown starts; the
/// shutdown code is not shown here, so confirm.
shutdown_tx: Option<shutdown::Sender>,
/// This holds the `JoinHandle`s for all running threads; on shutdown, the thread
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate `worker_threads` in a consistent order (for loom's
/// benefit).
worker_thread_index: usize,
}spawn_task
If there are idle threads,
/// Idle-thread path: enqueue `task` and wake one waiting worker.
///
/// The whole sequence runs under the `shared` lock. `num_notify` is bumped
/// before the condvar is signalled so the woken worker can tell this wake-up
/// apart from a spurious one. `rt` is not used on this path.
fn spawn_task(&self, task: blocking::pool::Task, rt: &Handle) -> Result<(), SpawnError> {
    let pool = &self.inner;
    let mut state = pool.shared.lock();

    // Hand the work over and account for it.
    state.queue.push_back(task);
    pool.metrics.inc_queue_depth();

    // One idle worker is about to be claimed for this task.
    pool.metrics.dec_num_idle_threads();
    state.num_notify += 1;
    pool.condvar.notify_one();

    Ok(())
}
If there are no idle threads and we can spawn one,
fn spawn_task(&self, task: blocking::pool::Task, rt: &Handle) -> Result<(), SpawnError> {
let mut shared = self.inner.shared.lock();
shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();
let shutdown_tx = shared.shutdown_tx.clone();
if let Some(shutdown_tx) = shutdown_tx {
let id = shared.worker_thread_index;
match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(_) => return Err(SpawnError::NoThreads(e));
}
}
Ok(())
}
/// Starts one OS thread that runs the pool's worker loop under id `id`.
///
/// The thread owns a clone of the runtime handle, enters that runtime, runs
/// `Inner::run(id)`, and finally drops `shutdown_tx`.
///
/// # Errors
///
/// Returns the `io::Error` from `thread::Builder::spawn` if the thread could
/// not be created.
fn spawn_thread(
    &self,
    shutdown_tx: shutdown::Sender,
    rt: &runtime::handle::Handle,
    id: usize,
) -> io::Result<thread::JoinHandle<()>> {
    // The closure must own its handle, so clone before moving it in.
    let handle = rt.clone();

    let worker = move || {
        let _enter = handle.enter();
        handle.inner.blocking_spawner().inner.run(id);
        // Dropped only after the run loop has returned.
        drop(shutdown_tx);
    };

    thread::Builder::new().spawn(worker)
}
spawner runloop
impl Inner {
/// Worker-thread loop: alternates between draining the task queue (BUSY) and
/// waiting on the condvar for more work (IDLE).
///
/// `worker_thread_id` is the id the thread was spawned with.
///
/// NOTE(review): this is an abridged excerpt. `worker_thread_id`,
/// `join_on_thread`, `timeout_result` and the `'main` label are not used by
/// the code shown, and nothing visible breaks out of `'main`, so the
/// "Thread exit" section is only reached through the removed shutdown/cleanup
/// logic.
fn run(&self, worker_thread_id: usize) {
let mut shared = self.shared.lock();
let mut join_on_thread = None;
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
// Release the lock while the task runs, then take it again before
// looking at the queue.
drop(shared);
task.run();
shared = self.shared.lock();
}
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
// Wait for at most one second; the guard is handed to the condvar and
// returned in `lock_result.0`.
let timedout = Duration::from_secs(1);
let lock_result = self.condvar.wait_timeout(shared, timedout).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
// A non-zero `num_notify` means `spawn_task` signalled new work: consume
// one notification and go back to the BUSY phase.
if shared.num_notify != 0 {
shared.num_notify -= 1;
break;
}
//: cleanup logic removed when condvar "timed out"
// Spurious wakeup detected, go back to sleep.
}
//: shutdown logic removed
}
// Thread exit
self.metrics.dec_num_threads();
let _ = self.metrics.dec_num_idle_threads();
// When shutting down and the thread count has reached zero, signal the
// condvar once.
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}
drop(shared);
}
}run task
//: blocking::pool::Task
/// A unit of work stored in the pool's queue.
struct Task {
/// The underlying task; `Task::run` forwards to its `run`.
task: task::UnownedTask<BlockingSchedule>,
/// NOTE(review): not read anywhere in this excerpt; presumably marks whether
/// the task must still be run during shutdown. Confirm.
mandatory: Mandatory,
}
impl Task {
fn run(self) {
self.task.run();
}
}
impl UnownedTask {
/// Polls the underlying task once, consuming this handle.
///
/// Per the comments below, the handle accounts for two ref-counts: one is
/// handed to a temporary `task::Task`, the other is used for the poll itself.
fn run(self) {
// `raw` is used again after being moved into `task` below, so it is a
// plain copyable handle.
let raw = self.raw;
// Skip `self`'s destructor; its ref-counts are handed over manually below.
mem::forget(self);
// Transfer one ref-count to a Task object.
let task = task::Task {
raw,
_p: PhantomData,
};
// Use the other ref-count to poll the task.
raw.poll();
// Decrement our extra ref-count
drop(task);
}
}
impl RawTask {
    /// Polls the task by dispatching through the `poll` entry of its vtable.
    fn poll(self) {
        let poll_fn = self.header().vtable.poll;
        // SAFETY (assumed, not shown in this excerpt): `self.ptr` refers to the
        // live task whose header supplied this vtable. TODO confirm.
        unsafe { poll_fn(self.ptr) }
    }
}
/// Vtable entry point: rebuilds the typed `Harness` from the raw header
/// pointer and polls it.
///
/// # Safety
///
/// NOTE(review): presumably `ptr` must point to a live task created for
/// exactly these `T` and `S`; the contract is not stated in this excerpt.
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
    Harness::<T, S>::from_raw(ptr).poll();
}
impl Harness {
    /// Polls the task once and carries out whatever `poll_inner` decided.
    fn poll(self) {
        match self.poll_inner() {
            // Nothing left to do for this poll.
            PollFuture::Done => (),
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Complete => {
                self.complete();
            }
            // The task was notified while it was running: hand a fresh handle
            // to the scheduler, then release the reference used for this poll.
            PollFuture::Notified => {
                let scheduler = &self.core().scheduler;
                scheduler.yield_now(Notified(self.get_new_task()));
                self.drop_reference();
            }
        }
    }

    /// Runs the state transitions around a single poll of the future and
    /// reports what the caller has to do next.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        // Every outcome except `Success` is settled without polling.
        match self.state().transition_to_running() {
            TransitionToRunning::Success => {}
            TransitionToRunning::Cancelled => {
                cancel_task(self.core());
                return PollFuture::Complete;
            }
            TransitionToRunning::Failed => return PollFuture::Done,
            TransitionToRunning::Dealloc => return PollFuture::Dealloc,
        }

        let header_ptr = self.header_ptr();
        let waker = waker_ref::<S>(&header_ptr);
        let cx = Context::from_waker(&waker);

        if poll_future(self.core(), cx) == Poll::Ready(()) {
            // The future completed. Move on to complete the task.
            return PollFuture::Complete;
        }

        // Still pending: go back to idle and translate the outcome.
        match self.state().transition_to_idle() {
            TransitionToIdle::Ok => PollFuture::Done,
            TransitionToIdle::OkNotified => PollFuture::Notified,
            TransitionToIdle::OkDealloc => PollFuture::Dealloc,
            TransitionToIdle::Cancelled => PollFuture::Complete,
        }
    }
}
//: poll around panics!
/// Polls the future held in `core` once, containing any panic it raises.
///
/// Returns `Poll::Pending` if the future is not finished. Otherwise the
/// outcome (the future's output, or an error built from a caught panic) is
/// stored in `core` and `Poll::Ready(())` is returned.
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
// Poll the future.
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
// Guard whose destructor calls `drop_future_or_output`. It is forgotten on
// the normal path, so the destructor only runs when `poll` unwinds.
struct Guard<'a, T: Future, S: Schedule> {
core: &'a Core<T, S>,
}
impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
fn drop(&mut self) {
// If the future panics on poll, we drop it inside the panic
// guard.
self.core.drop_future_or_output();
}
}
let guard = Guard { core };
//: real poll
let res = guard.core.poll(cx);
// No panic happened: disarm the guard.
mem::forget(guard);
res
}));
// Prepare output for being placed in the core stage.
let output = match output {
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
// A panic during poll is converted into an error value for the task.
Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
};
// Catch and ignore panics if the future panics on drop.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.store_output(output);
}));
// A panic while storing the output is reported to the scheduler instead of
// being propagated.
if res.is_err() {
core.scheduler.unhandled_panic();
}
Poll::Ready(())
}