Playing music

First, add the mp3 module:

mod mp3;

Copy the src/mp3.rs file from the previous chapter.

We also need the following dependencies:

[dependencies]
crossbeam = "^0.3.0"
futures = "^0.1.16"
pulse-simple = "^1.0.0"
simplemad = "^0.8.1"

And add these statements to the main module:

extern crate crossbeam;
extern crate futures;
extern crate pulse_simple;
extern crate simplemad;

We'll now add a player module:

mod player;

This new module will start with a bunch of import statements:

use std::cell::Cell;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

use crossbeam::sync::SegQueue;
use futures::{AsyncSink, Sink};
use futures::sync::mpsc::UnboundedSender;
use pulse_simple::Playback;

use mp3::Mp3Decoder;
use playlist::PlayerMsg::{
    self,
    PlayerPlay,
    PlayerStop,
    PlayerTime,
};
use self::Action::*;

We imported a new PlayerMsg type from the playlist module, so let's add it:

#[derive(Clone)]
pub enum PlayerMsg {
    PlayerPlay,
    PlayerStop,
    PlayerTime(u64),
}

We'll define some constants:

const BUFFER_SIZE: usize = 1000;
const DEFAULT_RATE: u32 = 44100;

And let's create the types that we'll need:

enum Action {
    Load(PathBuf),
    Stop,
}

#[derive(Clone)]
struct EventLoop {
    condition_variable: Arc<(Mutex<bool>, Condvar)>,
    queue: Arc<SegQueue<Action>>,
    playing: Arc<Mutex<bool>>,
}

pub struct Player {
    event_loop: EventLoop,
    paused: Cell<bool>,
    tx: UnboundedSender<PlayerMsg>,
}

The Action and EventLoop are the same as in the previous chapter, but the Player type is a bit different. Instead of having a field with the state of the application, it contains a sender that will be used to send messages to the playlist and ultimately to the application itself. So, instead of using a shared state and a timeout like we did in the previous chapter, we'll use message passing, which is more efficient.

We'll need a constructor for EventLoop:

impl EventLoop {
    fn new() -> Self {
        EventLoop {
            condition_variable: Arc::new((Mutex::new(false), Condvar::new())),
            queue: Arc::new(SegQueue::new()),
            playing: Arc::new(Mutex::new(false)),
        }
    }
}

Let's create the constructor for Player:

impl Player {
    pub(crate) fn new(tx: UnboundedSender<PlayerMsg>) -> Self {
        let event_loop = EventLoop::new();

        {
            let mut tx = tx.clone();
            let event_loop = event_loop.clone();
            let condition_variable = event_loop.condition_variable.clone();
            thread::spawn(move || {
                let block = || {
                    let (ref lock, ref condition_variable) = *condition_variable;
                    let mut started = lock.lock().unwrap();
                    *started = false;
                    while !*started {
                        started = condition_variable.wait(started).unwrap();
                    }
                };

                let mut buffer = [[0; 2]; BUFFER_SIZE];
                let mut playback = Playback::new("MP3", "MP3 Playback", None,  
DEFAULT_RATE); let mut source = None; loop { if let Some(action) = event_loop.queue.try_pop() { match action { Load(path) => { let file = File::open(path).unwrap(); source =
Some(Mp3Decoder::new(BufReader::new(file)).unwrap()); let rate = source.as_ref().map(|source|
source.samples_rate()).unwrap_or(DEFAULT_RATE); playback = Playback::new("MP3", "MP3 Playback",
None, rate); send(&mut tx, PlayerPlay); }, Stop => { source = None; }, } } else if *event_loop.playing.lock().unwrap() { let mut written = false; if let Some(ref mut source) = source { let size = iter_to_buffer(source, &mut buffer); if size > 0 { send(&mut tx, PlayerTime(source.current_time())); playback.write(&buffer[..size]); written = true; } } if !written { send(&mut tx, PlayerStop); *event_loop.playing.lock().unwrap() = false; source = None; block(); } } else { block(); } } }); } Player { event_loop, paused: Cell::new(false), tx, } } }

It is similar to the one we wrote in the previous chapter, but instead of using the shared state, we send messages back to the playlist. Here's an example of how we send these messages:

send(&mut tx, PlayerTime(source.current_time()));

This sends the current time back to the UI so that it can display it. This requires the send() function to be defined:

fn send(tx: &mut UnboundedSender<PlayerMsg>, msg: PlayerMsg) {
    if let Ok(AsyncSink::Ready) = tx.start_send(msg) {
        tx.poll_complete().unwrap();
    } else {
        eprintln!("Unable to send message to sender");
    }
}

This code uses the future crate to send the message and it shows an error in case it fails. The iter_to_buffer() function is the same as the one from the previous chapter:

fn iter_to_buffer<I: Iterator<Item=i16>>(iter: &mut I, buffer: &mut [[i16; 2]; BUFFER_SIZE]) -> usize {
    let mut iter = iter.take(BUFFER_SIZE);
    let mut index = 0;
    while let Some(sample1) = iter.next() {
        if let Some(sample2) = iter.next() {
            buffer[index][0] = sample1;
            buffer[index][1] = sample2;
        }
        index += 1;
    }
    index
}

We'll now add the methods to play and pause a song:

pub fn load<P: AsRef<Path>>(&self, path: P) {
    let pathbuf = path.as_ref().to_path_buf();
    self.emit(Load(pathbuf));
    self.set_playing(true);
}

pub fn pause(&mut self) {
    self.paused.set(true);
    self.send(PlayerStop);
    self.set_playing(false);
}

pub fn resume(&mut self) {
    self.paused.set(false);
    self.send(PlayerPlay);
    self.set_playing(true);
}

They're very similar to the ones from the previous chapter, but we send a message instead of modifying a state. They require the following methods:

fn emit(&self, action: Action) {
    self.event_loop.queue.push(action);
}

fn send(&mut self, msg: PlayerMsg) {
    send(&mut self.tx, msg);
}

fn set_playing(&self, playing: bool) {
    *self.event_loop.playing.lock().unwrap() = playing;
    let (ref lock, ref condition_variable) = *self.event_loop.condition_variable;
    let mut started = lock.lock().unwrap();
    *started = playing;
    if playing {
        condition_variable.notify_one();
    }
}

The emit() and set_playing() methods are the same as in the previous chapter. The send() method simply calls the send() function we defined earlier.

We'll also need these two methods:

pub fn is_paused(&self) -> bool {
    self.paused.get()
}

pub fn stop(&mut self) {
    self.paused.set(false);
    self.send(PlayerTime(0));
    self.send(PlayerStop);
    self.emit(Stop);
    self.set_playing(false);
}

The is_paused() method has not changed. And the stop() method is similar, but again, it sends messages instead of updating the application state directly. Let's go back to our Paylist to use this new player. The model will now contain the player itself:

use player::Player;

pub struct Model {
    current_song: Option<String>,
    player: Player,
    model: ListStore,
    relm: Relm<Playlist>,
}

The Msg type will contain a new variant called PlayerMsgRecv that will be emitted whenever the player sends a message:

#[derive(Msg)]
pub enum Msg {
    AddSong(PathBuf),
    LoadSong(PathBuf),
    NextSong,
    PauseSong,
    PlayerMsgRecv(PlayerMsg),
    PlaySong,
    PreviousSong,
    RemoveSong,
    SaveSong(PathBuf),
    SongStarted(Option<Pixbuf>),
    StopSong,
}

We're now ready to update the model initialization:

use futures::sync::mpsc;

fn model(relm: &Relm<Self>, _: ()) -> Model {
    let (tx, rx) = mpsc::unbounded();
    relm.connect_exec_ignore_err(rx, PlayerMsgRecv);
    Model {
        current_song: None,
        player: Player::new(tx),
        model: ListStore::new(&[
            Pixbuf::static_type(),
            Type::String,
            Type::String,
            Type::String,
            Type::String,
            Type::String,
            Type::String,
            Type::String,
            Pixbuf::static_type(),
        ]),
        relm: relm.clone(),
    }
}

It now creates a sender and receiver pair from the mpsc type of the future crate. MPSC stands for Multiple-Producers-Single-Consumer. We now call the Relm::connect_exec_ignore_err() method, this method connects a Future or a Stream to a message. This means that whenever a value is produced in the Stream, a message will be emitted. The message needs to take a parameter of the same type as the value produced by the Stream. A Future represents a value that is possibly not yet available, but will be available in the future, unless an error happens. A Stream is similar, but can produce multiple values that will be available at different times in the future. Similar to the connect_exec_ignore_err() method, there's also the connect_exec() method, which takes another message variant as a parameter, this second message will be emitted when there's an error. Here, we simply ignore the errors.

In the update() method:

fn update(&mut self, event: Msg) {
    match event {
        // To be listened by App.
        PlayerMsgRecv(_) => (),
        // …
    }
}

We have nothing to do with this message because it will be handled by the App widget. We'll now add a method to pause the player:

fn pause(&mut self) {
    self.model.player.pause();
}

Next we need to update the play() and stop() methods:

fn play(&mut self) {
    if let Some(path) = self.selected_path() {
        if self.model.player.is_paused() && Some(&path) == self.path().as_ref() {
            self.model.player.resume();
        } else {
            self.model.player.load(&path);
            self.model.current_song = Some(path.into());
            self.model.relm.stream().emit(SongStarted(self.pixbuf()));
        }
    }
}

fn stop(&mut self) {
    self.model.current_song = None;
    self.model.player.stop();
}

The stop() method is the same, except that we can update the model directly, because we don't need to use the RefCell type anymore. The play() method will now load or resume the song depending on the state of the player.

The play() method requires a path() method:

fn path(&self) -> Option<String> {
    self.model.current_song.clone()
}

Let's go back to the main module to manage the messages sent by the player. First, we need a new variant for our enum Msg:

#[derive(Msg)]
pub enum Msg {
    MsgRecv(PlayerMsg),
    // …
}

We will handle this in the update() method:

fn update(&mut self, event: Msg) {
    match event {
        MsgRecv(player_msg) => self.player_message(player_msg),
        // …
    }
}

This requires a new method to be added in impl Widget for App:

#[widget]
impl Widget for App {
    fn player_message(&mut self, player_msg: PlayerMsg) {
        match player_msg {
            PlayerPlay => {
                self.model.stopped = false;
                self.set_play_icon(PAUSE_ICON);
            },
            PlayerStop => {
                self.set_play_icon(PLAY_ICON);
                self.model.stopped = true;
            },
            PlayerTime(time) => self.set_current_time(time),
        }
    }
}

This is also a custom method, that is, a method that is not part of the Widget trait, but is analyzed by the #[widget] attribute. We put it there instead of a separate impl App because we updated the model. In this method, we either update the icon to display the play button or the current time.