I/O with Tokio
The tokio
crate comes with TCP and UDP networking types. Unlike the types in
std
, Tokio’s networking types are based on the poll model and will notify the
task executors when their readiness states change (data is received and write
buffers are flushed). In the tokio::net
module you’ll find types like
TcpListener
, TcpStream
, and UdpSocket
.
All of these types provide both a future API as well as a poll API.
The Tokio net types are powered by a Mio based reactor that, by default, is started up lazily on a background thread. See reactor documentation for more details.
Using the Future API
We’ve already seen some of this earlier in the guide with the incoming
function as well as the helpers found in tokio_io::io
.
These helpers include:
incoming
: A stream of inbound TCP connections.read_exact
: Read exactlyn
bytes into a buffer.read_to_end
: Read all bytes into a buffer.write_all
: Write the entire contents of a buffer.copy
: Copy bytes from one I/O handle to another.
A lot of these functions / helpers are generic over the AsyncRead
and
AsyncWrite
traits. These traits are similar to Read
and Write
from
std
, but are only for types that are “future aware”, i.e. follow the
mandated properties:
- Calls to
read
orwrite
are nonblocking, they never block the calling thread. - If a call would otherwise block then the function returns a value indicating so. If this happens then the current future’s task is scheduled to receive a notification when the I/O is ready again.
Note that users of AsyncRead
and AsyncWrite
types should use
poll_read
and poll_write
instead of directly calling read
and write
.
For example, here is how to accept connections, read 5 bytes from them, then write the 5 bytes back to the socket:
# #![deny(deprecated)]
# extern crate tokio;
#
# use tokio::io;
# use tokio::net::TcpListener;
# use tokio::prelude::*;
# fn main() {
# let addr = "127.0.0.1:6142".parse().unwrap();
# let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(|socket| {
println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
let buf = vec![0; 5];
let connection = io::read_exact(socket, buf)
.and_then(|(socket, buf)| {
io::write_all(socket, buf)
})
.then(|_| Ok(())); // Just discard the socket and buffer
// Spawn a new task that processes the socket:
tokio::spawn(connection);
Ok(())
})
# ;
# }
Using the Poll API
The Poll based API is to be used when implementing Future
by hand and you need
to return Async
. This is useful when you need to implement your own
combinators that handle custom logic.
For example, this is how the read_exact
future could be implemented for a
TcpStream
.
# #![deny(deprecated)]
# extern crate tokio;
# #[macro_use]
# extern crate futures;
# use tokio::io;
# use tokio::prelude::*;
#
# use tokio::net::TcpStream;
# use std::mem;
pub struct ReadExact {
state: State,
}
enum State {
Reading {
stream: TcpStream,
buf: Vec<u8>,
pos: usize,
},
Empty,
}
impl Future for ReadExact {
type Item = (TcpStream, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> {
match self.state {
State::Reading {
ref mut stream,
ref mut buf,
ref mut pos
} => {
while *pos < buf.len() {
let n = try_ready!({
stream.poll_read(&mut buf[*pos..])
});
*pos += n;
if n == 0 {
let err = io::Error::new(
io::ErrorKind::UnexpectedEof,
"early eof");
return Err(err)
}
}
}
State::Empty => panic!("poll a ReadExact after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { stream, buf, .. } => {
Ok(Async::Ready((stream, buf)))
}
State::Empty => panic!(),
}
}
}
# pub fn main() {}
Datagrams
Note that most of this discussion has been around I/O or byte streams, which
UDP importantly is not! To accommodate this, however, the UdpSocket
type
also provides a number of methods for working with it conveniently:
send_dgram
allows you to express sending a datagram as a future, returning an error if the entire datagram couldn’t be sent at once.recv_dgram
expresses reading a datagram into a buffer, yielding both the buffer and the address it came from.