17.6 Spark

In this section, we’ll overview Apache Spark. We’ll use the Python PySpark library and Spark’s functional-style filter/map/reduce capabilities to implement a simple word count example that summarizes the word counts in Romeo and Juliet.

17.6.1 Spark Overview

When you process truly big data, performance is crucial. Hadoop is geared to disk-based batch processing—reading the data from disk, processing the data and writing the results back to disk. Many big-data applications demand better performance than is possible with disk-intensive operations. In particular, fast streaming applications that require either real-time or near-real-time processing won’t work in a disk-based architecture.

History

Spark was initially developed in 2009 at U. C. Berkeley and funded by DARPA (the Defense Advanced Research Projects Agency). Initially, it was created as a distributed execution engine for high-performance machine learning.38 It uses an in-memory architecture that “has been used to sort 100 TB of data 3X faster than Hadoop MapReduce on 1/10th of the machines”39 and runs some workloads up to 100 times faster than Hadoop.40 Spark’s significantly better performance on batch-processing tasks is leading many companies to replace Hadoop MapReduce with Spark.41 , 42 , 43

Architecture and Components

Though it was initially developed to run on Hadoop and use Hadoop components like HDFS and YARN, Spark can run standalone on a single computer (typically for learning and testing purposes), standalone on a cluster or using various cluster managers and distributed storage systems. For resource management, Spark runs on Hadoop YARN, Apache Mesos, Amazon EC2 and Kubernetes, and it supports many distributed storage systems, including HDFS, Apache Cassandra, Apache HBase and Apache Hive.44

At the core of Spark are resilient distributed datasets (RDDs), which you’ll use to process distributed data using functional-style programming. In addition to reading data from disk and writing data to disk, Hadoop uses replication for fault tolerance, which adds even more disk-based overhead. RDDs eliminate this overhead by remaining in memory—using disk only if the data will not fit in memory—and by not replicating data. Spark handles fault tolerance by remembering the steps used to create each RDD, so it can rebuild a given RDD if a cluster node fails.45

Spark distributes the operations you specify in Python to the cluster’s nodes for parallel execution. Spark streaming enables you to process data as it’s received. Spark DataFrames, which are similar to pandas DataFrames, enable you to view RDDs as a collection of named columns. You can use Spark DataFrames with Spark SQL to perform queries on distributed data. Spark also includes Spark MLlib (the Spark Machine Learning Library), which enables you to perform machine-learning algorithms, like those you learned in the Chapters 15 and 16. We’ll use RDDs, Spark streaming, DataFrames and Spark SQL in the next few examples. You’ll explore Spark MLlib in the chapter exercises.

Providers

Hadoop providers typically also provide Spark support. In addition to the providers listed in Section 17.5, there are Spark-specific vendors like Databricks. They provide a “zero-management cloud platform built around Spark.”46 Their website also is an excellent resource for learning Spark. The paid Databricks platform runs on Amazon AWS or Microsoft Azure. Databricks also provides a free Databricks Community Edition, which is a great way to get started with both Spark and the Databricks environment. An exercise at the end of the chapter asks you to research Databricks Community Edition, then use it to reimplement the Spark examples in the upcoming sections.

17.6.2 Docker and the Jupyter Docker Stacks

In this section, we’ll show how to download and execute a Docker stack containing Spark and the PySpark module for accessing Spark from Python. You’ll write the Spark example’s code in a Jupyter Notebook. First, let’s overview Docker.

Docker

Docker is a tool for packaging software into containers (also called images) that bundle everything required to execute that software across platforms. Some software packages we use in this chapter require complicated setup and configuration. For many of these, there are preexisting Docker containers that you can download for free and execute locally on your desktop or notebook computers. This makes Docker a great way to help you get started with new technologies quickly and conveniently.

Docker also helps with reproducibility in research and analytics studies. You can create custom Docker containers that are configured with the versions of every piece of software and every library you used in your study. This would enable others to recreate the environment you used, then reproduce your work, and will help you reproduce your results at a later time. We’ll use Docker in this section to download and execute a Docker container that’s preconfigured to run Spark applications.

Installing Docker

You can install Docker for Windows 10 Pro or macOS at:

https://www.docker.com/products/docker-desktop

On Windows 10 Pro, you must allow the "Docker for Windows.exe" installer to make changes to your system to complete the installation process. To do so, click Yes when Windows asks if you want to allow the installer to make changes to your system.47 Windows 10 Home users must use Virtual Box as described at:

https://docs.docker.com/machine/drivers/virtualbox/

Linux users should install Docker Community Edition as described at:

https://docs.docker.com/install/overview/

For a general overview of Docker, read the Getting started guide at:

https://docs.docker.com/get-started/

Jupyter Docker Stacks

The Jupyter Notebooks team has preconfigured several Jupyter “Docker stacks” containers for common Python development scenarios. Each enables you to use Jupyter Notebooks to experiment with powerful capabilities without having to worry about complex software setup issues. In each case, you can open JupyterLab in your web browser, open a notebook in JupyterLab and start coding. JupyterLab also provides a Terminal window that you can use in your browser like your computer’s Terminal, Anaconda Command Prompt or shell. Everything we’ve shown you in IPython to this point can be executed using IPython in JupyterLab’s Terminal window.

We’ll use the jupyter/pyspark-notebook Docker stack, which is preconfigured with everything you need to create and test Apache Spark apps on your computer. When combined with installing other Python libraries we’ve used throughout the book, you can implement most of this book’s examples using this container. To learn more about the available Docker stacks, visit:

https://jupyter-docker-stacks.readthedocs.io/en/latest/index.html

Run Jupyter Docker Stack

Before performing the next step, ensure that JupyterLab is not currently running on your computer. Let’s download and run the jupyter/pyspark-notebook Docker stack. To ensure that you do not lose your work when you close the Docker container, we’ll attach a local file-system folder to the container and use it to save your notebook—Windows users should replace \ with ^. :


docker run -p 8888:8888 -p 4040:4040 -it --user root \
    -v fullPathToTheFolderYouWantToUse:/home/jovyan/work \
    jupyter/pyspark-notebook:14fdfbf9cfc1 start.sh jupyter lab

The first time you run the preceding command, Docker will download the Docker container named:

jupyter/pyspark-notebook:14fdfbf9cfc1

The notation ":14fdfbf9cfc1" indicates the specific jupyter/pyspark-notebook container to download. At the time of this writing, 14fdfbf9cfc1 was the newest version of the container. Specifying the version as we did here helps with reproducibility. Without the ":14fdfbf9cfc1" in the command, Docker will download the latest version of the container, which might contain different software versions and might not be compatible with the code you’re trying to execute. The Docker container is nearly 6GB, so the initial download time will depend on your Internet connection’s speed.

Opening JupyterLab in Your Browser

Once the container is downloaded and running, you’ll see a statement in your Command Prompt, Terminal or shell window like:


Copy/paste this URL into your browser when you connect for the first
time, to login with a token:
        http://(bb00eb337630 or 127.0.0.1):8888/?token=
            9570295e90ee94ecef75568b95545b7910a8f5502e6f5680

Copy the long hexadecimal string (the string on your system will differ from this one):

9570295e90ee94ecef75568b95545b7910a8f5502e6f5680

then open http://localhost:8888/lab in your browser (localhost corresponds to 127.0.0.1 in the preceding output) and paste your token in the Password or token field. Click Log in to be taken to the JupyterLab interface. If you accidentally close your browser, go to http://localhost:8888/lab to continue your session.

In JupyterLab running in this Docker container, the work folder in the Files tab at the left side of the JupyterLab interface represents the folder you attached to the container in the docker run command’s -v option. From this folder, you can open the notebook files we provide for you. Any new notebooks or other files you create will be saved to this folder by default. Because the Docker container’s work folder is connected to a folder on your computer, any files you create in JupyterLab will remain on your computer, even if you decide to delete the Docker container.

Accessing the Docker Container’s Command Line

Each Docker container has a command-line interface like the one you’ve used to run IPython throughout this book. Via this interface, you can install Python packages into the Docker container and even use IPython as you’ve done previously.

Open a separate Anaconda Command Prompt, Terminal or shell and list the currently running Docker containers with the command:

docker ps

The output of this command is wide, so the lines of text will likely wrap, as in:


CONTAINER ID        IMAGE                                 COMMAND
           CREATED            STATUS              PORTS
  NAMES
f54f62b7e6d5        jupyter/pyspark-notebook:14fdfbf9cfc1 "tini -g --
/bin/bash"   2 minutes ago     Up 2 minutes       0.0.0.0:8888->8888/tcp
  friendly_pascal

In the last line of our system’s output under the column head NAMES in the third line is the name that Docker randomly assigned to the running container—friendly_pascal—the name on your system will differ. To access the container’s command line, execute the following command, replacing container_name with the running container’s name:

docker exec -it container_name /bin/bash

The Docker container uses Linux under the hood, so you’ll see a Linux prompt where you can enter commands.

The app in this section will use features of the NLTK and TextBlob libraries you used in the “Natural Language Processing” chapter. Neither is preinstalled in the Jupyter Docker stacks. To install NLTK and TextBlob enter the command:

conda install -c conda-forge nltk textblob

Stopping and Restarting a Docker Container

Every time you start a container with docker run, Docker gives you a new instance that does not contain any libraries you installed previously. For this reason, you should keep track of your container name, so you can use it from another Anaconda Command Prompt, Terminal or shell window to stop the container and restart it. The command

docker stop container_name

will shut down the container. The command

docker restart container_name

will restart the container. Docker also provides a GUI app called Kitematic that you can use to manage your containers, including stopping and restarting them. You can get the app from https://kitematic.com/ and access it through the Docker menu. The following user guide overviews how to manage containers with the tool:

https://docs.docker.com/kitematic/userguide/

17.6.3 Word Count with Spark

In this section, we’ll use Spark’s filtering, mapping and reducing capabilities to implement a simple word count example that summarizes the words in Romeo and Juliet. You can work with the existing notebook named RomeoAndJulietCounter.ipynb in the SparkWordCount folder (into which you should copy your RomeoAndJuliet.txt file from the “Natural Language Processing” chapter), or you can create a new notebook, then enter and execute the snippets we show.

Loading the NLTK Stop Words

In this app, we’ll use techniques you learned in the “Natural Language Processing” chapter to eliminate stop words from the text before counting the words’ frequencies. First, download the NLTK stop words:


[1]: import nltk
     nltk.download('stopwords')
[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data] Package stopwords is already up-to-date!
[1]: True

Next, load the stop words:


[2]: from nltk.corpus import stopwords
     stop_words = stopwords.words('english')

Configuring a SparkContext

A SparkContext (from module pyspark) object gives you access to Spark’s capabilities in Python. Many Spark environments create the SparkContext for you, but in the Jupyter pyspark-notebook Docker stack, you must create this object.

First, let’s specify the configuration options by creating a SparkConf object (from module pyspark). The following snippet calls the object’s setAppName method to specify the Spark application’s name and calls the object’s setMaster method to specify the Spark cluster’s URL. The URL 'local[*]' indicates that Spark is executing on your local computer (as opposed to a cloud-based cluster), and the asterisk indicates that Spark should run our code using the same number of threads as there are cores on the computer:


[3]: from pyspark import SparkConf
     configuration = SparkConf().setAppName('RomeoAndJulietCounter')\
                                .setMaster('local[*]')

Threads enable a single node cluster to execute portions of the Spark tasks concurrently to simulate the parallelism that Spark clusters provide. When we say that two tasks are operating concurrently, we mean that they’re both making progress at once—typically by executing a task for a short burst of time, then allowing another task to execute. When we say that two tasks are operating in parallel, we mean that they’re executing simultaneously, which is one of the key benefits of Hadoop and Spark executing on cloud-based clusters of computers.

Next, create the SparkContext, passing the SparkConf as its argument:


[4]: from pyspark import SparkContext
     sc = SparkContext(conf=configuration)

Reading the Text File and Mapping It to Words

You work with a SparkContext using functional-style programming techniques, like filtering, mapping and reduction, applied to a resilient distributed dataset (RDD). An RDD takes data stored throughout a cluster in the Hadoop file system and enables you to specify a series of processing steps to transform the data in the RDD. These processing steps are lazy (Chapter 5)—they do not perform any work until you indicate that Spark should process the task.

The following snippet specifies three steps:

  • SparkContext method textFile loads the lines of text from RomeoAndJuliet.txt and returns it as an RDD (from module pyspark) of strings that represent each line.

  • RDD method map uses its lambda argument to remove all punctuation with TextBlob’s strip_punc function and to convert each line of text to lowercase. This method returns a new RDD on which you can specify additional tasks to perform.

  • RDD method flatMap uses its lambda argument to map each line of text into its words and produces a single list of words, rather than the individual lines of text. The result of flatMap is a new RDD representing all the words in Romeo and Juliet.


[5]: from textblob.utils import strip_punc
     tokenized = sc.textFile('RomeoAndJuliet.txt')\
                   .map(lambda line: strip_punc(line, all=True).lower())\
                   .flatMap(lambda line: line.split())

Removing the Stop Words

Next, let’s use RDD method filter to create a new RDD with no stop words remaining:

[6]: filtered = tokenized.filter(lambda word: word not in stop_words)

Counting Each Remaining Word

Now that we have only the non-stop-words, we can count the number of occurrences of each word. To do so, we first map each word to a tuple containing the word and a count of 1. This is similar to what we did in Hadoop MapReduce. Spark will distribute the reduction task across the cluster’s nodes. On the resulting RDD, we then call the method reduceByKey, passing the operator module’s add function as an argument. This tells method reduceByKey to add the counts for tuples that contain the same word (the key):


[7]: from operator import add
     word_counts = filtered.map(lambda word: (word, 1)).reduceByKey(add)

Locating Words with Counts Greater Than or Equal to 60

Since there are hundreds of words in Romeo and Juliet, let’s filter the RDD to keep only those words with 60 or more occurrences:

[8]: filtered_counts = word_counts.filter(lambda item: item[1] >= 60)

Sorting and Displaying the Results

At this point, we’ve specified all the steps to count the words. When you call RDD method collect, Spark initiates all the processing steps we specified above and returns a list containing the final results—in this case, the tuples of words and their counts. From your perspective, everything appears to execute on one computer. However, if the SparkContext is configured to use a cluster, Spark will divide the tasks among the cluster’s worker nodes for you. In the following snippet, sort in descending order (reverse=True) the list of tuples by their counts (itemgetter(1)).

The following snippet calls method collect to obtain the results and sorts those results in descending order by word count:


[9]: from operator import itemgetter
     sorted_items = sorted(filtered_counts.collect(),
                           key=itemgetter(1), reverse=True)

Finally, let’s display the results. First, we determine the word with the most letters so we can right-align all the words in a field of that length, then we display each word and its count:


[10]: max_len = max([len(word) for word, count in sorted_items])
      for word, count in sorted_items:
          print(f'{word:>{max_len}}: {count}')
[10]:    romeo: 298
          thou: 277
        juliet: 178
           thy: 170S
         nurse: 146
       capulet: 141
          love: 136
          thee: 135
         shall: 110
          lady: 109
         friar: 104
          come: 94
      mercutio: 83
          good: 80
      benvolio: 79
         enter: 75
            go: 75
          i’ll: 71
        tybalt: 69
         death: 69
         night: 68
      lawrence: 67
           man: 65
          hath: 64
           one: 60

17.6.4 Spark Word Count on Microsoft Azure

As we said previously, we want to expose you to both tools you can use for free and real-world development scenarios. In this section, you’ll implement the Spark word-count example on a Microsoft Azure HDInsight Spark cluster.

Create an Apache Spark Cluster in HDInsight Using the Azure Portal

The following link explains how to set up a Spark cluster using the HDInsight service:

https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-jupyter-spark-sql-use-portal

While following the Create an HDInsight Spark cluster steps, note the same issues we listed in the Hadoop cluster setup earlier in this chapter and for the Cluster type select Spark.

Again, the default cluster configuration provides more resources than you need for our examples. So, in the Cluster summary, perform the steps shown in the Hadoop cluster setup to change the number of worker nodes to 2 and to configure the worker and head nodes to use D3 v2 computers. When you click Create, it takes 20 to 30 minutes to configure and deploy your cluster.

Install Libraries into a Cluster

If your Spark code requires libraries that are not installed in the HDInsight cluster, you’ll need to install them. To see what libraries are installed by default, you can use ssh to log into your cluster (as we showed earlier in the chapter) and execute the command:

/usr/bin/anaconda/envs/py35/bin/conda list

Since your code will execute on multiple cluster nodes, libraries must be installed on every node. Azure requires you to create a Linux shell script that specifies the commands to install the libraries. When you submit that script to Azure, it validates the script, then executes it on every node. Linux shell scripts are beyond this book’s scope, and the script must be hosted on a web server from which Azure can download the file. So, we created an install script for you that installs the libraries we use in the Spark examples. Perform the following steps to install these libraries:

  1. In the Azure portal, select your cluster.

  2. In the list of items under the cluster’s search box, click Script Actions.

  3. Click Submit new to configure the options for the library installation script. For the Script type select Custom, for the Name specify libraries and for the Bash script URI use: http://deitel.com/bookresources/IntroToPython/install_libraries.sh

  4. Check both Head and Worker to ensure that the script installs the libraries on all the nodes.

  5. Click Create.

When the cluster finishes executing the script, if it executed successfully, you’ll see a green check next to the script name in the list of script actions. Otherwise, Azure will notify you that there were errors.

Copying RomeoAndJuliet.txt to the HDInsight Cluster

As you did in the Hadoop demo, let’s use the scp command to upload to the cluster the RomeoAndJuliet.txt file you used in the “Natural Language Processing” chapter. In a Command Prompt, Terminal or shell, change to the folder containing the file (we assume this chapter’s ch17 folder), then enter the following command. Replace YourClusterName with the name you specified when creating your cluster and press Enter only when you’ve typed the entire command. The colon is required and indicates that you’ll supply your cluster password when prompted. At that prompt, type the password you specified when setting up the cluster, then press Enter:

scp RomeoAndJuliet.txt sshuser@YourClusterName-ssh.azurehdinsight.net:

Next, use ssh to log into your cluster and access its command line. In a Command Prompt, Terminal or shell, execute the following command. Be sure to replace YourClusterName with your cluster name. Again, you’ll be prompted for your cluster password:

ssh sshuser@YourClusterName-ssh.azurehdinsight.net

To work with the RomeoAndJuliet.txt file in Spark, first use the ssh session to copy the file into the cluster’s Hadoop’s file system by executing the following command. Once again, we’ll use the already existing folder /examples/data that Microsoft includes for use with HDInsight tutorials. Again, press Enter only when you’ve typed the entire command:


hadoop fs -copyFromLocal RomeoAndJuliet.txt
   /example/data/RomeoAndJuliet.txt

Accessing Jupyter Notebooks in HDInsight

At the time of this writing, HDInsight uses the old Jupyter Notebook interface, rather than the newer JupyterLab interface shown earlier. For a quick overview of the old interface see:

https://jupyter-notebook.readthedocs.io/en/stable/examples/Notebook/Notebook%20Basics.html 

To access Jupyter Notebooks in HDInsight, in the Azure portal select All resources, then your cluster. In the Overview tab, select Jupyter notebook under Cluster dashboards. This opens a web browser window and asks you to log in. Use the username and password you specified when setting up the cluster. If you did not specify a username, the default is admin. Once you log in, Jupyter displays a folder containing PySpark and Scala subfolders. These contain Python and Scala Spark tutorials.

Uploading the RomeoAndJulietCounter.ipynb Notebook

You can create new notebooks by clicking New and selecting PySpark3, or you can upload existing notebooks from your computer. For this example, let’s upload the previous section’s RomeoAndJulietCounter.ipynb notebook and modify it to work with Azure. To do so, click the Upload button, navigate to the ch17 example folder’s SparkWordCount folder, select RomeoAndJulietCounter.ipynb and click Open. This displays the file in the folder with an Upload button to its right. Click that button to place the notebook in the current folder. Next, click the notebook’s name to open it in a new browser tab. Jupyter will display a Kernel not found dialog. Select PySpark3 and click OK. Do not run any cells yet.

Modifying the Notebook to Work with Azure

Perform the following steps, executing each cell as you complete the step:

  1. The HDInsight cluster will not allow NLTK to store the downloaded stop words in NLTK’s default folder because it’s part of the system’s protected folders. In the first cell, modify the call nltk.download('stopwords') as follows to store the stop words in the current folder ('.'):

    nltk.download('stopwords', download_dir='.')

    When you execute the first cell, Starting Spark application appears below the cell while HDInsight sets up a SparkContext object named sc for you. When this task is complete, the cell’s code executes and downloads the stop words.

  2. In the second cell, before loading the stop words, you must tell NLTK that they’re located in the current folder. Add the following statement after the import statement to tell NLTK to search for its data in the current folder:

    nltk.data.path.append('.')
  3. Because HDInsight sets up the SparkContext object for you, the third and fourth cells of the original notebook are not needed, so you can delete them. To do so, either click inside it and select Delete Cells from Jupyter’s Edit menu, or click in the white margin to the cell’s left and type dd.

  4. In the next cell, you must specify the location of RomeoAndJuliet.txt in the underlying Hadoop file system. Replace the string 'RomeoAndJuliet.txt' with the string

    'wasb:///example/data/RomeoAndJuliet.txt'

    The notation wasb:/// indicates that RomeoAndJuliet.txt is stored in a Windows Azure Storage Blob (WASB)—Azure’s interface to the HDFS file system.

  5. Because Azure currently uses Python 3.5.x, it does not support f-strings. So, in the last cell, replace the f-string with the following older-style Python string formatting using the string method format:

print('{:>{width}}: {}'.format(word, count, width=max_len))

You’ll see the same final results as in the previous section.

Caution: Be sure to delete your cluster and other resources when you’re done with them, so you do not incur charges. For more information, see:

https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-portal

Note that when you delete your Azure resources, your notebooks will be deleted as well. You can download the notebook you just executed by selecting File > Download as > Notebook (.ipynb) in Jupyter.

Self Check for Section 17.6

  1. (Discussion) How does Docker help with reproducibility?
    Answer: Docker enables you to create custom Docker containers that are configured with the versions of every piece of software and every library used in your study. This enables others to recreate the environment you used, then prove your work.

  2. (Fill-In) Spark uses an       architecture for performance.
    Answer: in-memory

  3. (True/False) Hadoop and Spark both implement fault tolerance by replicating data.
    Answer: False. Hadoop implements fault tolerance by replicating data across nodes. Spark implements fault tolerance by remembering the steps used to create each RDD so it can be rebuilt if a cluster node fails.

  4. (True/False) Spark’s significantly better performance on batch-processing tasks is leading many companies to replace Hadoop MapReduce with Spark.
    Answer: True.

  5. (True/False) You work with a SparkContext using functional-style filter, map and reduce operations, applied to a resilient distributed dataset (RDD).
    Answer: True.

  6. (Discussion) Assuming that sc is a SparkContext, what does the following code do? Are any results produced when the statement completes?

    
    from textblob.utils import strip_punc
    tokenized = sc.textFile('RomeoAndJuliet.txt')\
                  .map(lambda line: strip_punc(line, all=True).lower())\
                  .flatMap(lambda line: line.split())
    


    Answer: This code first creates an RDD from a text file. Next it uses RDD method map to produce a new RDD containing the lines of text with punctuation removed and in all lowercase letters. Finally, it produces another new RDD representing the individual words in all the lines. This statement specifies only processing steps, which are lazy, so no results will be produced until you call an RDD method like collect that initiates the processing steps.