Fetching data in parallel

To understand the issues outlined in the previous section better, let's build a more complex example that fetches data about one of my favorite movies, The Lord of the Rings.

The idea is that, given the movie, we wish to retrieve its actors and, for each actor, retrieve the movies they have been a part of. We would also like to find out more information about each actor, such as their spouses.

Additionally, we will match each actor's movies against the list of top five movies to highlight them as such. Finally, the result will be printed to the screen.

From the problem statement, we can identify the following two main characteristics we will need to account for:

To get started, let's create a new Leiningen project:

    lein new clj-futures-playground  

Next, open the core namespace file in src/clj_futures_playground/core.clj and add the data we will be working with:

(ns clj-futures-playground.core 
  (:require [clojure.pprint :refer [pprint]])) 
 
(def movie 
  {:name "Lord of The Rings: The Fellowship of The Ring" 
   :cast ["Cate Blanchett" 
          "Elijah Wood" 
          "Liv Tyler" 
          "Orlando Bloom"]}) 
 
(def actor-movies 
  [{:name "Cate Blanchett" 
    :movies ["Lord of The Rings: The Fellowship of The Ring" 
             "Lord of The Rings: The Return of The King" 
             "The Curious Case of Benjamin Button"]} 
 
   {:name "Elijah Wood" 
    :movies ["Eternal Sunshine of the Spotless Mind" 
             "Green Street Hooligans" 
             "The Hobbit: An Unexpected Journey"]} 
 
   {:name "Liv Tyler" 
    :movies ["Lord of The Rings: The Fellowship of The Ring" 
             "Lord of The Rings: The Return of The King" 
             "Armageddon"]} 
 
   {:name "Orlando Bloom" 
    :movies ["Lord of The Rings: The Fellowship of The Ring" 
             "Lord of The Rings: The Return of The King" 
             "Pirates of the Caribbean: The Curse of the Black Pearl"]}]) 
 
(def actor-spouse 
  [{:name "Cate Blanchett"    :spouse "Andrew Upton"} 
   {:name "Elijah Wood"       :spouse "Unknown"} 
   {:name "Liv Tyler"         :spouse "Royston Langdon"} 
   {:name "Orlando Bloom"     :spouse "Miranda Kerr"}]) 
 
(def top-5-movies 
  ["Lord of The Rings: The Fellowship of The Ring" 
   "The Matrix" 
   "The Matrix Reloaded" 
   "Pirates of the Caribbean: The Curse of the Black Pearl" 
   "Terminator"]) 

The namespace declaration is simple and only requires the pprint function, which will help us print our result in an easy-to-read format. With all the data in place, we can create the functions that will simulate remote services that are responsible for fetching the relevant data:

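;; Simulates a slow remote call; the future runs its body on another thread.
(defn cast-by-movie [name] 
  (future (Thread/sleep 5000) 
          (:cast movie))) 
 
;; Blocks the calling thread for two seconds, then returns the actor's record.
(defn movies-by-actor [name] 
  (Thread/sleep 2000) 
  (->> actor-movies 
       (filter #(= name (:name %))) 
       first)) 
 
;; Same shape as movies-by-actor, but looks up the spouse record.
(defn spouse-of [name] 
  (Thread/sleep 2000) 
  (->> actor-spouse 
       (filter #(= name (:name %))) 
       first)) 
 
(defn top-5 [] 
  (future (Thread/sleep 5000) 
          top-5-movies)) 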

Each service function puts the current thread to sleep for a fixed amount of time to simulate a slow network. The cast-by-movie and top-5 functions each return a future, indicating that we wish to fetch this data on a different thread. The remaining functions simply return the actual data. They will also be executed on a different thread, as we will see shortly.
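To make the difference between the two styles concrete, here is a minimal sketch you could try at the REPL. The slow-answer function and its 200 ms delay are made up for illustration and are not part of the project; future, realized?, and deref (the @ reader macro) come from clojure.core.

```clojure
;; Hypothetical stand-in for a slow remote call (not part of the project).
(defn slow-answer []
  (Thread/sleep 200)
  42)

;; Wrapping the call in `future` runs it on another thread and returns
;; immediately with a reference to the pending result.
(def pending (future (slow-answer)))

;; Very likely false at this point, since the body is still sleeping.
(println "realized right away?" (realized? pending))

;; Dereferencing blocks the current thread until the value is available.
(println "value:" @pending)

;; After the deref above, the future is realized; further derefs return at once.
(println "realized now?" (realized? pending))
```

A function that returns the data directly, such as movies-by-actor, would instead hold up the caller for the whole delay unless something else moves the call to another thread.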

The next thing we need is a function to aggregate all fetched data, match spouses to actors, and highlight movies in the top-5 list. We'll call it the aggregate-actor-data function:

(defn aggregate-actor-data [spouses movies top-5] 
  (map (fn [{:keys [name spouse]} {:keys [movies]}] 
         {:name   name 
          :spouse spouse 
          :movies (map (fn [m] 
                         (if (some #{m} top-5) 
                           (str m " - (top 5)") 
                           m)) 
                       movies)}) 
       spouses 
       movies)) 

The preceding function is fairly straightforward. It zips spouses and movies together, building one map per actor with the keys :name, :spouse, and :movies. It also transforms movies, appending the " - (top 5)" suffix to the ones in the top-5 list.
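As a quick check of this behavior, the following sketch calls the function on a trimmed-down subset of the chapter's data. The definition is repeated only so that the snippet runs on its own, and the sample-result name is introduced here for illustration.

```clojure
(require '[clojure.pprint :refer [pprint]])

;; Same definition as in the chapter, repeated so the snippet is self-contained.
(defn aggregate-actor-data [spouses movies top-5]
  (map (fn [{:keys [name spouse]} {:keys [movies]}]
         {:name   name
          :spouse spouse
          :movies (map (fn [m]
                         (if (some #{m} top-5)
                           (str m " - (top 5)")
                           m))
                       movies)})
       spouses
       movies))

;; One spouse record and one movie record, listed in the same order.
(def sample-result
  (aggregate-actor-data
   [{:name "Liv Tyler" :spouse "Royston Langdon"}]
   [{:name   "Liv Tyler"
     :movies ["Lord of The Rings: The Fellowship of The Ring" "Armageddon"]}]
   ["Lord of The Rings: The Fellowship of The Ring" "The Matrix"]))

;; Only the first movie is in the top-5 list, so only it gets the suffix.
(pprint sample-result)
```

Note that the pairing is positional: the function assumes that spouses and movies list the actors in the same order, which holds here because both are produced by mapping over the same cast.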

The last piece of the puzzle is the -main function, which allows us to run the program from the command line:

(defn -main [& args] 
  (time (let [cast    (cast-by-movie "Lord of The Rings: The Fellowship of The Ring") 
              movies  (pmap movies-by-actor @cast) 
              spouses (pmap spouse-of @cast) 
              top-5   (top-5)] 
          (prn "Fetching data...") 
          (pprint (aggregate-actor-data spouses movies @top-5)) 
          (shutdown-agents)))) 

There are a number of things worth highlighting in the preceding snippet.

  1. First, we wrapped the whole body in a call to time, a simple benchmarking macro that comes with Clojure. This was just so we know how long the program took to fetch all the data. This information will become relevant later.
  2. Then, we set up a number of let bindings. The first, cast, is the result of calling cast-by-movie, which returns a future.
  3. The next binding, movies, uses a function we haven't seen before: pmap. The pmap function works like map, except that the function is mapped over the items in the list in parallel. It uses futures under the covers, which is why movies-by-actor doesn't return a future: it leaves that for pmap to handle.

The pmap function is actually meant for CPU-bound operations, but is used here to keep the code simple. In the face of blocking IO, pmap wouldn't perform optimally. We will talk more about blocking IO later in this chapter.

We got the list of actors by derefing the cast binding, which, as we saw in the previous section, blocks the current thread waiting for the asynchronous fetch to finish. Once all of the results are ready, we simply call the aggregate-actor-data function.

  4. Lastly, we called the shutdown-agents function, which shuts down the thread pool backing futures in Clojure. This was necessary for our program to terminate properly; otherwise, it would simply hang in the Terminal.
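The effect of pmap can be seen in isolation with a small timing sketch. The slow-inc and timed helpers below are hypothetical and exist only for this comparison; the 300 ms delay is arbitrary.

```clojure
;; Hypothetical slow function standing in for a remote call.
(defn slow-inc [n]
  (Thread/sleep 300)
  (inc n))

;; Helper for this sketch only: runs f and returns [result elapsed-ms].
(defn timed [f]
  (let [start  (System/nanoTime)
        result (f)]
    [result (/ (- (System/nanoTime) start) 1e6)]))

;; doall forces the lazy sequences so the work happens inside the timing.
(def sequential (timed #(doall (map slow-inc [1 2 3 4]))))
(def parallel   (timed #(doall (pmap slow-inc [1 2 3 4]))))

(println "map: " (first sequential) "in" (second sequential) "ms")
(println "pmap:" (first parallel) "in" (second parallel) "ms")
```

Both calls return the same values, but map pays the delay once per item, whereas pmap should finish this small input in roughly the time of a single call, since the calls overlap.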

To run the program, type the following into the Terminal, under the project's root directory:

    lein run -m clj-futures-playground.core
    
    "Fetching data..."
    ({:name "Cate Blanchett",
      :spouse "Andrew Upton",
      :movies
      ("Lord of The Rings: The Fellowship of The Ring - (top 5)"
       "Lord of The Rings: The Return of The King"
       "The Curious Case of Benjamin Button")}
     {:name "Elijah Wood",
      :spouse "Unknown",
      :movies
      ("Eternal Sunshine of the Spotless Mind"
       "Green Street Hooligans"
       "The Hobbit: An Unexpected Journey")}
     {:name "Liv Tyler",
      :spouse "Royston Langdon",
      :movies
      ("Lord of The Rings: The Fellowship of The Ring - (top 5)"
       "Lord of The Rings: The Return of The King"
       "Armageddon")}
     {:name "Orlando Bloom",
      :spouse "Miranda Kerr",
      :movies
      ("Lord of The Rings: The Fellowship of The Ring - (top 5)"
       "Lord of The Rings: The Return of The King"
       "Pirates of the Caribbean: The Curse of the Black Pearl - (top 5)")})
    "Elapsed time: 10120.267 msecs"
  

You will have noticed that the program takes a while to print the first message. Additionally, because futures block when they are derefed, the program doesn't start fetching the list of top five movies until it has completely finished fetching the cast of The Lord of the Rings.

Let's have a look at why that is so:

  (time (let [cast    (cast-by-movie "Lord of The Rings: The Fellowship of The Ring") 
              ;; the following line blocks 
              movies  (pmap movies-by-actor @cast) 
              spouses (pmap spouse-of @cast) 
              top-5   (top-5)] 

The commented line in the preceding snippet shows where the program blocks, waiting for cast-by-movie to finish. As we stated previously, Clojure futures don't give us a way to run a piece of code when the future finishes, such as a callback, which forces us to block too soon.

This prevents top-5—a completely independent parallel data fetch—from running before we retrieve the movie's cast.

Of course, this is a contrived example, and we could solve this particular annoyance by calling top-5 before anything else. The problem is that the solution isn't always crystal clear and, ideally, we should not have to worry about the order of execution.
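For reference, the reordering workaround can be sketched as follows. The quick-cast, quick-top-5, and elapsed-ms names are hypothetical stand-ins introduced for this comparison, with delays shortened to 500 ms so that it runs quickly; they are not the chapter's service functions.

```clojure
;; Stand-ins for the chapter's services with shorter, made-up delays.
(defn quick-cast [_movie-name]
  (future (Thread/sleep 500)
          ["Cate Blanchett" "Elijah Wood"]))

(defn quick-top-5 []
  (future (Thread/sleep 500)
          ["The Matrix" "Terminator"]))

;; Helper for this sketch only: runs f and returns the elapsed milliseconds.
(defn elapsed-ms [f]
  (let [start (System/nanoTime)]
    (f)
    (/ (- (System/nanoTime) start) 1e6)))

;; Original order: the top-5 fetch only starts after @cast has blocked.
(def blocking-order
  (elapsed-ms #(let [cast  (quick-cast "any movie")
                     names @cast
                     top   (quick-top-5)]
                 [names @top])))

;; Reordered: both futures start before the first deref, so they overlap.
(def overlapping-order
  (elapsed-ms #(let [top   (quick-top-5)
                     cast  (quick-cast "any movie")
                     names @cast]
                 [names @top])))

(println "top-5 started after deref:" blocking-order "ms")
(println "top-5 started first:      " overlapping-order "ms")
```

The first ordering should take about the sum of the two delays, and the second about the longer of the two. The fix works only because we spotted the independent fetch by hand, which is exactly the kind of bookkeeping we would rather avoid.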

As we will see in the next section, there is a better way to do this.