Actor

Add a new struct called QueueActor with a type parameter that implements the QueueHandler trait:

pub struct QueueActor<T: QueueHandler> {
    channel: Channel<TcpStream>,
    handler: T,
}

The struct holds a Channel, which is our connection channel to RabbitMQ and is built over TcpStream. It also has a handler field that contains an instance of a type that implements QueueHandler.
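To see how the type parameter works in isolation, here is a minimal sketch that uses only the standard library. The QueueHandler trait below is a simplified stand-in that models only the two queue-name methods used in this section. Worker, ResizeHandler, and the queue names are hypothetical and exist only for illustration:

```rust
/// Simplified stand-in for the chapter's QueueHandler trait (assumption:
/// only the two queue-name methods used in this section are modelled).
pub trait QueueHandler {
    fn incoming(&self) -> &str;
    fn outgoing(&self) -> &str;
}

/// Hypothetical handler; the queue names are illustrative only.
pub struct ResizeHandler;

impl QueueHandler for ResizeHandler {
    fn incoming(&self) -> &str {
        "requests"
    }

    fn outgoing(&self) -> &str {
        "responses"
    }
}

/// Generic over any handler, like QueueActor, but without the channel field.
pub struct Worker<T: QueueHandler> {
    handler: T,
}

impl<T: QueueHandler> Worker<T> {
    pub fn new(handler: T) -> Self {
        Self { handler }
    }

    /// Returns the outgoing queue name followed by the incoming one.
    pub fn queues(&self) -> [&str; 2] {
        [self.handler.outgoing(), self.handler.incoming()]
    }
}
```

Because the handler is a type parameter rather than a trait object, the compiler generates a separate Worker for each handler type and resolves the method calls statically.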

QueueActor also has to implement the Actor trait to become an actor. We also add a started method. It remains empty, but it's a good place to create queues. For example, you could define a message type that attaches a Stream of messages to this actor. With this approach, you can start consuming any queue at any time:

impl<T: QueueHandler> Actor for QueueActor<T> {
    type Context = Context<Self>;

    fn started(&mut self, _: &mut Self::Context) {}
}

In this example, however, we initialize all the queues in the new method so that actor creation is interrupted if something goes wrong:

impl<T: QueueHandler> QueueActor<T> {
    pub fn new(handler: T, mut sys: &mut SystemRunner) -> Result<Addr<Self>, Error> {
        // Connect to the broker and get a channel.
        let channel = spawn_client(&mut sys)?;
        let chan = channel.clone();
        // Make sure the outgoing queue exists; return early on failure.
        let fut = ensure_queue(&chan, handler.outgoing());
        sys.block_on(fut)?;
        // Make sure the incoming queue exists, then start consuming from it.
        let fut = ensure_queue(&chan, handler.incoming()).and_then(move |queue| {
            let opts = BasicConsumeOptions {
                ..Default::default()
            };
            let table = FieldTable::new();
            let name = format!("{}-consumer", queue.name());
            chan.basic_consume(&queue, &name, opts, table)
        });
        let stream = sys.block_on(fut)?;
        // Attach the stream of incoming messages while the actor is created.
        let addr = QueueActor::create(move |ctx| {
            ctx.add_stream(stream);
            Self { channel, handler }
        });
        Ok(addr)
    }
}

We call the spawn_client function, which we will implement later, to create a Client that's connected to a message broker. The function returns a Channel instance, which is created by the connected Client. We pass the Channel to ensure_queue, which makes sure that the queue we need exists and creates it if necessary. This function is also implemented later in this chapter. We use the result of the QueueHandler::outgoing method to get the name of the queue to create.

The new method takes a SystemRunner so that it can execute Future objects immediately by calling the block_on method. This gives us a Result for each step and lets us interrupt the remaining setup if a call fails.
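The early-exit behaviour can be sketched without a broker. In the following standard-library-only example, the synchronous declare_queue function stands in for running ensure_queue through block_on. QueueSetup, SetupError, and declare_queue are hypothetical names, and the empty-name check is an invented failure condition used only to trigger an error:

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum SetupError {
    EmptyQueueName,
}

/// Stand-in for a blocking queue declaration (assumption: an empty name is
/// the only way this step can fail).
fn declare_queue(declared: &mut Vec<String>, name: &str) -> Result<(), SetupError> {
    if name.is_empty() {
        return Err(SetupError::EmptyQueueName);
    }
    declared.push(name.to_string());
    Ok(())
}

pub struct QueueSetup {
    queues: Vec<String>,
}

impl QueueSetup {
    /// Each step yields a Result, and `?` returns the first error
    /// before the value is ever constructed.
    pub fn new(outgoing: &str, incoming: &str) -> Result<Self, SetupError> {
        let mut queues = Vec::new();
        declare_queue(&mut queues, outgoing)?;
        declare_queue(&mut queues, incoming)?;
        Ok(Self { queues })
    }

    pub fn queues(&self) -> &[String] {
        &self.queues
    }
}
```

Calling QueueSetup::new("responses", "") returns an error and never produces a QueueSetup value, which mirrors how a failed block_on call in new prevents the actor from being created.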

After that, we make sure a second queue exists, using the name returned by the QueueHandler::incoming method call. We will consume messages from this queue, so we call the basic_consume method of the Channel, which starts listening for new messages. To call basic_consume, we also create default values of the BasicConsumeOptions and FieldTable types and derive the consumer name from the queue name. basic_consume returns a Future that resolves to a Stream value. We execute this Future with the block_on method of the SystemRunner instance to get a Stream instance, which we attach to QueueActor. We create the QueueActor instance using the create method call, which expects a closure that in turn takes a mutable reference to a Context.
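The reason create takes a closure is that the context has to exist before the actor value does, so that streams can be attached during construction. The following toy model illustrates only that ordering. It is not how Actix is implemented: the Context, Actor, and Recorder types here are hypothetical, a Vec stands in for an asynchronous Stream, and create returns the actor itself instead of an address:

```rust
/// Toy context that only collects the streams attached to it.
pub struct Context<M> {
    streams: Vec<Vec<M>>,
}

impl<M> Context<M> {
    pub fn add_stream(&mut self, stream: Vec<M>) {
        self.streams.push(stream);
    }
}

/// Toy actor trait (assumption: messages are delivered synchronously).
pub trait Actor: Sized {
    type Message;

    fn handle(&mut self, msg: Self::Message);

    /// Builds the context first, lets the closure attach streams and
    /// construct the actor, then delivers every attached message.
    fn create<F>(f: F) -> Self
    where
        F: FnOnce(&mut Context<Self::Message>) -> Self,
    {
        let mut ctx = Context { streams: Vec::new() };
        let mut actor = f(&mut ctx);
        for stream in ctx.streams {
            for msg in stream {
                actor.handle(msg);
            }
        }
        actor
    }
}

/// Hypothetical actor that records every message it receives.
pub struct Recorder {
    pub seen: Vec<String>,
}

impl Actor for Recorder {
    type Message = String;

    fn handle(&mut self, msg: String) {
        self.seen.push(msg);
    }
}

/// Usage: the stream is moved into the closure, as in QueueActor::new.
pub fn demo() -> Vec<String> {
    let stream = vec!["first".to_string(), "second".to_string()];
    let actor = Recorder::create(move |ctx| {
        ctx.add_stream(stream);
        Recorder { seen: Vec::new() }
    });
    actor.seen
}
```

As in the chapter's new method, the closure is marked move so that it takes ownership of the stream and of the fields used to build the actor.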