The blocking section

The tokio-threadpool crate contains a blocking function that's declared as follows:

pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where
    F: FnOnce() -> T

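This function accepts a closure that performs blocking operations and runs it inside a blocking section: the worker thread that calls it hands the rest of its task queue to another thread of the pool and then executes the closure itself. The function returns a Poll result that a reactor can use: NotReady while the pool has no spare blocking capacity, and Ready with the closure's result once the closure has run. It is a fairly low-level approach, but tokio and other crates use it actively (for example, to perform IO operations on files).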
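To make the return value concrete, the following sketch models the contract of blocking with standard-library types only. ToyPool, its slot counter, and the local Async, Poll, and BlockingError definitions are stand-ins invented for this illustration, not the tokio-threadpool API. The sketch runs the closure inline and does not model threads at all; it only shows that a call yields NotReady when no blocking capacity is left and Ready with the closure's value otherwise.

```rust
use std::cell::Cell;

/// Local stand-in for `futures::Async` from futures 0.1 (assumption).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Local stand-in for `tokio_threadpool::BlockingError` (assumption).
#[derive(Debug, PartialEq)]
pub struct BlockingError;

/// Futures-0.1-style poll result.
pub type Poll<T, E> = Result<Async<T>, E>;

/// Hypothetical pool that only tracks how many blocking sections
/// may still be entered. Threads are deliberately not modelled.
pub struct ToyPool {
    free_slots: Cell<usize>,
}

impl ToyPool {
    pub fn new(max_blocking: usize) -> Self {
        ToyPool { free_slots: Cell::new(max_blocking) }
    }

    /// Toy counterpart of `blocking`: refuses with `NotReady` when no
    /// slot is free, otherwise runs `f` and returns its value as `Ready`.
    pub fn blocking<F, T>(&self, f: F) -> Poll<T, BlockingError>
    where
        F: FnOnce() -> T,
    {
        let free = self.free_slots.get();
        if free == 0 {
            return Ok(Async::NotReady);
        }
        self.free_slots.set(free - 1); // occupy one slot
        let value = f();
        self.free_slots.set(free); // release the slot again
        Ok(Async::Ready(value))
    }
}
```

With a capacity of one, ToyPool::new(1).blocking(|| 2 + 2) yields Ok(Async::Ready(4)), while a pool created with a capacity of zero yields Ok(Async::NotReady). The real function can additionally return Err(BlockingError), which, according to the crate documentation, happens when it is called outside a thread pool; the toy does not exercise that case.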

The advantage of this approach is that we don't need to create a thread pool manually. We can use the same simple main function as before:

fn main() {
    let addr = ([127, 0, 0, 1], 8080).into();
    let builder = Server::bind(&addr);
    let server = builder.serve(|| service_fn(|req| microservice_handler(req)));
    let server = server.map_err(drop);
    hyper::rt::run(server);
}

To call the convert function inside a blocking section, we can use the following code:

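let body = req.into_body()
    .map_err(other)
    .concat2()
    .map(|chunk| chunk.to_vec())
    .and_then(move |buffer| {
        // poll_fn may call this closure several times, so it borrows
        // the buffer instead of moving it into the blocking closure.
        future::poll_fn(move || {
            let buffer = &buffer;
            blocking(move || {
                convert(buffer, width, height).unwrap()
            })
        })
        // Convert BlockingError with the same `other` helper used above.
        .map_err(other)
    })
    .map(|resp| Response::new(resp.into()));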

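The blocking call returns NotReady on each poll until the pool can admit another blocking section. It then runs the closure on the current worker thread, while the other tasks of that worker are moved to another thread, and returns the result as Ready. To turn a raw function that returns a Poll result into a future, we wrap it with future::poll_fn, which converts any polling function into a Future instance. Because the polling closure may be called more than once, it lends the buffer to blocking by reference instead of moving it. Looks simple, doesn't it? We didn't even create a thread pool manually.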
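The idea behind poll_fn can be sketched without any external crate. The code below is a simplified model, not the futures crate itself: MiniFuture, PollFn, drive, and demo are names made up for this illustration, and drive simply polls in a loop, whereas a real executor waits for a notification before polling again. It shows that the wrapper does nothing more than forward each poll to the stored closure.

```rust
/// Local stand-in for `futures::Async` from futures 0.1 (assumption).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Futures-0.1-style poll result.
pub type Poll<T, E> = Result<Async<T>, E>;

/// Minimal futures-0.1-style trait (hypothetical name).
pub trait MiniFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Wrapper that turns a polling closure into a `MiniFuture`.
pub struct PollFn<F> {
    inner: F,
}

pub fn poll_fn<F, T, E>(f: F) -> PollFn<F>
where
    F: FnMut() -> Poll<T, E>,
{
    PollFn { inner: f }
}

impl<F, T, E> MiniFuture for PollFn<F>
where
    F: FnMut() -> Poll<T, E>,
{
    type Item = T;
    type Error = E;

    // Every poll is forwarded to the closure unchanged.
    fn poll(&mut self) -> Poll<T, E> {
        (self.inner)()
    }
}

/// Naive driver: polls until the future is ready or fails and
/// reports how many polls were needed.
pub fn drive<Fut: MiniFuture>(mut fut: Fut) -> Result<(Fut::Item, usize), Fut::Error> {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Async::Ready(value) = fut.poll()? {
            return Ok((value, polls));
        }
    }
}

/// Hypothetical polling function: not ready twice, then ready.
pub fn demo() -> Result<(Vec<u8>, usize), String> {
    let buffer = vec![1u8, 2, 3];
    let mut attempts = 0;
    let fut = poll_fn(move || -> Poll<Vec<u8>, String> {
        attempts += 1;
        if attempts < 3 {
            return Ok(Async::NotReady);
        }
        // The buffer is only borrowed, so the closure stays callable.
        Ok(Async::Ready(buffer.iter().map(|b| b * 2).collect()))
    });
    drive(fut)
}
```

Here the closure stands in for the call to blocking: it reports NotReady on the first two polls and produces the doubled buffer on the third.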

For example, the tokio-fs crate uses this method to implement IO operations on files:

impl Write for File {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        ::would_block(|| self.std().write(buf))
    }
    fn flush(&mut self) -> io::Result<()> {
        ::would_block(|| self.std().flush())
    }
}

would_block is a thin wrapper over the blocking function that translates its Poll result into an io::Result, reporting NotReady as a WouldBlock error:

fn would_block<F, T>(f: F) -> io::Result<T>
where F: FnOnce() -> io::Result<T>,
{
    match tokio_threadpool::blocking(f) {
        Ok(Ready(Ok(v))) => Ok(v),
        Ok(Ready(Err(err))) => {
            debug_assert_ne!(err.kind(), WouldBlock);
            Err(err)
        }
        Ok(NotReady) => Err(WouldBlock.into()),
        Err(_) => Err(blocking_err()),
    }
}
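The mapping that would_block performs can be checked in isolation. The sketch below assumes local stand-ins for Async, Poll, and BlockingError, and a hypothetical helper named poll_to_io that takes an already computed poll result instead of calling tokio_threadpool::blocking, so that it runs without a thread pool. The error message used for the BlockingError case is also made up for the example.

```rust
use std::io::{self, ErrorKind};

/// Local stand-in for `futures::Async` from futures 0.1 (assumption).
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Local stand-in for `tokio_threadpool::BlockingError` (assumption).
pub struct BlockingError;

/// Futures-0.1-style poll result.
pub type Poll<T, E> = Result<Async<T>, E>;

/// Hypothetical helper: the same four-way match as `would_block`,
/// applied to a poll result that the caller supplies.
pub fn poll_to_io<T>(poll: Poll<io::Result<T>, BlockingError>) -> io::Result<T> {
    match poll {
        // The closure ran and succeeded.
        Ok(Async::Ready(Ok(value))) => Ok(value),
        // The closure ran and failed with its own IO error.
        Ok(Async::Ready(Err(err))) => Err(err),
        // No blocking capacity yet: tell the caller to retry later.
        Ok(Async::NotReady) => Err(ErrorKind::WouldBlock.into()),
        // Not inside a pool: reported as a generic IO error
        // (message invented for this sketch).
        Err(BlockingError) => Err(io::Error::new(
            ErrorKind::Other,
            "blocking section entered outside a thread pool",
        )),
    }
}
```

A caller that understands non-blocking IO can treat the WouldBlock error as a signal to poll again later, which is how a synchronous-looking write method can still cooperate with the reactor.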

You now know how any blocking operation can be joined with an asynchronous reactor. This approach is used not only for filesystem access, but also for databases and for other crates that don't support the futures crate or that perform heavy CPU-bound computations.