Task
new task
fn new_task<T, S>(
task: T,
scheduler: S,
id: Id,
) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
{
//: RawTask has Copy semantics
//: RawTask includes a pointer to `Header`, i.e. pointer to `Cell`
//: The pointer points to a heap-allocated value(`Box::new`ed).
let raw = RawTask::new::<T, S>(task, scheduler, id);
//: `Task` has `impl Drop`
let task = task::Task {
raw, //: raw copied
_p: PhantomData,
};
//: `Task` will be dropped as `Notified` gets dropped
let notified = Notified(Task {
raw, //: raw copied
_p: PhantomData,
});
let join = JoinHandle::new(raw); //: raw copied
(task, notified, join)
}
//: `Task` has `PhantomData<S>` as `RawTask::new` need generic type `S`
//: `RawTask::new` need `S` as `Cell` need `S`
//: `Cell` need `S` as `Core` need `S`
struct Task<S: 'static> {
raw: RawTask,
_p: PhantomData<S>,
}
//: new type pattern
struct Notified<S: 'static>(Task<S>);
//: `Header` is the first field of `Cell`
//: pointer to `Header` implies pointer to `Cell`
struct RawTask {
ptr: NonNull<Header>,
}
struct UnownedTask<S: 'static> {
raw: RawTask,
_p: PhantomData<S>,
}
struct Header {
/// Task state.
pub(super) state: State,
/// Pointer to next task, used with the injection queue.
pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,
/// Table of function pointers for executing actions on the task.
pub(super) vtable: &'static Vtable,
}
struct State {
val: AtomicUsize,
}
impl RawTask {
fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
{
let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
//: ptr.cast() casts `Cell` to `Header`
let ptr = unsafe { NonNull::new_unchecked(ptr.cast()) };
RawTask { ptr }
}
}
#[repr(C)]
struct Cell<T: Future, S> {
/// Hot task state data
//: It is critical for `Header` to be the first field as the task structure will
//: be referenced by both *mut Cell and *mut Header.
header: Header,
/// Either the future or output, depending on the execution stage.
core: Core<T, S>,
/// Cold data
trailer: Trailer,
}
impl<T: Future, S: Schedule> Cell<T, S> {
fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
fn new_header(
state: State,
vtable: &'static Vtable,
) -> Header {
Header {
state,
queue_next: UnsafeCell::new(None),
vtable,
owner_id: UnsafeCell::new(None),
}
}
let vtable = raw::vtable::<T, S>();
let result = Box::new(Cell {
trailer: Trailer::new(scheduler.hooks()),
header: new_header(
state,
vtable,
),
core: Core {
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
},
});
result
}
}
fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
schedule: schedule::<S>,
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
drop_abort_handle: drop_abort_handle::<T, S>,
shutdown: shutdown::<T, S>,
trailer_offset: OffsetHelper::<T, S>::TRAILER_OFFSET,
scheduler_offset: OffsetHelper::<T, S>::SCHEDULER_OFFSET,
id_offset: OffsetHelper::<T, S>::ID_OFFSET,
}
}
#[repr(C)]
struct Core<T: Future, S> {
/// Scheduler used to drive this future.
scheduler: S,
/// The task's ID, used for populating `JoinError`s.
task_id: Id,
/// Either the future or the output.
stage: CoreStage<T>,
}
struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}
#[repr(C)]
enum Stage<T: Future> {
Running(T),
Finished(super::Result<T::Output>),
Consumed,
}task deallocation
A task is initialized with three references. Only when ref count decrements
to zero, the underlying Cell gets deallocated. Tokio uses std::mem::forget
to avoid double free.
struct Task<S: 'static> {
raw: RawTask,
_p: PhantomData<S>,
}
//: new type pattern
struct Notified<S: 'static>(Task<S>);
//: `Header` is the first field of `Cell`
//: pointer to `Header` implies pointer to `Cell`
struct RawTask {
ptr: NonNull<Header>,
}
struct UnownedTask<S: 'static> {
raw: RawTask,
_p: PhantomData<S>,
}
impl<S: 'static> Drop for Task<S> {
fn drop(&mut self) {
// Decrement the ref count
if self.header().state.ref_dec() {
// Deallocate if this is the final ref count
self.raw.dealloc();
}
}
}
impl<S: 'static> Drop for UnownedTask<S> {
fn drop(&mut self) {
// Decrement the ref count
if self.raw.header().state.ref_dec_twice() {
// Deallocate if this is the final ref count
self.raw.dealloc();
}
}
}
impl RawTask {
fn dealloc(self) {
let vtable = self.header().vtable;
//: deallocate `Cell`
unsafe {
(vtable.dealloc)(self.ptr);
}
}
}
/// Typed raw task handle.
struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
}
unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.dealloc();
}
impl Harness {
fn dealloc(self) {
unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
}
}
}