Using AsyncRead and AsyncWrite directly
So far, we have primarily talked about AsyncRead
and AsyncWrite
in
the context of I/O combinators provided by Tokio. While these are often
enough, sometimes you need to implement your own combinators that
want to perform asynchronous reads and writes directly.
Reading data with AsyncRead
The heart of AsyncRead
is the poll_read
method. It maps the
WouldBlock
error that indicates that an I/O read
operation would
have blocked into NotReady
, which in turn lets us interoperate with
the world of futures. When you write a Future
(or something like it,
such as Stream
) that internally contains an AsyncRead
, poll_read
is likely the method you will be interacting with.
The important thing to keep in mind with poll_read
is that it follows
the same contract as Future::poll
. Specifically, it cannot return
NotReady
unless it has arranged for the current task to be notified
when it can make progress again. This fact is what lets us call
poll_read
inside of poll
in our own Future
s; we know that we are
upholding the contract of poll
when we forward a NotReady
from
poll_read
, because poll_read
follows that same contract!
The exact mechanism Tokio uses to ensure that poll_read
later notifies
the current task is out of scope for this section, but you can read more
about it in the non-blocking I/O section of Tokio internals if you’re
interested.
With that all said, let’s look at how we might implement the
read_exact
method ourselves!
# extern crate tokio;
#[macro_use]
extern crate futures;
# fn main() {}
use std::io;
use tokio::prelude::*;
// This is going to be our Future.
// In the common case, this is set to Some(Reading),
// but we'll set it to None when we return Async::Ready
// so that we can return the reader and the buffer.
struct ReadExact<R, T>(Option<Reading<R, T>>);
struct Reading<R, T> {
// This is the stream we're reading from.
reader: R,
// This is the buffer we're reading into.
buffer: T,
// And this is how far into the buffer we've written.
pos: usize,
}
// We want to be able to construct a ReadExact over anything
// that implements AsyncRead, and any buffer that can be
// thought of as a &mut [u8].
fn read_exact<R, T>(reader: R, buffer: T) -> ReadExact<R, T>
where
R: AsyncRead,
T: AsMut<[u8]>,
{
ReadExact(Some(Reading {
reader,
buffer,
// Initially, we've read no bytes into buffer.
pos: 0,
}))
}
impl<R, T> Future for ReadExact<R, T>
where
R: AsyncRead,
T: AsMut<[u8]>,
{
// When we've filled up the buffer, we want to return both the buffer
// with the data that we read and the reader itself.
type Item = (R, T);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0 {
Some(Reading {
ref mut reader,
ref mut buffer,
ref mut pos,
}) => {
let buffer = buffer.as_mut();
// Check that we haven't finished
while *pos < buffer.len() {
// Try to read data into the remainder of the buffer.
// Just like read in std::io::Read, poll_read *can* read
// fewer bytes than the length of the buffer it is given,
// and we need to handle that by looking at its return
// value, which is the number of bytes actually read.
//
// Notice that we are using try_ready! here, so if poll_read
// returns NotReady (or an error), we will do the same!
// We uphold the contract that we have arranged to be
// notified later because poll_read follows that same
// contract, and _it_ returned NotReady.
let n = try_ready!(reader.poll_read(&mut buffer[*pos..]));
*pos += n;
// If no bytes were read, but there was no error, this
// generally implies that the reader will provide no more
// data (for example, because the TCP connection was closed
// by the other side).
if n == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
}
}
}
None => panic!("poll a ReadExact after it's done"),
}
// We need to return the reader and the buffer, which we can only
// do by moving them out of self. We do this by taking our state
// and leaving `None`. This _should_ be fine, because poll()
// requires callers to not call poll() again after Ready has been
// returned, so we should only ever see Some(Reading) when poll()
// is called.
let reading = self.0.take().expect("must have seen Some above");
Ok(Async::Ready((reading.reader, reading.buffer)))
}
}
Writing data with AsyncWrite
Just like poll_read
is the core piece of AsyncRead
, poll_write
is
the core of AsyncWrite
. Like poll_read
, it maps the WouldBlock
error that indicates that an I/O write
operation would have blocked into
NotReady
, which again lets us interoperate with the world of
futures. AsyncWrite
also has a poll_flush
, which provides an
asynchronous analogue to Write
’s flush
method. The role of
poll_flush
is to make sure that any bytes previously written by
poll_write
are, well, flushed onto the underlying I/O resource
(written out in network packets for example). Similar to poll_write
, it
wraps around Write::flush
, and maps a WouldBlock
error into
NotReady
to indicate that the flushing is still ongoing.
AsyncWrite
’s poll_write
and poll_flush
follow the same contract as
Future::poll
and AsyncRead::poll_read
, namely that if they return
NotReady
, they have arranged for the current task to be notified when
they can make progress again. Like with poll_read
, this means that we
can safely call these methods in our own futures, and know that we are
also following the contract.
Tokio uses the same mechanism to manage notifications for poll_write
and poll_flush
as it does for poll_read
, and you can read more about
it in the non-blocking I/O section of Tokio internals.
Shutdown
AsyncWrite
also adds one method that is not part of Write
:
shutdown
. From its documentation:
Initiates or attempts to shut down this writer, returning success when the I/O connection has completely shut down.
This method is intended to be used for asynchronous shutdown of I/O connections. For example this is suitable for implementing shutdown of a TLS connection or calling
TcpStream::shutdown
on a proxied connection. Protocols sometimes need to flush out final pieces of data or otherwise perform a graceful shutdown handshake, reading/writing more data as appropriate. This method is the hook for such protocols to implement the graceful shutdown logic.
This sums shutdown
up pretty nicely: it is a way to tell the writer
that no more data is coming, and that it should indicate in whatever way
the underlying I/O protocol requires. For TCP connections, for example,
this usually entails closing the writing side of the TCP channel so that
the other end receives and end-of-file in the form of a read that
returns 0 bytes. You can often think of shutdown
as what you would
have done synchronously in the implementation of Drop
; it’s just that
in the asynchronous world, you can’t easily do something in Drop
because you need to have an executor that keeps polling your writer!
Note that calling shutdown
on a write “half” of a type that implements
AsyncWrite
and AsyncRead
does not shut down the read “half”. You
can usually still continue reading data as you please until the other
side shuts down their corresponding write “half”.
An example of using AsyncWrite
Without further ado, let’s take a look at how we might implement
# extern crate tokio;
#[macro_use]
extern crate futures;
# fn main() {}
use std::io;
use tokio::prelude::*;
// This is going to be our Future.
// It'll seem awfully familiar to ReadExact above!
// In the common case, this is set to Some(Writing),
// but we'll set it to None when we return Async::Ready
// so that we can return the writer and the buffer.
struct WriteAll<W, T>(Option<Writing<W, T>>);
struct Writing<W, T> {
// This is the stream we're writing into.
writer: W,
// This is the buffer we're writing from.
buffer: T,
// And this is much of the buffer we've written.
pos: usize,
}
// We want to be able to construct a WriteAll over anything
// that implements AsyncWrite, and any buffer that can be
// thought of as a &[u8].
fn write_all<W, T>(writer: W, buffer: T) -> WriteAll<W, T>
where
W: AsyncWrite,
T: AsRef<[u8]>,
{
WriteAll(Some(Writing {
writer,
buffer,
// Initially, we've written none of the bytes from buffer.
pos: 0,
}))
}
impl<W, T> Future for WriteAll<W, T>
where
W: AsyncWrite,
T: AsRef<[u8]>,
{
// When we've written out the entire buffer, we want to return
// both the buffer and the writer so that the user can re-use them.
type Item = (W, T);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0 {
Some(Writing {
ref mut writer,
ref buffer,
ref mut pos,
}) => {
let buffer = buffer.as_ref();
// Check that we haven't finished
while *pos < buffer.len() {
// Try to write the remainder of the buffer into the writer.
// Just like write in std::io::Write, poll_write *can* write
// fewer bytes than the length of the buffer it is given,
// and we need to handle that by looking at its return
// value, which is the number of bytes actually written.
//
// We are using try_ready! here, just like in poll_read in
// ReadExact, so that if poll_write returns NotReady (or an
// error), we will do the same! We uphold the contract that
// we have arranged to be notified later because poll_write
// follows that same contract, and _it_ returned NotReady.
let n = try_ready!(writer.poll_write(&buffer[*pos..]));
*pos += n;
// If no bytes were written, but there was no error, this
// generally implies that something weird happened under us.
// We make sure to turn this into an error for the caller to
// deal with.
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"zero-length write",
));
}
}
}
None => panic!("poll a WriteAll after it's done"),
}
// We use the same trick as in ReadExact to ensure that we can return
// the buffer and the writer once the entire buffer has been written out.
let writing = self.0.take().expect("must have seen Some above");
Ok(Async::Ready((writing.writer, writing.buffer)))
}
}