Getting asynchronous
Futures are all about managing asynchronicity. Implementing a future that completes asynchronously requires correctly handling an Async::NotReady returned by the inner future.
Let’s start by implementing a future that establishes a TCP socket with a remote peer and extracts the peer socket address, writing it to STDOUT.
extern crate tokio;
#[macro_use]
extern crate futures;

use tokio::net::{TcpStream, tcp::ConnectFuture};
use futures::{Future, Async, Poll};

struct GetPeerAddr {
    connect: ConnectFuture,
}

impl Future for GetPeerAddr {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.connect.poll() {
            Ok(Async::Ready(socket)) => {
                println!("peer address = {}", socket.peer_addr().unwrap());
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                println!("failed to connect: {}", e);
                Ok(Async::Ready(()))
            }
        }
    }
}

fn main() {
    let addr = "192.168.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let get_peer_addr = GetPeerAddr {
        connect: connect_future,
    };

    tokio::run(get_peer_addr);
}
The implementation of GetPeerAddr is very similar to the Display future from the previous page. The primary difference is that, in this case, self.connect.poll() will (probably) return Async::NotReady a number of times before returning the connected socket. When this happens, our future returns NotReady.
GetPeerAddr contains ConnectFuture, a future that completes once a TCP stream has been established. This future is returned by TcpStream::connect.
When GetPeerAddr is passed to tokio::run, Tokio will repeatedly call poll until Ready is returned. The exact mechanism by which this happens is described in later chapters.
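To make "repeatedly call poll until Ready is returned" concrete, here is a minimal sketch that uses only the standard library. The Async and Poll definitions are hand-written stand-ins for the futures types, and Countdown and run_to_completion are hypothetical names invented for this illustration. The busy loop is a deliberate simplification and is not meant to describe how Tokio actually schedules polls.

```rust
// Std-only stand-ins for the futures 0.1 `Async` and `Poll` types,
// so this sketch compiles without external crates.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical future: reports NotReady `remaining` times, then completes.
struct Countdown {
    remaining: u32,
}

impl Countdown {
    fn poll(&mut self) -> Poll<&'static str, ()> {
        if self.remaining == 0 {
            Ok(Async::Ready("connected"))
        } else {
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

// Naive driver (an assumption of this sketch): polls in a busy loop until
// the future is Ready and returns the value with the number of polls made.
fn run_to_completion(future: &mut Countdown) -> (&'static str, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        match future.poll() {
            Ok(Async::Ready(value)) => return (value, polls),
            Ok(Async::NotReady) => continue,
            Err(()) => unreachable!("Countdown never fails"),
        }
    }
}

fn main() {
    let mut future = Countdown { remaining: 3 };
    let (value, polls) = run_to_completion(&mut future);
    println!("{} after {} polls", value, polls);
}
```

With remaining set to 3, the driver sees NotReady three times and receives the value on the fourth poll.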
When implementing Future, Async::NotReady must not be returned unless Async::NotReady was obtained when calling poll on an inner future. One way to think about it is, when a future is polled, it must do as much work as it can until it either completes or becomes blocked on an inner future.
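The rule can be sketched with a wrapper whose only NotReady branch forwards the NotReady it received from its inner future. This is a std-only model: Async and Poll are hand-written stand-ins, and FakeConnect and DescribePeer are hypothetical types made up for the example, not part of tokio or futures.

```rust
// Std-only stand-ins for the futures 0.1 `Async` and `Poll` types.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical inner future: NotReady for `pending_polls` polls,
// then yields a fixed address string.
struct FakeConnect {
    pending_polls: u32,
}

impl FakeConnect {
    fn poll(&mut self) -> Poll<&'static str, ()> {
        if self.pending_polls > 0 {
            self.pending_polls -= 1;
            return Ok(Async::NotReady);
        }
        Ok(Async::Ready("192.168.0.1:1234"))
    }
}

// Hypothetical outer future wrapping the inner one.
struct DescribePeer {
    connect: FakeConnect,
}

impl DescribePeer {
    fn poll(&mut self) -> Poll<String, ()> {
        match self.connect.poll()? {
            // The inner future is done, so finish all remaining work now.
            Async::Ready(addr) => Ok(Async::Ready(format!("peer address = {}", addr))),
            // The only place NotReady is returned: the inner future said so.
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

fn main() {
    let mut future = DescribePeer {
        connect: FakeConnect { pending_polls: 1 },
    };
    println!("{:?}", future.poll()); // inner future is still pending
    println!("{:?}", future.poll()); // inner future has completed
}
```

Nothing in DescribePeer decides on its own to report NotReady; it is blocked only when the inner future is.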
Chaining computations
Now, let’s take the connect future and update it to write “hello world” once the TCP socket has been established.
extern crate tokio;
extern crate bytes;
#[macro_use]
extern crate futures;

use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};

// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
    Connecting(ConnectFuture),
    Connected(TcpStream, Cursor<Bytes>),
}

impl Future for HelloWorld {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        use self::HelloWorld::*;

        loop {
            match self {
                Connecting(ref mut f) => {
                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);
                }
                Connected(ref mut socket, ref mut data) => {
                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }

                    return Ok(Async::Ready(()));
                }
            }
        }
    }
}

fn main() {
    let addr = "127.0.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let hello_world = HelloWorld::Connecting(connect_future);

    // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
    tokio::run(hello_world.map_err(|e| println!("{}", e)))
}
It is very common to implement futures as an enum of the possible states. This allows the future implementation to track its state internally by transitioning between the enum’s variants.
This future is represented as an enumeration of states:
- Connecting
- Writing “hello world” to the socket.
The future starts in the connecting state with an inner future of type ConnectFuture. It repeatedly polls this future until the socket is returned. The state is then transitioned to Connected.
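The transition from Connecting to Connected can be sketched in isolation with a std-only model. Async and Poll are hand-written stand-ins for the futures types, and FakeConnect and Greeter are hypothetical names; the "socket" is just a string so that the example needs no network.

```rust
// Std-only stand-ins for the futures 0.1 `Async` and `Poll` types.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical stand-in for a connect future: NotReady for
// `pending_polls` polls, then yields a pretend socket.
struct FakeConnect {
    pending_polls: u32,
}

impl FakeConnect {
    fn poll(&mut self) -> Poll<&'static str, ()> {
        if self.pending_polls > 0 {
            self.pending_polls -= 1;
            return Ok(Async::NotReady);
        }
        Ok(Async::Ready("socket"))
    }
}

// The future is an enum of its possible states.
enum Greeter {
    Connecting(FakeConnect),
    Connected(&'static str),
}

impl Greeter {
    fn poll(&mut self) -> Poll<String, ()> {
        loop {
            match *self {
                Greeter::Connecting(ref mut connect) => {
                    let socket = match connect.poll()? {
                        Async::Ready(socket) => socket,
                        Async::NotReady => return Ok(Async::NotReady),
                    };
                    // Transition to the next state, then loop so the
                    // Connected arm runs within the same call to poll.
                    *self = Greeter::Connected(socket);
                }
                Greeter::Connected(socket) => {
                    return Ok(Async::Ready(format!("hello world via {}", socket)));
                }
            }
        }
    }
}

fn main() {
    let mut greeter = Greeter::Connecting(FakeConnect { pending_polls: 1 });
    println!("{:?}", greeter.poll()); // still connecting
    println!("{:?}", greeter.poll()); // connects, then completes in one call
}
```

Because the state lives in the enum itself, a later call to poll resumes from whichever variant the previous call left behind.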
From the Connected state, the future writes data to the socket. This is done with the write_buf function. I/O functions are covered in more detail in the next section. Briefly, write_buf is a non-blocking function to write data to the socket. If the socket is not ready to accept the write, NotReady is returned. If some data (but not necessarily all) was written, Ready(n) is returned, where n is the number of written bytes. The cursor is also advanced.
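The contract described above (NotReady when the socket cannot accept data, otherwise Ready(n) with the cursor advanced by n) can be modelled without any I/O. In this sketch FakeSocket and its write_buf method are hypothetical and only imitate the described behaviour; they are not Tokio's implementation. Async and Poll are std-only stand-ins, and std::io::Cursor plays the role of the buffer.

```rust
use std::io::Cursor;

// Std-only stand-ins for the futures 0.1 `Async` and `Poll` types.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical socket: while `writable`, it accepts at most
// `capacity` bytes per call.
struct FakeSocket {
    capacity: usize,
    writable: bool,
    received: Vec<u8>,
}

impl FakeSocket {
    // Imitates the behaviour the text describes for write_buf.
    fn write_buf(&mut self, data: &mut Cursor<&[u8]>) -> Poll<usize, ()> {
        if !self.writable {
            // Nothing is written and the cursor stays where it was.
            return Ok(Async::NotReady);
        }
        let pos = data.position() as usize;
        let remaining = &data.get_ref()[pos..];
        let n = self.capacity.min(remaining.len());
        self.received.extend_from_slice(&remaining[..n]);
        // Advance the cursor past the bytes that were accepted.
        data.set_position((pos + n) as u64);
        Ok(Async::Ready(n))
    }
}

fn main() {
    let mut socket = FakeSocket {
        capacity: 5,
        writable: false,
        received: Vec::new(),
    };
    let mut data = Cursor::new(&b"hello world"[..]);

    println!("{:?}", socket.write_buf(&mut data)); // socket not writable yet
    socket.writable = true;
    println!("{:?}", socket.write_buf(&mut data)); // partial write
    println!("cursor position = {}", data.position());
}
```

A single successful call may leave part of the buffer unwritten, which is why the caller has to keep track of what remains.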
Once in the Connected state, the future must loop as long as there is data left to write. Because write_buf is wrapped with try_ready!(), when write_buf returns NotReady, our poll function returns with NotReady.
At some point in the future, our poll function is called again. Because it is in the Connected state, it jumps directly to writing data.
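The early return and the later resumption can be sketched with a std-only model. The my_try_ready! macro below is a hypothetical macro that imitates the behaviour described for try_ready!(): it unwraps Ready, and returns from the enclosing function on NotReady or on an error. FakeSocket and Writer are likewise invented for this example; the socket accepts a few bytes, then reports NotReady once before it becomes writable again.

```rust
// Std-only stand-ins for the futures 0.1 `Async` and `Poll` types.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical macro modelled on the described behaviour of try_ready!().
macro_rules! my_try_ready {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(e) => return Err(From::from(e)),
        }
    };
}

// Hypothetical socket: accepts up to `capacity` bytes, then is
// not writable for exactly one call.
struct FakeSocket {
    capacity: usize,
    writable: bool,
    received: Vec<u8>,
}

impl FakeSocket {
    fn write(&mut self, bytes: &[u8]) -> Poll<usize, ()> {
        if !self.writable {
            self.writable = true; // writable again on the next call
            return Ok(Async::NotReady);
        }
        self.writable = false;
        let n = self.capacity.min(bytes.len());
        self.received.extend_from_slice(&bytes[..n]);
        Ok(Async::Ready(n))
    }
}

// Hypothetical future that remembers how much it has written so far.
struct Writer {
    socket: FakeSocket,
    data: &'static [u8],
    pos: usize,
}

impl Writer {
    fn poll(&mut self) -> Poll<(), ()> {
        // Loop as long as there is data left to write.
        while self.pos < self.data.len() {
            let n = my_try_ready!(self.socket.write(&self.data[self.pos..]));
            self.pos += n;
        }
        Ok(Async::Ready(()))
    }
}

fn main() {
    let mut writer = Writer {
        socket: FakeSocket { capacity: 4, writable: true, received: Vec::new() },
        data: b"hello world",
        pos: 0,
    };
    // Each poll resumes from `pos`, which survives between calls.
    let mut polls = 1;
    while writer.poll() != Ok(Async::Ready(())) {
        polls += 1;
    }
    println!("finished after {} polls", polls);
}
```

Unlike write_buf, this simplified write does not advance a cursor itself, so Writer tracks the position in its own field. The progress made before each NotReady is kept, and the next call to poll continues from there.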
Note that the loops are important. Many future implementations contain loops. These loops are necessary because poll cannot return until either all the data is written to the socket, or an inner future (ConnectFuture or write_buf) returns NotReady.