Scaling it Up
So far, we’ve been using the 100,000 ratings data set from MovieLens to illustrate different recommender systems. That’s good enough for educational purposes, but if you want to build a recommender system for a real company, you will probably be working with much more data – and you certainly won’t be processing it just on your own desktop PC.
This next section is about scaling it up – tools that let you train recommender systems on real “big data”, on a cluster, maybe even in the cloud.
There are lots of options, but we’ll cover a few of my favorites.
Apache Spark and MLLib
You may have heard of Apache Spark – it’s a very powerful framework for processing massive data sets across a cluster of computers. As your data grows, you just keep adding more computers to your cluster – the sky’s pretty much the limit!
I’m not going to go into the details of how Spark works here, but Sundog Education offers entire courses on that topic. For now, we’ll just focus on it from a practical standpoint – we’ll get it installed, and examine some code that generates recommendations using Spark on a larger data set. Since you probably don’t have a cluster of your own handy, we’ll still do this on your own desktop PC – but the same code would run exactly the same way on a real Apache Spark cluster. For now, think of your PC as a cluster of one.
If you want to follow along on your own PC in this section, first you need to get Spark installed. On Windows, that’s a bit of a hassle – but here are the steps you need to follow, whatever OS you’re using. If you’re not comfortable with setting environment variables on your OS, you may want to just watch the next few videos, because you’ll probably run into trouble otherwise. You need to know what you’re doing with environment variables for all of this to work properly, and there are a lot of little, frustrating things that can go wrong.
The first step is to install the Java 8 JDK from Oracle’s website – and make sure it is Java 8, not Java 9 or newer! Spark only works with Java 8 right now. When you install it, be sure to install it to a path that does not include any spaces – so on Windows, that means you can’t accept the default location under “Program Files”, because there is a space between “Program” and “Files.” Use a new path, like c:\jdk or something, that’s easy to remember and has no spaces in it.
Then you need to set the JAVA_HOME environment variable on your system to the path you installed the Java 8 JDK into. In Windows, environment variables are set using the System control panel – from there, you select advanced system settings, and then you’ll see a button to go into your environment variables.
These next steps are specific to Windows – since Spark uses the Hadoop Distributed File System for some things, it won’t run unless it thinks Hadoop is installed on your PC. We can fool it by copying the winutils.exe file provided with your course materials in the “ScalingUp” folder into c:\winutils\bin, and then setting the HADOOP_HOME environment variable to c:\winutils.
At this point you should restart your PC to make sure all of those new environment variables have taken effect.
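Once you’re back up, a quick way to confirm those variables actually took effect is a tiny Python check like the one below. This is just an illustrative snippet, not part of the course materials:

    import os

    # Print the environment variables Spark depends on, as seen from Python.
    # JAVA_HOME should point at your Java 8 JDK install (no spaces in the path!),
    # and on Windows, HADOOP_HOME should point at the folder containing winutils\bin.
    for var in ("JAVA_HOME", "HADOOP_HOME"):
        print(var, "=", os.environ.get(var, "(not set!)"))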
Finally, you need to install the pyspark package into your RecSys environment using Anaconda Navigator.
One more thing – if you already have Spark installed and the SPARK_HOME environment variable set, you need to make sure the version of Spark on your system matches the version of the pyspark package you installed into Anaconda. Usually that’s whatever the latest version is. If you have mismatched Spark versions on your machine, you’ll get weird errors about the “Java gateway process exited before sending its port number” that make no sense.
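Once everything’s in place, a quick sanity check is to spin up a SparkSession from Python and print the version it found. This is just a minimal sketch, not one of the course scripts – but if it runs cleanly, your installation is probably in good shape:

    from pyspark.sql import SparkSession

    # Create a local SparkSession and report which Spark version pyspark is using.
    # If the environment variables are wrong, this is usually where the
    # "Java gateway" errors show up.
    spark = SparkSession.builder.appName("SanityCheck").getOrCreate()
    print("Spark version:", spark.version)
    spark.stop()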
Once you’ve completed these steps, we can move on and start playing with Apache Spark!
The reason Spark is a big deal is that it can distribute the processing of massive data sets across a cluster both efficiently and reliably.
Architecturally it looks like this – you write a driver script, in either Python, Scala, or Java, that defines how you want to process your data using the APIs Spark provides.
Once you launch that driver script – usually from the master node of your cluster – it will communicate with your cluster manager to allocate the resources it needs from your cluster. That cluster manager could be Hadoop’s YARN if you’re running Spark on top of a Hadoop cluster, or you can use Spark’s own cluster manager instead if you prefer.
That cluster manager allocates a bunch of executor processes across your cluster that will do the actual work of processing your data. Spark handles all the details of figuring out how to divide your data up and process it in the most efficient manner. It does this by keeping things in memory as much as possible, trying to process data on the actual nodes it is stored on, and using something called a directed acyclic graph to organize the processing in the most efficient way it can.
All of these components can talk to each other when necessary – for example, if a machine in your cluster goes down. It’s all rather resilient. Even though the cluster manager looks like a single point of failure in this simplified diagram, you can set things up with a backup cluster manager ready to take over if needed.
From a software developer’s standpoint, Spark consists of a core that manages all of the distribution of work we just talked about, plus other libraries built on top of it that you generally work with. Spark SQL, for example, defines something called a DataSet, which lets you work with Spark in much the same way as you would with a SQL database. This has become the standard way of using Spark – DataSets are the future. Spark Streaming lets you ingest data in real time and process it as it comes in, and it also offers something called Structured Streaming that even lets you treat that real-time data as SQL data. GraphX exists for analyzing and processing data organized in a graph data structure, such as social networks.
What we’re most interested in, however, is MLLib – that’s Spark’s machine learning library. As we’ll see, it contains classes that make recommendation generation from massive data sets really simple.
Before we dive into some code, there is one concept you’ll need – the RDD. RDD stands for resilient distributed dataset, and it’s at the core of Spark if you’re not using its SQL interfaces instead.
An RDD is an object that encapsulates the data you want to process. When you write a Spark driver script, what you’re really doing is defining operations on RDDs that all tie together somehow, and ultimately lead to some sort of desired output at the end. You define where the RDD should load its data from, what operations and aggregations it should perform on that data, and where to write its output when it’s done. Kind of like with TensorFlow, nothing actually happens until you kick things off and request that final output – you’re really just letting Spark build up a graph of the operations it needs to perform to fulfill your request, and then you start that graph.
From a programming standpoint, this is really nice because you only think about RDDs and the operations you’re applying to all of the data within them. The RDD hides all of Spark’s complexity in distributing those operations across a cluster, and in handling node failures and things like that. You don’t worry about the distributed computing part; you just worry about how you want to transform your data.
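To make that concrete, here’s a tiny, self-contained sketch of what working with an RDD feels like. The transformations only build up the execution graph; nothing runs until an action like take() asks for results:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("RDDSketch").getOrCreate()
    sc = spark.sparkContext

    # These lines only *define* work -- Spark doesn't compute anything yet.
    numbers = sc.parallelize(range(1, 1001))     # an RDD of the numbers 1..1000
    squares = numbers.map(lambda x: x * x)       # transform every element
    evens = squares.filter(lambda x: x % 2 == 0)

    # take() is an action -- this is when Spark actually executes the graph,
    # potentially distributed across a cluster.
    print(evens.take(5))

    spark.stop()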
Of course in the world of technology, nothing can stay the same for long – and this is true of Spark as well.
Spark has introduced alternative APIs over time that build upon the RDD. So, while you can still code against RDDs if you want lower-level control over what your driver script does, Spark introduced DataFrames, which treat the underlying data as Row objects and let you code against it much like you would a SQL database. You can convert an RDD into a DataFrame if you want to.
Spark 2 introduced the DataSet, which is like a DataFrame except it has better type safety – so you can catch more errors in your driver script at compile time instead of at runtime. Spark 2 uses the DataSet as a common language between all of its components, which also helps to simplify development. You can also convert a DataFrame into a DataSet and vice versa – technically, a DataFrame is exactly the same thing as a DataSet of Row objects. But there’s a catch – DataSets don’t work in R or Python, so if you’re developing in Python you have to stick with DataFrame objects as Spark’s common currency instead. DataSets are also a little bit slower, so some people choose to stick with DataFrames even outside of Python.
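Here’s a small sketch of that idea in Python: an RDD of Row objects converted into a DataFrame, which you can then filter and query much like a SQL table. The data here is just made up for illustration:

    from pyspark.sql import SparkSession, Row

    spark = SparkSession.builder.appName("DataFrameSketch").getOrCreate()
    sc = spark.sparkContext

    # Start with a plain RDD of Row objects...
    rowRDD = sc.parallelize([Row(userId=1, movieId=50, rating=5.0),
                             Row(userId=2, movieId=50, rating=3.0)])

    # ...and convert it into a DataFrame, which behaves much like a SQL table.
    ratingsDF = spark.createDataFrame(rowRDD)
    ratingsDF.filter(ratingsDF.rating >= 4.0).show()

    spark.stop()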
This is all kind of confusing, so why am I telling you this? Well, our example in Python will start off using the RDD interface for the initial cleanup and structuring of our input ratings data. But then, we have to convert it to a DataFrame because that’s what Spark’s machine learning library expects as input. DataSets aren’t relevant to us because we’re coding in Python, but it’s a term you’ll hear a lot when people talk about Spark.
So with all that out of the way, let’s take a look at a real Spark driver script and create some real recommendations with it.
Coding Activity
So, open up Spyder inside your RecSys environment, and open the SparkALS.py file in the ScalingUp folder of your course materials.
This is adapted from one of the examples that comes with Apache Spark – they actually use MovieLens as an example as well. But, I made some modifications to get it to work with our specific dataset and to generate top-N recommendations at the end.
It’s surprisingly small, right? One reason Spark is so fun to work with is because you can do really powerful things with very little code. Let’s dive in.
First, we start by importing the packages we need from pyspark itself. As we mentioned, modern Spark scripts use Spark SQL as their primary interface, and that means we need to set up something called a SparkSession for our driver script. It’s similar to a database session in spirit, but we’re not going to use it as a database.
We’re also going to import RegressionEvaluator, which will let us measure RMSE on our results, and most importantly ALS from pyspark.ml.recommendation. This lets us perform matrix factorization using Alternating Least Squares, or ALS, as its optimization algorithm. The amazing thing is that it’s written such that the work can be distributed across a cluster without us even having to think about how that works!
We also import our MovieLens class, just as a convenience to let us print out movie names from movie IDs.
Now we get into the driver script itself. First, we create a SparkSession object – this is the object through which we’ll issue all of our Spark commands. All we’re doing is giving it a name and letting everything else go with the default settings for the session. But you could take finer-grained control of how Spark distributes the work here if you wanted to.
Next we load up our ratings data into Spark using the read function on our SparkSession, which has handy CSV parsing abilities built in. We then call rdd on that to retrieve a resilient distributed dataset representing all of that data, which we can operate on. We’ll store that RDD in a variable called “lines”, since at this point it contains the raw lines read in directly from ratings.csv.
It’s important to remember that at this point, lines isn’t really a structure that contains all of our data – that would defeat the purpose if we had to read and store everything on a single PC. Creating the lines RDD just creates a placeholder for reading that data in; spark.read didn’t really read in the data right there, it just told the RDD where it will read the data from. That reading may happen on many different machines in parallel, each reading in its own little piece of the input data that it needs to process.
Next, we’ll transform the raw data into something more structured. We call the map function on our lines RDD to transform each individual line using the lambda function provided. This lambda function converts each line into a Row object that consists of four columns named userId, movieId, rating, and timestamp.
We did this because all modern Spark code operates on DataSets, not RDDs – so we want to convert our RDD into a DataSet. One form of DataSet is a DataFrame, which consists of Row objects. It’s a little confusing, but all that’s really happening here is that we’re converting our data into a form that Spark’s ML library can work with – and it wants a DataFrame, not an RDD.
Now that we have our dataframe called ratings, we can split it into a training set and a test set using the randomSplit operation. Again, remember nothing is actually happening yet when this line executes – we’re just defining what we want Spark to do, all in a distributed manner, once we request our results.
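Put together, the data-loading portion of the script does something like the sketch below. The file path, header option, and app name here are assumptions based on how the course materials are laid out – the exact lines in SparkALS.py may differ slightly:

    from pyspark.sql import SparkSession, Row

    spark = SparkSession.builder.appName("ALSExample").getOrCreate()

    # Read the raw ratings.csv and grab the underlying RDD of rows.
    # Adjust the path to wherever your ratings.csv actually lives.
    lines = spark.read.option("header", "true").csv("../ml-latest-small/ratings.csv").rdd

    # Convert each raw line into a structured Row object with named columns...
    ratingsRDD = lines.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=int(p[3])))

    # ...then into a DataFrame that Spark's ML library can consume,
    # and split it into training and test sets.
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2])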
Now we train our ALS model across a cluster with a whopping two lines of code! As you can see there are a few hyperparameters you can fiddle with if you want to try and improve the results, and you just need to pass in the column names that correspond to the users, items, and ratings. Then it’s just a matter of calling fit on it with our training dataframe.
To generate predictions from this trained model, we use the transform function with our test set, and the RegressionEvaluator class lets us easily compute RMSE by comparing the predicted ratings to the actual ones. We retrieve the RMSE score from it and print it out. Since we’re finally asking for output that the driver script needs to retrieve, this is the point where Spark actually kicks off the processing, now that it can optimize the full set of operations needed to generate that output.
As a sanity check, we also obtain top-N recommendations, since RMSE doesn’t tell the whole story. The recommendForAllUsers function will generate top-N recommendations for any value of N we want, for everyone – again, in a distributed manner.
We’ll extract the results for our good friend, user number 85, using the filter function. Then we use collect() to retrieve those final results from the cluster back to our driver script, where we can print them out.
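Continuing from the training/test split in the earlier sketch, the training, evaluation, and top-N portion looks roughly like this. The hyperparameter values and the coldStartStrategy setting are illustrative choices on my part, not necessarily what’s in the course file:

    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.recommendation import ALS

    # Define the ALS model -- maxIter and regParam are hyperparameters you can
    # tune; coldStartStrategy="drop" discards predictions for users or items
    # that only appear in the test set, so RMSE isn't polluted by NaNs.
    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId",
              ratingCol="rating", coldStartStrategy="drop")
    model = als.fit(training)

    # Predict ratings for the held-out test set and measure RMSE.
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    print("RMSE =", evaluator.evaluate(predictions))

    # Generate top-10 recommendations for every user, then pull out user 85
    # and bring just those rows back to the driver with collect().
    userRecs = model.recommendForAllUsers(10)
    user85Recs = userRecs.filter(userRecs.userId == 85).collect()
    for row in user85Recs:
        print(row)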
Let’s run it and see what happens! If you installed Spark as instructed, it should work – but there’s a lot that can go wrong, so don’t sweat it too much if you just have to watch my own results.
OK, so the RMSE score is 1.12, which isn’t that great – some hyperparameter tuning may be in order, or the data set may just be too small. I’ve tinkered with Spark’s ALS a lot in the past and I suspect it’s the latter. The top N recommendations we got back look pretty weird as well, so maybe this isn’t the best algorithm to use for this data. But the important point is that it could be distributed – even though we’re running this on a single PC, you could run this same exact script on a cluster and Spark would distribute it all for you automatically. Even on a single PC, it can use the different cores of your CPU to do parallel processing if it’s beneficial.
So even if the results aren’t too exciting – the capability we’ve uncovered here is!
Let’s see if we can take it further.
We promised we’d scale things up in this section – so let’s see what Apache Spark can do, even on a single PC. We’re going to go straight from the 100,000 ratings data set we’ve been using so far to the 20 million ratings dataset from MovieLens! Let’s see if Spark can handle that.
MovieLens’s license terms don’t allow me to redistribute their data, so you’ll have to head over to grouplens.org, select the datasets page, and download the ml-20m.zip file from the 20 million ratings data set there. Once it’s downloaded, uncompress it, and place the ml-20m folder inside your course materials folder.
Now, let’s go back to Spyder, and open up the SparkALS-20m.py file.
There are only a couple of things we changed here. First, I wrote a new loadMovieNames function instead of relying on our MovieLens module, because the movie IDs in the 20 million dataset are different from the ones in the 100K data set. It’s really just copied and pasted from the code in our MovieLens module that did the same thing for the 100K dataset, using a different file path.
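If you’re curious, a function like that can be as simple as the sketch below – it just builds a dict mapping movie IDs to titles from the movies.csv file in the 20M dataset. The path and encoding here are assumptions; adjust them to match your setup:

    import csv

    def loadMovieNames():
        # Map movieId -> title using the movies.csv that ships with ml-20m.
        # Path and encoding are assumptions -- change them for your machine.
        movieNames = {}
        with open("../ml-20m/movies.csv", newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            next(reader)  # skip the header row
            for row in reader:
                movieNames[int(row[0])] = row[1]
        return movieNames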
The only other change is on line 31 – you can see here that I’ve added a configuration setting to our SparkSession specifying that it should use 4 CPU cores. My PC has 4 cores, so I can tell Spark to use them all – and it’s sort of like having a cluster of 4 machines at that point. Technically, 4 executor processes will be spun up on my PC when I run this, all running in parallel on different parts of the data. Without that parallelism, 20 million ratings would be too much for a single process to handle. If your PC has a different number of CPU cores, you’ll want to change this value accordingly.
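The exact configuration line in the course file may differ, but one common way to ask Spark’s local mode for four cores when you build the SparkSession is something like this:

    from pyspark.sql import SparkSession

    # "local[4]" tells Spark's local mode to run 4 worker threads -- one per core.
    # Using "local[*]" instead would let Spark grab every core it can find.
    spark = SparkSession.builder\
        .appName("ALSExample")\
        .master("local[4]")\
        .getOrCreate()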
When you’re running on a real cluster though, things are usually pre-configured such that the default settings Spark uses are already optimized to the hardware available on your cluster. You don’t normally need to think about this, or hard-code assumptions about your hardware into the script – this is just a special case because we’re running on a single PC and not on a real cluster. In the real world, you’ll usually be working on a cluster of some sort, and what I said earlier about using the same exact driver code no matter how big your cluster is would be true.
Let’s kick it off. It will of course take some time to train a matrix factorization model with 20 million ratings, and then generate top-10 recommendations for every single user in that dataset – but it actually finishes more quickly than some scripts we’ve run with just 100,000 ratings! We’ll pause, and come back when it’s done.
OK, you might recall that with the 100,000 ratings dataset, our RMSE was 1.12, which wasn’t great – but now it’s down to 0.81, which is actually pretty awesome. This tells us that the ALS algorithm in Spark’s ML library doesn’t work well with small datasets, or that its default hyperparameters are tuned for larger ones, or both. But regardless of the reason, these results are MUCH better, at least in terms of accuracy.
We’ll also sample the recommendations for a given user, but keep in mind that user 85 in this dataset isn’t the same user 85 from our 100K dataset. Also, the results you see on your PC will be different, because there is a random component to how the model is trained. But after running it a couple of times, it would seem that this user’s tastes are pretty obscure, but somewhat consistent. Old movies from the ’40s and ’50s keep coming up for him or her, as well as more modern foreign films, or films about foreign lands. Even if we don’t have a deep subjective understanding of this user and the movies being recommended, we can still see that there are coherent patterns in what’s being recommended that aren’t just random – so that’s a good sign that this system is doing something useful.
But the really impressive thing is that we built a really good recommender system, from 20 million ratings, in just a few minutes! That is truly big data, and thanks to the efficiency of Spark, we didn’t even need more than one computer to do it!
Apache Spark is pretty cool, and it’s worth learning about in more depth. But it is a bit limited in the tools it offers for producing recommendations – let’s look at some other systems that can produce recommendations at massive scale.