Building a runtime
The runtime ‒ all the pieces needed to run an event-driven application ‒ is
already available. You don’t need to know any of this if you just want to use
tokio. However, it may be useful to know what happens under the hood, both to
gain a deeper understanding of the details in case something goes wrong, and to
be able to customize the runtime beyond what the runtime Builder supports.
We are going to build a single-threaded runtime, because it is slightly simpler to put together. The default multi-threaded one is not conceptually more complex, but it has more moving parts. Knowing the details here can be a stepping stone to reading the code of the default runtime.
A complete, working example of the things discussed here can be found in the git repository.
The Park trait
The asynchronous world is inherently about waiting for something to happen
(and being able to wait for multiple things at once). It is no surprise there’s
a trait to abstract over the waiting. It’s called Park.
The idea is that if there’s nothing better to do, control is handed to the
Park until something interesting happens and control is taken away from it
again, or until some specified time passes. It is up to the Park how it spends
this time. It can either do something useful (like processing background jobs)
or simply block the thread in some way.
Some Park implementations sit at the bottom of the stack ‒ they somehow block
the thread. Others only delegate the park calls to some underlying object they
wrap (adding some functionality of their own), which allows stacking them on
top of each other.
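For orientation, the trait looks roughly like this ‒ a paraphrase of the park module in tokio_executor rather than a verbatim copy, so check the real documentation for the exact definition:
use std::time::Duration;

pub trait Park {
    /// A handle that can wake the parked thread up from another place.
    type Unpark: Unpark;
    type Error;

    /// Returns the handle used to interrupt the waiting.
    fn unpark(&self) -> Self::Unpark;

    /// Waits until something interesting happens or unpark is called.
    fn park(&mut self) -> Result<(), Self::Error>;

    /// Waits, but gives up after the given duration at the latest.
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
}

pub trait Unpark: Send + Sync + 'static {
    /// Wakes the corresponding parked thread up.
    fn unpark(&self);
}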
The usual components
We definitely need a Reactor to accept external events (like network sockets
becoming readable) from the OS. It does so by blocking on epoll, kqueue or
another OS-dependent primitive, through the mio crate. The reactor can’t
delegate the waiting to anything else, so it goes to the bottom of our stack.
The reactor is able to notify our futures of data coming over the network and
similar events, but we need an executor to actually run them. We’ll be using
the CurrentThread executor, because we’re building a single-threaded runtime;
use any other executor that suits your needs. The executor needs a Park
underneath it to wait on when there are no futures ready to run. It doesn’t
implement Park itself, so it must go on top of the whole stack.
While not strictly necessary, it is useful to be able to run delayed futures ‒
timeouts and similar. Therefore, we place the Timer in the middle ‒
fortunately, it can be placed on top of another Park and implements Park
itself. It plays a similar role for timeouts as the reactor does for IO-based
futures.
In addition, any custom layer can be added. One example could be some kind of idle bookkeeping component: when asked to wait, it would repeatedly do a bit of work, interleaved with letting the park below it also pick up events. If there was no bookkeeping to be done, it would simply delegate the waiting ‒ a sketch of such a layer follows.
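This is only an illustration of the idea, not something tokio ships: the Bookkeeping type and its has_work/do_some_work helpers are made up for the example, and only the Park plumbing mirrors the trait shown above.
extern crate tokio_executor;

use std::time::Duration;
use tokio_executor::park::Park;

struct Bookkeeping<P> {
    inner: P,
    // A real component would keep its queue of chores here.
}

impl<P> Bookkeeping<P> {
    // Hypothetical helpers standing in for the actual bookkeeping logic.
    fn has_work(&self) -> bool {
        false
    }
    fn do_some_work(&mut self) {
        // One chunk of the background work would happen here.
    }
}

impl<P: Park> Park for Bookkeeping<P> {
    type Unpark = P::Unpark;
    type Error = P::Error;

    fn unpark(&self) -> Self::Unpark {
        // Waking up is entirely the inner park’s business.
        self.inner.unpark()
    }

    fn park(&mut self) -> Result<(), Self::Error> {
        if self.has_work() {
            // Do a bit of work, then let the layer below pick up events
            // for a short while before we get the control back.
            self.do_some_work();
            self.inner.park_timeout(Duration::from_millis(1))
        } else {
            // Nothing to do ‒ simply delegate the waiting.
            self.inner.park()
        }
    }

    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        self.inner.park_timeout(duration)
    }
}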
This is what the creation of the reactor, timer and executor looks like in code:
# extern crate futures;
# extern crate tokio;
# extern crate tokio_executor;
# extern crate tokio_reactor;
# extern crate tokio_timer;
#
# use std::io::Error as IoError;
# use std::time::{Duration, Instant};
#
# use futures::{future, Future};
# use tokio::executor::current_thread::{self, CurrentThread};
# use tokio_reactor::Reactor;
# use tokio_timer::timer::{self, Timer};
# fn run<F: Future<Item = (), Error = std::io::Error>>(f: F) -> Result<(), std::io::Error> {
let reactor = Reactor::new()?;
// The reactor itself will be consumed by the timer,
// so we keep a handle to communicate with it.
let reactor_handle = reactor.handle();
let timer = Timer::new(reactor);
let timer_handle = timer.handle();
let mut executor = CurrentThread::new_with_park(timer);
# Ok(())
# }
# fn main() {
# run(futures::future::lazy(|| Ok(()))).unwrap();
# }
This way, if there are futures ready to execute, they’ll get executed first. Then, once the executor runs out of ready futures, it parks in the timer, which looks for timeouts to fire. This may generate some more ready futures (which would get executed next). If no timeouts fire, the timer computes for how long the reactor can safely block and lets it wait for external events.
Global state
We’ve built the components that do the actual work, but we also need a way to build the work and submit it to them. We could do so through the handles, but then we would have to carry them around everywhere, which would be far from ergonomic.
To avoid the tedious passing of several handles around, the built-in runtime
stores them in thread-local storage. Several modules in tokio have a
with_default function, which takes the corresponding handle and a closure. It
stores the handle in the thread-local storage, runs the closure and then
restores the original value of the TLS once the closure finishes.
This is how we would run a future with all the defaults set, so it can freely use them:
# extern crate futures;
# extern crate tokio;
# extern crate tokio_executor;
# extern crate tokio_reactor;
# extern crate tokio_timer;
#
# use std::io::Error as IoError;
# use std::time::{Duration, Instant};
#
# use futures::{future, Future};
# use tokio::executor::current_thread::{self, CurrentThread};
# use tokio_reactor::Reactor;
# use tokio_timer::timer::{self, Timer};
# fn run<F: Future<Item = (), Error = std::io::Error>>(f: F) -> Result<(), std::io::Error> {
# let reactor = Reactor::new()?;
# let reactor_handle = reactor.handle();
# let timer = Timer::new(reactor);
# let timer_handle = timer.handle();
# let mut executor = CurrentThread::new_with_park(timer);
// Binds an executor to this thread
let mut enter = tokio_executor::enter()
    .expect("Multiple executors at once");
// Set the defaults before running the closure
let result = tokio_reactor::with_default(
    &reactor_handle,
    &mut enter,
    |enter| timer::with_default(
        &timer_handle,
        enter,
        |enter| {
            let mut default_executor =
                current_thread::TaskExecutor::current();
            tokio_executor::with_default(
                &mut default_executor,
                enter,
                |enter| executor.enter(enter).block_on(f)
            )
        }
    )
);
# Ok(())
# }
# fn main() {
# run(futures::future::lazy(|| Ok(()))).unwrap();
# }
There are a few things of note. First, the enter guard just ensures that we
don’t run multiple executors on the same thread at the same time. Running
multiple executors would leave one of them blocked, which would behave in very
surprising ways, so this is footgun prevention.
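The footgun prevention can be observed directly. This small check is separate from the example above and assumes only that a second enter on the same thread fails while the first guard is still alive:
extern crate tokio_executor;

fn main() {
    // Entering once succeeds and gives us a guard.
    let _enter = tokio_executor::enter().expect("Multiple executors at once");
    // Trying to enter again on the same thread is rejected
    // while the first guard is still alive.
    assert!(tokio_executor::enter().is_err());
}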
Second, we want to use our executor both as the default executor and as the
default current-thread executor, and we also want to run it (not only spawn a
future onto it without further waiting). To do both, we would need two mutable
references to it, which is not possible. To work around that, we set the
current-thread executor indirectly (it actually sets itself, in the
executor.block_on call or any similar one), and as the default executor we use
the TaskExecutor, which is a proxy to whatever current-thread executor is
configured at the time of its use.
Finally, block_on will execute the single future to completion (and will
process any other futures spawned onto the executor as well, but it won’t wait
for them to finish if f finishes first). The result of the future is bubbled
upwards through all the with_default calls and can be returned or used in any
other way. If you want to wait for all the other futures to finish too, there’s
also executor.run, which can be called afterwards.
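As a sketch only ‒ keeping the names from the example above and assuming the run method of the entered executor behaves as just described ‒ the innermost closure could be extended like this:
|enter| {
    // Drive `f` itself to completion first.
    let result = executor.enter(enter).block_on(f);
    // Then keep turning the executor until all remaining
    // spawned futures have finished as well.
    executor.enter(enter)
        .run()
        .expect("Failed to run the spawned futures");
    result
}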