Chapter 6. Hadoop MapReduce

You write MapReduce jobs in Java when you need low-level control and want to optimize or streamline your big data pipeline. Using MapReduce is not required, but it is rewarding, because it is a beautifully designed system and API. Learning the basics can get you very far, very quickly, but before you embark on writing a customized MapReduce job, don’t overlook the fact that tools such as Apache Drill enable you to write standard SQL queries on Hadoop.

This chapter assumes you have a running Hadoop Distributed File System (HDFS) on your local machine or access to a Hadoop cluster. To simulate how a real MapReduce job would run, we can run Hadoop in pseudo-distributed mode on one node, either your localhost or a remote machine. Considering how much CPU, RAM, and storage we can fit on one box (a laptop) these days, you can, in essence, create a mini supercomputer capable of running fairly massive distributed jobs. You can get pretty far on your localhost (on a subset of data) and then scale up to a full cluster when your application is ready.

If the Hadoop client is properly installed, you can get a complete listing of all available Hadoop operations by simply typing the following:

bash$ hadoop

Hadoop Distributed File System

Apache Hadoop comes with a command-line tool useful for accessing the Hadoop filesystem and launching MapReduce jobs. The filesystem access command fs is invoked as follows:

bash$ hadoop fs <command> <args>

The command is any of a number of standard Unix filesystem commands such as ls, cat, or mkdir, preceded by a hyphen. For example, to list all the items in the HDFS root directory, type this:

bash$ hadoop fs -ls /

Note the inclusion of the / for root. If it were not included at all, the command would return nothing and might fool you into thinking that your HDFS is empty! Typing hadoop fs will print out all the filesystem operations available. Some of the more useful operations involve copying data to and from HDFS, deleting directories, and merging data in a directory.

To copy local files into a Hadoop filesystem:

bash$ hadoop fs -copyFromLocal <localSrc> <dest>

To copy a file from HDFS to your local drive:

bash$ hadoop fs -copyToLocal <hdfsSrc> <localDest>

After a MapReduce job, there will most likely be many files contained in the output directory of the job. Instead of retrieving these one by one, Hadoop has a convenient operation for merging the files into one and then storing the results locally:

bash$ hadoop fs -getmerge <hdfs_output_dir> <my_local_dir>

One essential operation for running MapReduce jobs is to first remove the output directory if it already exists, because MapReduce will fail, almost immediately, if it detects the output directory:

bash$ hadoop fs -rm -r -f <hdfs_dir>

MapReduce Architecture

MapReduce embodies the embarrassingly parallel paradigm of distributed computing. Initially, the data is broken into chunks, and portions are sent to identical mapper classes that extract key-value pairs from the data, line by line. The key-value pairs are then partitioned and grouped by key into key-list pairs, with the keys in sorted order. Typically, the number of partitions equals the number of reduce tasks, but this is not required. In fact, multiple key-list groups can end up in the same partition and reducer, but each key-list group is guaranteed not to be split across partitions or reducers. The general flow of data through a MapReduce framework is displayed in Figure 6-1.

Figure 6-1. MapReduce schema

Say, for example, we have data like this:

San Francisco, 2012
New York, 2012
San Francisco, 2017
New York, 2015
New York, 2016

The mapper could output key-value pairs such as (San Francisco, 2012) for each line in the dataset. The shuffle would then group the data by key and collect the list of values for each:

(San Francisco, [2012, 2017])
(New York, [2012, 2015, 2016])

We could write the reducer to output the maximum year for each key, so that the final output (written to the output directory) would look like this:

San Francisco, 2017
New York, 2016

It is also worth noting that the Hadoop MapReduce API supports compound keys and custom comparators for partitioning keys and sorting values (a pattern often called secondary sort).
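
As a sketch, the job wiring for such a secondary-sort setup might look like the following, where CityYearKey, CityPartitioner, YearSortComparator, and CityGroupingComparator are hypothetical classes you would write yourself, set on the Job instance described later in this chapter:

/* hypothetical wiring for a compound (city, year) key */
job.setMapOutputKeyClass(CityYearKey.class);                  // compound key
job.setPartitionerClass(CityPartitioner.class);               // partition on city only
job.setSortComparatorClass(YearSortComparator.class);         // sort by city, then year
job.setGroupingComparatorClass(CityGroupingComparator.class); // group reduce calls by city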

Writing MapReduce Applications

Although there are more than a few ways to store and shuttle data around the Hadoop ecosystem, we will focus on plain old text files. Whether the underlying data is a raw string, CSV, TSV, or JSON, we can easily read, share, and manipulate it. Hadoop also provides resources for reading and writing its own Sequence and Map file formats, and you may want to explore third-party serialization formats such as Apache Avro, Apache Thrift, Google Protobuf, Apache Parquet, and others. All of these provide operational and efficiency advantages. However, they also add a layer of complexity that you must consider.

Hadoop Data Types

Data must be shuttled around the MapReduce universe in a format that is both reliable and efficient. Unfortunately (according to the authors of Hadoop), the native Java primitive types (e.g., boolean, int, double) and the more complex types (e.g., String, Map) do not travel well! For this reason, the Hadoop ecosystem has its own versions of serializable types that are required in all MapReduce applications. Note that all the regular Java types are perfectly fine inside our MapReduce code; it is only for the connections between MapReduce components (between mappers and reducers) that we need to convert native Java types to Hadoop types.
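
For example, wrapping and unwrapping the most common types is straightforward; a minimal sketch:

/* converting between native Java types and Hadoop Writable types */
IntWritable userID = new IntWritable(42);        // wraps an int
int id = userID.get();                           // back to a native int

LongWritable count = new LongWritable(1L);       // wraps a long
DoubleWritable score = new DoubleWritable(3.14); // wraps a double

Text city = new Text("San Francisco");           // wraps a String (UTF-8 encoded)
String name = city.toString();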

Custom Writable and WritableComparable types

At times we need a specialized type not covered by Hadoop. In general, a Hadoop type must implement Writable, which handles the object’s serialization with a write() method and deserialization with a readFields() method. However, if the object will be used as a key, it must implement WritableComparable, because the compareTo() and hashCode() methods will be required during partitioning and sorting.
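
As a sketch, a hypothetical compound key holding a city and a year (imports omitted, as in the other listings) might look like this:

public class CityYearWritable implements WritableComparable<CityYearWritable> {

    private String city;
    private int year;

    public CityYearWritable() {
    }

    public CityYearWritable(String city, int year) {
        this.city = city;
        this.year = year;
    }

    @Override
    public void write(DataOutput d) throws IOException {
        d.writeUTF(city);
        d.writeInt(year);
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        city = di.readUTF();
        year = di.readInt();
    }

    @Override
    public int compareTo(CityYearWritable other) {
        int cmp = city.compareTo(other.city);
        return (cmp != 0) ? cmp : Integer.compare(year, other.year);
    }

    @Override
    public int hashCode() {
        // used by the default HashPartitioner to assign keys to reducers
        return 31 * city.hashCode() + year;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof CityYearWritable)) {
            return false;
        }
        CityYearWritable other = (CityYearWritable) obj;
        return city.equals(other.city) && year == other.year;
    }
}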

Mappers

The Mapper class is what maps the raw input data into a new and typically smaller data structure. In general, you do not need every piece of data from each line of the input files, but rather a select few items. In some cases, the line may be discarded entirely. This is your chance to decide what data will go into the next round of processing. Think of this step as a way of transforming and filtering the raw data into only the parts we actually need. If you do not include a Mapper instance in a MapReduce job, the default identity mapper is used, which passes all the data directly through to the reducer. And if there is no reducer either, the input is essentially copied to the output.

Customizing a mapper

Parsing text files within a Mapper class is much the same as parsing lines from a regular text file, as in Chapter 1. The only required method is the map() method. The fundamental purpose of this map() method is to parse one line of input and output a key-value pair via the context.write() method:

public class ProductMapper extends 
    Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        try {

            /* each line of file is <userID>, <productID>, <timestamp> */

            String[] items = value.toString().split(",");
            int userID = Integer.parseInt(items[0]);
            String productID = items[1];
            context.write(new IntWritable(userID), new Text(productID));

        } catch (NumberFormatException | IOException | InterruptedException e) {

            context.getCounter("mapperErrors", e.getMessage()).increment(1L);
        }
    }
    
}

There are also setup() and cleanup() methods. The setup() method is run once, before the first call to map(). You probably won’t need it, but it comes in handy, for example, when you need a data structure that will be used by each call to the map() method. Likewise, you probably won’t need the cleanup() method, but it is called once after the last call to map() and is used to perform any cleanup actions. There is also a run() method, which does the actual business of mapping the data. There is no real reason to override this method, and it’s best to leave it alone unless you have a good reason to implement your own run() method. In “MapReduce Examples”, we show how to utilize the setup() method for some unique computations.
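
A bare-bones mapper that overrides both methods might look like the following sketch; the stop-word filtering is purely illustrative:

public class SkeletonMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Set<String> stopWords; // shared by every call to map()

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
        // runs once, before the first call to map()
        stopWords = new HashSet<>(Arrays.asList("the", "and", "of"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        for (String token : value.toString().toLowerCase().split("\\s+")) {
            if (!stopWords.contains(token)) {
                context.write(new Text(token), new LongWritable(1L));
            }
        }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
        // runs once, after the last call to map()
        stopWords.clear();
    }
}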

To use a custom mapper, you must designate it in the MapReduce application and set the map output key and value types:

job.setMapperClass(ProductMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);

Reducers

The role of the Reducer is to iterate over the list of values associated with a key and calculate a single output value. Of course, we can customize the output type of the Reducer to return anything we would like as long as it implements Writable. It is important to note that all the values belonging to a given key are sent to the same reducer, so you do not need to worry that some of them have ended up somewhere else. The number of reducers also determines the number of output files.
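
For instance, a reducer for the city/year example earlier in this chapter could emit the maximum year per city. A minimal sketch, assuming the mapper emitted Text keys and IntWritable values:

public class MaxYearReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            max = Math.max(max, value.get());
        }
        // e.g., (San Francisco, 2017)
        context.write(key, new IntWritable(max));
    }
}

As with the mapper, the reducer and its output types are designated in the job with job.setReducerClass(), job.setOutputKeyClass(), and job.setOutputValueClass().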

The Simplicity of a JSON String as Text

JSON data (where each row of a file is a separate JSON string) is everywhere, and for good reason. Many tools are capable of ingesting JSON data, and its human readability and self-describing field names are really helpful. In the MapReduce world, using JSON as the input data eliminates the need for custom writables, because the JSON string can be serialized in the Hadoop Text type. This can be as simple as using JSONObject right in the map() method, or you can create a class that consumes value.toString() for more complicated mapping schemas.

public class JSONMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {

        JSONParser parser = new JSONParser();
        try {
            JSONObject obj = (JSONObject) parser.parse(value.toString());
            
            // get what you need from this object
            String userID = obj.get("user_id").toString();
            String productID = obj.get("product_id").toString();
            int numUnits = Integer.parseInt(obj.get("num_units").toString());
            
            JSONObject output = new JSONObject();
            output.put("productID", productID);
            output.put("numUnits", numUnits);

            /* many more key value pairs, including arrays, can be added here */
            
            context.write(new Text(userID), new Text(output.toString()));
            
            
        } catch (ParseException ex) {
            // count JSON parsing errors rather than failing the job
            context.getCounter("mapperErrors", "ParseException").increment(1L);
        }
        
    }
}

This also works great for outputting the data from the final reducer as a Text object. The final data file will then be in JSON format, ready for efficient use downstream in your pipeline. The reducer can take a Text, Text key-value pair as input and process the JSON with JSONObject. The advantage is that we did not have to create a complicated custom WritableComparable for this data structure.
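
A reducer along those lines might, for example, total the units per user and write the result back out as a JSON string. A minimal sketch, assuming the numUnits field produced by the JSONMapper above:

public class JSONReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

        JSONParser parser = new JSONParser();
        long totalUnits = 0L;

        for (Text value : values) {
            try {
                JSONObject obj = (JSONObject) parser.parse(value.toString());
                totalUnits += Long.parseLong(obj.get("numUnits").toString());
            } catch (ParseException ex) {
                context.getCounter("reducerErrors", "ParseException").increment(1L);
            }
        }

        JSONObject output = new JSONObject();
        output.put("userID", key.toString());
        output.put("totalUnits", totalUnits);

        context.write(key, new Text(output.toString()));
    }
}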

Deployment Wizardry

There are many options and command-line switches for running a MapReduce job. Remember that the output directory must be removed before you run a job:

bash$ hadoop fs -rm -r <path>/output
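
Beyond that, if your driver uses ToolRunner (as the examples later in this chapter do), the generic options let you set configuration properties, ship files to the distributed cache, and add extra JARs at launch time. A sketch, where the class, file, and JAR names are illustrative:

hadoop jar MyApp.jar com.datascience.MyMapReduceJob \
    -D mapreduce.job.reduces=4 \
    -files lookup.ser \
    -libjars extra-lib.jar \
    input output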

MapReduce Examples

To really master MapReduce, you need to practice. There is no better way to understand how it all works than to jump in and start solving problems. Although the system may seem complex and cumbersome at first, its beauty will reveal itself as you have some successes. Here are some typical examples and some insightful computations.

Custom Word Count

We may notice that the built-in TokenCounterMapper class does not produce the results we would like. We can always use our SimpleTokenizer class from Chapter 4 instead:

public class SimpleTokenMapper extends 
    Mapper<LongWritable, Text, Text, LongWritable> {

    SimpleTokenizer tokenizer;
    
    @Override
    protected void setup(Context context) throws IOException {
        // only keep words greater than three chars
        tokenizer = new SimpleTokenizer(3);
    }
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
        
        String[] tokens = tokenizer.getTokens(value.toString());
        for (String token : tokens) {
            context.write(new Text(token), new LongWritable(1L));
        }

    }
}

Just be sure to make the corresponding changes in the job configuration:

/* mapper settings */
job.setMapperClass(SimpleTokenMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

/* reducer settings */
job.setReducerClass(LongSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

Sparse Linear Algebra

Imagine that we have a large matrix (either sparse or dense) in which the i,j coordinates and corresponding value are stored in each line of the file in the format <i,j,value>. This matrix is so large that it is not practical to load it into RAM for further linear algebra routines. Our goal is to perform a matrix-vector multiplication with an input vector we provide. The vector has been serialized so that the file can be included in the MapReduce job.

Imagine we have stored text files of comma- (or tab-) separated values across many nodes in our distributed filesystem. If the data is stored as a literal i,j,value text string (e.g., 34,290,1.2362) in each line of the file, then we can write a simple mapper to parse each line. In this case, we will do matrix-vector multiplication, and as you may recall, that process takes the dot product of each row of the matrix with the column vector of the same length. Each position i of the output vector then has the same index as the corresponding matrix row, so we will use the matrix row index i as the key. We will create a custom writable, SparseMatrixWritable, that contains the row index, column index, and value for each entry in the matrix:

public class SparseMatrixWritable implements Writable {
    int rowIndex; // i
    int columnIndex; // j
    double entry; // the value at i,j

    public SparseMatrixWritable() {
    }

    public SparseMatrixWritable(int rowIndex, int columnIndex, double entry) {
        this.rowIndex = rowIndex;
        this.columnIndex = columnIndex;
        this.entry = entry;
    }
    
    @Override
    public void write(DataOutput d) throws IOException {
        d.writeInt(rowIndex);
        d.writeInt(columnIndex);
        d.writeDouble(entry);
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        rowIndex = di.readInt();
        columnIndex = di.readInt();
        entry = di.readDouble();
    }
    
}

A custom mapper will read in each line of text and parse the three values, using the row index as the key and the SparseMatrixWritable as the value:

public class SparseMatrixMultiplicationMapper
 extends Mapper<LongWritable, Text, IntWritable, SparseMatrixWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        try {
            String[] items = value.toString().split(",");
            int rowIndex = Integer.parseInt(items[0]);
            int columnIndex = Integer.parseInt(items[1]);
            double entry = Double.parseDouble(items[2]);
            SparseMatrixWritable smw = new SparseMatrixWritable(
            rowIndex, columnIndex, entry);
            context.write(new IntWritable(rowIndex), smw);
            // NOTE: a second context.write() could be added here, e.g., to emit
            // the symmetric entry if the matrix is stored as sparse upper triangular
        } catch (NumberFormatException | IOException | InterruptedException e) {
            context.getCounter("mapperErrors", e.getMessage()).increment(1L);
        }
    }
}

The reducer must load the input vector in its setup() method; then in the reduce() method we extract the column indices and values from the list of SparseMatrixWritable objects, setting them in a sparse row vector. The dot product of the input vector and that sparse row vector gives the output value for that key (i.e., the value of the resultant vector at that index).

public class SparseMatrixMultiplicationReducer extends Reducer<IntWritable,
                       SparseMatrixWritable, IntWritable, DoubleWritable>{
    
    private RealVector vector;
    
    @Override
    protected void setup(Context context) 
        throws IOException, InterruptedException {

        /* deserialize the RealVector object */

        // NOTE: vectorFileName is just the filename; ship the file itself to
        // the distributed cache with -files at runtime, and set the filename
        // in the job configuration with set("vectorFileName", "<filename>")

        String vectorFileName = context.getConfiguration().get("vectorFileName");
        try (ObjectInputStream in = new ObjectInputStream(
        new FileInputStream(vectorFileName))) {
            vector = (RealVector) in.readObject();
        } catch (ClassNotFoundException e) {
            // fail fast rather than continue with a null vector
            throw new IOException(e);
        }
    }
    
    @Override
    protected void reduce(IntWritable key, Iterable<SparseMatrixWritable> values, 
    Context context)
        throws IOException, InterruptedException { 
        
        /* rely on the fact that rowVector dim == input vector dim */
        RealVector rowVector = new OpenMapRealVector(vector.getDimension());
        
        for (SparseMatrixWritable value : values) {
            rowVector.setEntry(value.columnIndex, value.entry);
        }
        
        double dotProduct = rowVector.dotProduct(vector);
        
        /* only write the nonzero outputs, 
        since the Matrix-Vector product is probably sparse */
        if(dotProduct != 0.0) {
            /* this outputs the vector index and its value */
            context.write(key, new DoubleWritable(dotProduct));
        }
    }
}

The job can be set up to run like this:

public class SparseAlgebraMapReduceExample extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SparseAlgebraMapReduceExample(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(SparseAlgebraMapReduceExample.class);
        job.setJobName("SparseAlgebraMapReduceExample");
        
        // third command-line arg is the filepath to the serialized vector file
        job.getConfiguration().set("vectorFileName", args[2]);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SparseMatrixMultiplicationMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(SparseMatrixWritable.class);
        job.setReducerClass(SparseMatrixMultiplicationReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setNumReduceTasks(1);
   
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

This can be run with the following command:

hadoop jar MyApp.jar \
com.datascience.SparseAlgebraMapReduceExample \
-files /<path>/RandomVector.ser input output RandomVector.ser

You can view the output with this:

hadoop fs -cat output/part-r-00000