Asynchronous Rust
An asynchronous interface is one that may not yield a result right away. Its result is deferred to a later time. This allows the caller to continue doing other work in the meantime rather than having to go to sleep.
An asynchronous interface is a method that returns a Poll:
#![allow(unused)] fn main() { // std::task::Poll enum Poll<T> { Ready(T), Pending } }
This enum is often used with functions with names that begin with the word poll
, and it signifies that the operation is non-blocking.
- If you have a handle to two or more different
Poll
s, you can loop over them and check for the first one that isPoll::Ready
. - Be careful about doing this in a hot loop though, checking exactly 2 poll methods billions of times a second when its unlikely you'd get a
Ready
for minutes is a waste of CPU resources.
Polling in Rust is standardized with a trait called Future
. A simplified version is below:
#![allow(unused)] fn main() { // Not the real std::future::Future trait Future { type Output; fn poll(&mut self) -> Poll<Self::Output>; } }
Things that a future could be:
- The next time a network packet comes in
- The next time a mouse cursor moves
- The next time some amount of time has elapsed
Futures nomenclature
- When a future eventually returns
Poll::Ready<T>
, we say that the future "resolved to" a T. - Instead of
poll_recv
andpoll_keypress
, we can just have a methodrecv
orkeypress
that each returnimpl Future
with an appropriateOutput
type. - In general, you cannot assume that you can call
poll
on a future AFTER it has already returned aPoll::ready
. It is generally acceptable for it to panic if you do this.- But some futures explicitly allow you to poll it again and again even after it is ready. These futures are referred to as a "fused future".
Async/Await
async
and its related keyword await
are syntactic sugar for a function that returns an impl Future
. Under the hood, they are implemented with generators
.
Example:
#![allow(unused)] fn main() { async fn forward<T>(receiver: Receiver<T>, sender: Sender<T>) { while let Some(t) = receiver.next().await { sender.send(t).await; } } }
Generators
Generators are functions/chunks of code with the ability to allow it to yield
execution midway, and resume execution from where it last yielded later on.
In the above example, Rust transforms async
functions (like forward
) into a generator.
The compiler transforms the function into a generator by creating some extra code that helps it do what it needs to do:
- A custom data structure associated to the function which is capable of storing all the state in the generator at a given point in time (when it yields).
- A
resume
method which allows the generator to resume from the snapshot state, executing from the lastyield
.
Generators (and therefore futures) have a potential of getting huge.
- The data structure responsible for holding state will have to be large enough to hold everything on the function stack. If you've created a
[u8; 8192]
, then the data structure will need to hold that structure. - In addition, any and all futures that your
async fn
awaits on will need to be stored in this data structure so that it can be polled later. - If you perform
perf
profiles, you'll see an excess of time spent inmemcpy
functions since these data structures will be copied and moved repeatedly. - When you discover this to be an issue, you have two options:
- Reduce the amount of local state that the
async
function needs. - Move the future to the heap with
Box::pin
- Reduce the amount of local state that the
Pin
Copied for ease of comparison:
#![allow(unused)] fn main() { async fn forward<T>(receiver: Receiver<T>, sender: Sender<T>) { while let Some(t) = receiver.next().await { sender.send(t).await; } } }
What happens if the generator takes a reference to a local variable?
- The future returned by
receiver.next()
must hold a&receiver
if anext
message is not immediately available so it knows where to try again when the generator resumes. - When
forward
yields, the future and the reference the future contains get stashed away inside the generator's associated data structure.
#![allow(unused)] fn main() { async fn try_forward<T>(receiver: Receiver<T>, sender: Sender<T>) -> Option<impl Future> { let mut f = forward(receiver, sender); // 1 if f.poll().is_pending() { // 2 Some(f) } else { None } } }
- On the line commented
1
, we callforward(..)
, which is an async function that will return aFuture<Output = ()>
. - On the line commented
2
, we callf.poll()
on the future returned in line1
. This will return either aPoll::Ready<()>
or aPoll::Pending
.- When we call
f.poll()
, theforward
generator resumes executing. - The
forward
generator continually callsreceiver.next().await
andsender.send(t).await
. - As long as both
receiver.next()
andsender.send(t)
both return aPoll::Ready
, theforward
generator will not yield back to ourf.poll()
call. - At some point, one of
receiver.next()
orsender.send(t)
will returnPoll::Pending
ORPoll::Ready<None>
. - If the value is
Poll::Pending
, it will yield back out to thef.poll()
call. - If the value is
Poll::Ready<None>
then the function will kick out of thewhile
loop and return aPoll::Ready<()>
.
- When we call
Here's where the issue is:
f.poll()
will return one ofPoll::Pending
orPoll::Ready<()>
.- If it's
Poll::Pending
, thetry_forward
function will move the future into aSome
and return that. Otherwise, it will return aNone
. - When the future is moved into the
Some
, references stored in the generator data structure toreceiver
andsender
will no longer be valid!
Futures are inherently self-referential. If the future is moved, then its data is also moved, which invalidates the references to self
. See additional reading here.
- The solution is an advanced Rust type called
Pin
, which adds a contract that the value pinned will never move again, allowing the references to remain valid. - While
Future
s make use ofPin
,Pin
is general purpose and can be used for any self-referential data structure.
#![allow(unused)] fn main() { struct Pin<P> { pointer: P } impl<P> Pin<P> where P: Deref { pub unsafe fn new_unchecked(pointer: P) -> Self { .. } } impl<'a, T> Pin<&'a mut T> { pub unsafe fn get_unchecked_mut(self) -> &'a mut T { .. } } impl<P> Deref for Pin<P> where P: Deref { type Target = P::Target; fn deref(&self) -> &Self::Target; } }
Pin
holds a pointer type, which is indicative ofP: Deref
, aka the typeP
can be dereferenced to some targetT
.- This means that instead of
Pin<MyType>
you can have aPin<Box<MyType>>
, aPin<Rc<MyType>>
, or aPin<&mut MyType>
.
- This means that instead of
Pin
's constructor,new_unchecked
, isunsafe
.- It is unsafe because the compiler has no way to check that the pointer indeed promises that the target value won't move again.
- Its safety depends on the implementation of traits that are themselves safe.
- Specifically,
Deref
,DerefMut
andDrop
for theP
pointer given toPin
cannot move their specific pointed-to values.
Pin
has aget_unchecked_mut
method that gives you an exclusive reference to its target type,&'a mut T
.- This method is also
unsafe
, because users must promise to not use the&mut T
to move theT
or otherwise invalidate its memory. - This invariant could be broken if a user wrote something like
std::mem::swap
on two difference&'a mut T
s.
- This method is also
Pin::get_unchecked_mut
relies on its implementation ofDerefMut<Target = T>
.Pin
has an implementation ofDeref<Target = T>
that is always safe (as opposed toDerefMut<Target = T>
).Deref
is safe because you only get a&T
and it does not let you move it without writing other unsafe code.
Unpin
Unpin
is an auto marker trait that asserts that the type is safe to move out of a Pin
when used as the Pin
's target type T
.
- This means that the type does not rely on any of
Pin
's guarantees. - This means that the type may be moved out of the
Pin
without causing memory unsafety. - Only types that explicitly opt out of
Unpin
are!Unpin
. These types include generators and types that contain other!Unpin
.
When the target type is Unpin
, it allows us to provide a much simpler and safer interface to Pin
.
#![allow(unused)] fn main() { impl<P> Pin<P> where P: Deref, P::Target: Unpin { pub fn new(pointer: P) -> Self; } impl<P> DerefMut for Pin<P> where P: DerefMut, P::Target: Unpin { type Target = P::Target; fn deref_mut(&mut self) -> &mut Self::Target; } }
When types are Unpin
, it means that the type does not care if it is moved, even if it was previously pinned.
- For
Unpin
types,Pin
is basically irrelevant.
How to Pin
Suppose we're considering some type that implements Future
.
If the type is Unpin
, then just call Pin::new(&mut future)
.
If the type is !Unpin
, then we can pin the future to the heap or to the stack.
The primary contract of Pin
is that once something has been pinned, it cannot move. And the API for Pin
takes care of honoring that contract for all methods and traits on Pin
.
- The role of any function/user that constructs a
Pin
is to ensure that if thePin
itself moves, that the underlying target does not move too. - We can do this by placing the value on the heap, then place a pointer to that value in the
Pin
. This allowsPin
to move, but the underlying data won't move with it.- This is done with
Box::pin
.
- This is done with
Awaiting Futures
#![allow(unused)] fn main() { pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
await
syntax is syntactic sugar.
#![allow(unused)] fn main() { // In both examples, `expr` is a standin for `some_future.await` // A heavily simplified version of the desugared await loop { if let Poll::Ready(r) = expr.poll() { break r } else { yield } } // A less simplified (but still simplified) desugared await match expr { mut pinned => loop { match unsafe { Pin::new_unchecked(&mut pinned) }.poll() { Poll::Ready(r) => break r, Poll::Pending => yield, } } } }
Wakers
Both of the above aren't accurate because they each require a loop
to call .poll()
again. Doing it this way would burn a lot of CPU cycles you could have probably used for other, more useful things.
We would rather have the executor poll the future once, then put it to sleep until another future can make progress. It should then wake up long enough to call .poll()
on those futures, then go back to sleep.
Waking up should therefore, be conditioned on some event, e.g.:
- When a network packet arrives on a given port.
- When the mouse cursor moves.
- When someone sends on this channel.
- When the CPU receives a particular interrupt.
- After a specific set of time has passed.
- Custom, user-defined, futures can also be created with not just one but multiple conditions.
A Waker
provides a way to wake the executor. By waking the executor, you are signaling to it that progress can be made.
Executors construct a Waker
, then uses it when polling futures. The Waker
is part of the mechanisms that the executor uses to go to sleep and later awaken. In the poll()
method the second argument for Context
contains the Waker
.
Waker
has the following interface:
#![allow(unused)] fn main() { impl Waker { pub fn wake(self); pub fn wake_by_ref(&self); pub unsafe fn from_raw(waker: RawWaker) -> Waker; } }
wake
and/or wake_by_ref
should be called when the future can again make progress. Just in case it wasn't clear: the Future should be calling wake
/wake_by_ref
as part of its poll()
method!.
The logic executed when wake
or wake_by_ref
is called is entirely up to the executor that constructed the Waker
. The executor decides the implementation for what happens when these functions are called, or when the Waker
needs to be clone
d, or when the Waker
needs to be drop
ped.
- This specification (by the executor) is performed using a manually implemented vtable which functions similarly to dynamic dispatch.
If Future::poll
returns a Poll::Pending
, it is the future's responsibility to ensure that something calls wake
on the provided Waker
when the future is next able to make progress.
- When implementing your own
Future
, you are usually calling another method that also returns aFuture
, and so your future should returnPoll::Pending
only if the inner future does. You trust that the inner future handles waking theWaker
. - In other cases, you might have to handle a future that does not poll other futures. These are called _leaf future_s.
- Leaf futures are usually either (1) ones that wait for events that originate within the same process, or (2) ones that wait for events external to the process (like a TCP packet read).
- For (1) above, the pattern is to store the
Waker
in a place that the code that needs to call it can find it. - For (2) above, it's more complicated since the code you're calling has no idea of futures and they won't be waking the
Waker
for you. Executors typically provide implementations of leaf futures that communicate behind the scenes with the operating system.- When a leaf future realizes it must wait for an external event, it updates that executor's state to include an external event source alongside the
Waker
. - When the executor can't make any progress, it gathers all event sources for pending leaf futures, then makes a blocking call to the operating system which should return if any of the leaf futures can respond to an incoming event.
- The blocking call to the operating system on Linux is typically
epoll
. - A
reactor
is the part of an executor that has leaf futures register event sources with and that the executor waits on when it has no more useful work to do. - Due to the tight integration between leaf futures and how the executor then interfaces with the OS, most leaf futures are incompatible with being executed on another executor.
- When a leaf future realizes it must wait for an external event, it updates that executor's state to include an external event source alongside the
Futures in a tree
Futures in an async program form a tree: futures contain zero or more other futures, recursively. The root future is given to whatever the executor's main "run" function is.
A root future is called a task
. The root future/task is the only point of contact between the executor and the futures tree.
- Executors call
poll()
on the root task. From that point on, the code of each contained future must figure out which inner future(s) to poll in response, all the way to the leaf. - In general, executors construct a separate
Waker
for each task they poll so that they know which task was just made runnable.- The underlying
RawWaker
makes DRYs out the code that would otherwise be duplicated across allWaker
s.
- The underlying
When the executor executes poll()
on a future the second or subsequent time, it still begins running from the top of its implementation of Future::poll
and must decide how to get to the future deeper down that can now make progress.
- If there is only one obvious future to return execution to, then it does that.
- But there are cases where it gets a little complex, for example a
join()
might be waiting for a collection of futures to all finish, or aselect()
might be waiting for the first of a collection of futures to finish.- When a future has to do a
join()
orselect()
, it becomes effectively a "subexecutor". - Subexecutors wrap the
Waker
with their ownWaker
type before theypoll
an inner future, which then allows it to store state on which futures are runnable, and they also get notified whenWaker::wake
is called. - Subexecutors can use its own internal state to figure out which inner futures needs to be polled.
- When a future has to do a
It's worth mentioning: if there's only one root future and that root future holds hundreds of inner futures, the program is effectively single-threaded, due there being only one root future.
- To actually have multi-threaded async, you can
spawn
the futures. (aka pass aFuture
to thespawn
method.) - Spawned futures still depend on being polled by the executor.
- If the executor stops running/is dropped then the tasks also stop making progress (since nothing is around to call
poll()
).- There are some executors that are multi-threaded and which can continue to poll tasks even if they yield control. You need to check the docs for your executor of choice.