Using channels to use Sink in multiple places

As mentioned previously, you can use a channel to send data through a Sink from different places and at any time. Consider the following example:

fn alt_udp_echo() -> Result<(), Error> {
    // Bind a UDP socket and wrap it so that it reads and writes lines of text.
    let from = "0.0.0.0:12345".parse()?;
    let socket = UdpSocket::bind(&from)?;
    let framed = UdpFramed::new(socket, LinesCodec::new());
    let (sink, stream) = framed.split();
    // Any clone of `tx` can queue a frame for the sink.
    let (tx, rx) = mpsc::channel(16);
    // Forward every frame taken from the channel to the sink.
    let rx = rx.map_err(|_| other("can't take a message"))
        .fold(sink, |sink, frame| {
            sink.send(frame)
        });
    // Send every incoming frame back through the channel.
    let process = stream.and_then(move |args| {
        tx.clone()
            .send(args)
            .map(drop)
            .map_err(other)
    }).collect();
    let execute_all = future::join_all(vec![
        to_box(rx),
        to_box(process),
    ]).map(drop);
    tokio::run(execute_all);
    Ok(())
}

This example creates a UdpSocket instance that represents a UDP socket and binds it to the 0.0.0.0:12345 address. After that, we wrap the socket with the UdpFramed type, which implements both a Stream and a Sink of frames that are decoded and encoded with the provided codec. We use LinesCodec from the tokio::codec module, which reads the input and splits it at line delimiters into pieces that represent lines of text.
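To make the line-splitting idea concrete, here is a minimal sketch that uses only the standard library. It is not the LinesCodec implementation; take_lines is a hypothetical helper, and stripping a trailing carriage return is an assumption of this sketch. It only illustrates how a decoder can hand out complete lines and keep an unfinished tail for later:

```rust
/// Hypothetical helper: removes every complete line from `buffer` and returns
/// the lines without their delimiter. Bytes after the last '\n' stay in
/// `buffer` until more input arrives.
fn take_lines(buffer: &mut String) -> Vec<String> {
    let mut lines = Vec::new();
    while let Some(pos) = buffer.find('\n') {
        // Keep everything after the delimiter for the next iteration.
        let rest = buffer.split_off(pos + 1);
        let mut line = std::mem::replace(buffer, rest);
        line.pop(); // remove the '\n' delimiter
        if line.ends_with('\r') {
            line.pop(); // assumption of this sketch: also accept "\r\n"
        }
        lines.push(line);
    }
    lines
}

fn main() {
    let mut buffer = String::from("first\nsecond\r\npart");
    let lines = take_lines(&mut buffer);
    println!("lines: {:?}, left over: {:?}", lines, buffer);
}
```

In the echo server, each decoded line travels together with the address of the peer that sent it, which is what allows the reply to be routed back to the right client.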

We split the framed object into a sink and a stream, and create a channel so that UDP datagrams can be sent from different places. We will get familiar with the channel module in the next section and learn how tasks can interact with each other asynchronously using the Sender and Receiver objects.

The channel function returns a Sender and a Receiver. We use the Receiver to forward all incoming messages to the Sink of the UDP socket, and we read all data from the stream and send it back through the channel. This echo server could be implemented more efficiently without channels, but we have used them here for demonstration purposes. To send a message, we use the Sender of the created channel. The advantage of this approach is that you can clone the sender and use it anywhere to send messages to the channel at any time.
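The cloning pattern can be tried in isolation. The following sketch is only an analogy: it uses the blocking std::sync::mpsc channel and threads instead of the asynchronous channel and tasks from the example, and collect_from_workers is a hypothetical name. It shows several clones of one sender feeding a single receiver:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `workers` threads that each send one message
/// through their own clone of the sender, then gathers everything that the
/// single receiver gets.
fn collect_from_workers(workers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(format!("message from worker {}", id))
                    .expect("receiver is still alive");
            })
        })
        .collect();
    // Drop the original sender so that the receiver finishes once all
    // clones are gone.
    drop(tx);
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    let mut messages: Vec<String> = rx.iter().collect();
    // Arrival order depends on scheduling, so sort for a stable result.
    messages.sort();
    messages
}

fn main() {
    for message in collect_from_workers(3) {
        println!("{}", message);
    }
}
```

The receiving side does not need to know how many senders exist. In the echo server, that is what lets any part of the program obtain a clone of tx and queue a datagram for the sink.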

Sometimes, the futures and streams that you combine differ in their Item or Error type parameters. To reconcile the error types, we add an other function that wraps any error instance in the io::Error type. We use this function to convert one error type to another:

fn other<E>(err: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, err)
}
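As a quick standalone check of how this conversion behaves, the sketch below repeats the function and applies it to two different inputs: a string literal, as in the echo server, and a ParseIntError. The parse_port helper is hypothetical and exists only for this illustration:

```rust
use std::error::Error;
use std::io;

fn other<E>(err: E) -> io::Error
where
    E: Into<Box<dyn Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, err)
}

/// Hypothetical helper: parses a port number and reports any failure as an
/// io::Error, so callers only deal with one error type.
fn parse_port(text: &str) -> Result<u16, io::Error> {
    text.trim().parse::<u16>().map_err(other)
}

fn main() {
    // A string literal can be converted too.
    let from_text = other("can't take a message");
    println!("{:?}: {}", from_text.kind(), from_text);

    println!("{:?}", parse_port("12345"));
    println!("{:?}", parse_port("not a port").map_err(|e| e.kind()));
}
```

Because the function is generic over anything that converts into a boxed error, it can be passed directly to map_err without a closure, which is how the echo server uses it.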

You can compile this echo server and check how it works using the netcat utility. Install it if your operating system doesn't already include it. Run the nc command with the --verbose (short form: -v), --udp (short form: -u), and --nodns (short form: -n) arguments, and then enter any text. As an example, we have typed "Text Message":

$ nc -vnu 0.0.0.0 12345
Ncat: Version 7.60 ( https://nmap.org/ncat )
Ncat: Connected to 0.0.0.0:12345.
Text Message
Text Message
^C

As you can see, the server has sent us back the provided string. All these examples used an executor to run the tasks concurrently. Before we start to implement a server, let's learn how executors work.