Chapter 11. Backup and Recovery

After accumulating a few petabytes of data or so, someone inevitably asks how all this data is going to be backed up. It’s a deceptively difficult problem when working with such a large repository of data. Even simple tasks, such as knowing what has changed since the last backup, can prove difficult when new data arrives at a high rate in a sufficiently large cluster. All backup solutions need to deal explicitly with a few key concerns. Selecting the data that should be backed up is a two-dimensional problem: the critical datasets must be chosen, and within each dataset, so must the subset of the data that has not yet been backed up. The timeliness of backups is another important question. Data can be backed up less frequently, in larger batches, but this increases the window of possible data loss. Ratcheting up the frequency of backups may not be feasible because of the overhead it incurs. Finally, one of the most difficult problems to tackle is backup consistency. Copying data as it’s changing can result in an invalid backup, so some knowledge of how the application functions with respect to the underlying filesystem is necessary. Those with experience administering relational databases are intimately aware of the problems with simply copying data out from under a running system.

The act of taking a backup implies the execution of a batch operation that (usually) marks a specific point in time, copies a subset of the total data to a second location, and records the success or failure of the process. Whether applications can continue working during the copy phase depends on how they manage data and on the features offered by the filesystem. Many enterprise filesystems support advanced features such as snapshots to minimize the window of time required to get a consistent capture of the data on disk and to decouple it from the time required to copy that data elsewhere. Today, HDFS doesn’t support snapshots, although the community is working on this feature as part of the Apache HDFS JIRA HDFS-233. Working with HDFS is a little simpler, however, because of the write-once semantics of files: short of replacing a file, it is possible only to append to an existing file or write a new file altogether.

There are two primary approaches to backup today. The first is the distributed copy tool, or distcp for short, which copies HDFS data in parallel either to another location within the same cluster, or between clusters. Another common approach is an architectural solution to the backup problem that involves writing incoming data to two clusters from an application. Each has advantages and disadvantages, and they lend themselves to different situations.

Hadoop includes the distcp utility, which is, effectively, a MapReduce job that performs parallel copies of HDFS either to the local cluster or to a remote cluster. Users of the rsync utility will notice similarities in the feature set of distcp and should feel at home with how it works. For others, much of the help text is self-explanatory (see Example 11-1). In Apache Hadoop 1.x and CDH3 releases, distcp is a subcommand of the hadoop command, whereas later versions include it as a subcommand of mapred.

Example 11-1. DistCp help text

[esammer@hadoop01 ~]$ hadoop distcp
java.lang.IllegalArgumentException: Missing dst path
  at org.apache.hadoop.tools.DistCp$Arguments.valueOf(DistCp.java:830)
  at org.apache.hadoop.tools.DistCp.run(DistCp.java:881)
  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
  at org.apache.hadoop.tools.DistCp.main(DistCp.java:908)

distcp [OPTIONS] <srcurl>* <desturl>

OPTIONS:
-p[rbugp]              Preserve status
                       r: replication number
                       b: block size
                       u: user
                       g: group
                       p: permission
                       -p alone is equivalent to -prbugp
-i                     Ignore failures
-log <logdir>          Write logs to <logdir>
-m <num_maps>          Maximum number of simultaneous copies
-overwrite             Overwrite destination
-update                Overwrite if src size different from dst size
-skipcrccheck          Do not use CRC check to determine if src is 
                       different from dest. Relevant only if -update
                       is specified
-f <urilist_uri>       Use list at <urilist_uri> as src list
-filelimit <n>         Limit the total number of files to be <= n
-sizelimit <n>         Limit the total size to be <= n bytes
-delete                Delete the files existing in the dst but not in src
-mapredSslConf <f>     Filename of SSL configuration for mapper task

NOTE 1: if -overwrite or -update are set, each source URI is 
      interpreted as an isomorphic update to an existing directory.
For example:
hadoop distcp -p -update "hdfs://A:8020/user/foo/bar" "hdfs://B:8020/user/foo/baz"

     would update all descendants of 'baz' also in 'bar'; it would 
     *not* update /user/foo/baz/bar

NOTE 2: The parameter <n> in -filelimit and -sizelimit can be 
     specified with symbolic representation.  For examples,
       1230k = 1230 * 1024 = 1259520
       891g = 891 * 1024^3 = 956703965184

The URLs referred to by the distcp help text (shown in the previous extract as <srcurl> and <desturl>) are the same as those used when referring to an HDFS or local filesystem path. That is, to copy data from the HDFS cluster managed by namenode A to the HDFS cluster managed by namenode B, we’d use the command hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two, as in Example 11-2.

Note that it’s perfectly valid to copy data to the same cluster, but under a different path, as shown in Example 11-3.

In fact, it’s possible to omit the scheme and host portions of the URLs if the default from the client’s fs.default.name parameter is sufficient. In Example 11-4, we assume that fs.default.name is set to hdfs://A:8020/.
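In sketch form, these variations look like the following commands (the namenode hosts and paths are placeholders rather than the book’s actual examples):

# Copy between two clusters, as in Example 11-2.
hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two

# Copy within the same cluster, but to a different path, as in Example 11-3.
hadoop distcp hdfs://A:8020/path/one hdfs://A:8020/path/two

# Omit the scheme and host and rely on fs.default.name (hdfs://A:8020/),
# as in Example 11-4.
hadoop distcp /path/one /path/two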

When the utility runs, it builds a list of files and directories to copy based on the source URL and creates a MapReduce job with a fixed number of map tasks, each of which works on a set of files. The -m option controls the number of map tasks to spawn for a given job. By default, distcp assumes that you want to perform a basic copy for which the destination doesn’t exist. If the destination does exist, you must tell distcp how to behave with respect to the existing data: you may overwrite any existing files with the -overwrite option, or attempt to update (or freshen) the data with -update. Updating existing data uses the size of the file, as well as a CRC32 checksum, to decide which files have changed. Calculating checksums can take some time, so it’s possible to skip them with the -skipcrccheck option. The -delete option, like rsync’s --delete option, deletes any files in the destination that do not exist in the source path; combined with -update, this option makes distcp behave like a file synchronization utility. In most cases, it’s desirable to preserve the owner, group, permissions, and other file attributes when performing a copy, although doing so requires superuser privileges on the destination cluster because you’re writing files as users other than the effective user of your process. As for the source cluster, you must, of course, have at least read access to the files you wish to back up or be the superuser.
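For instance, a synchronization-style copy using these options might look like the following sketch (the hosts, paths, and map task count are illustrative):

# Freshen an existing destination, delete files no longer present in the
# source, preserve file attributes, and cap the job at 20 map tasks.
hadoop distcp -p -update -delete -m 20 \
    hdfs://A:8020/data/events hdfs://B:8020/data/events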

There are, however, a few limitations to using distcp for backups. Because it runs as a MapReduce job, distcp consumes map slots and carries all of the associated overhead. Each map task in the job is essentially an HDFS client of the remote cluster, and all the same HDFS write path semantics apply. Notably, each client (which, in this case, is any worker node in the cluster) must be able to write to each datanode in the remote cluster directly. This setup creates an N x M communication pattern requirement between the two clusters, where N is the number of source cluster nodes and M is the number of destination cluster nodes. Environments that do not permit full N x M communication between clusters either will not be able to use distcp or will find it painful to configure SOCKS proxies appropriately (and the proxies, ultimately, may not be able to handle the volume of data transferred). Depending on the network infrastructure, this may or may not be a problem. It’s also important to control the number of map tasks and the amount of data transferred appropriately, lest you saturate any private data center interconnect.

When using the hdfs:// scheme to connect to clusters, both clusters must be running the same version of the Hadoop software. If this is not the case, webhdfs:// should be used instead; it uses an HTTP-based, version-agnostic protocol to transfer data. This benefit comes at some cost to performance, but presumably being able to transfer the data slowly is better than not being able to transfer it at all. One final option is to use the httpfs:// protocol, which, like webhdfs://, uses HTTP, but instead of using the embedded web server in each datanode, it uses a dedicated HDFS HttpFS proxy daemon to communicate with the cluster. Both webhdfs:// and httpfs:// require some additional, minor configuration.
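As a sketch, a copy between clusters running different versions might read from the source over WebHDFS and write to the destination natively (the webhdfs:// URL points at the namenode’s HTTP port, 50070 by default; hosts and paths are illustrative):

# Read from cluster A using the version-agnostic WebHDFS protocol and write
# to cluster B using the native hdfs:// scheme.
hadoop distcp webhdfs://A:50070/data/events hdfs://B:8020/data/events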

Traditionally, we think of data as flowing from the application to master storage, and then from master storage to slave or backup storage as a batch job run at a fixed interval. In fact, that’s what the preceding section describes, and it makes sense for a few reasons. This model is simple for application developers to deal with and allows backup to be controlled in a central manner. Backup is outside of the developer’s domain, a problem for someone else to contend with, and largely transparent. For operations staff, however, this is the source of many consistency questions: where exactly is the opportunity to take a consistent picture of the data, given different applications? A different perspective on the backup problem is to handle it explicitly at the application level, where the consistency semantics are well known. Treating backup as a first-class problem to be handled by the application forces the problem into the spotlight, where it can be dealt with explicitly and controlled.

So what does that mean? Trivially, have each application responsible for data ingestion write to two discrete clusters, completely eliminating the separate backup process. Although it’s possible to perform two write operations directly, normally this is done by having applications write to a durable service that, in turn, delivers events to two different places. This approach decouples the application write from the cluster writes, acting as a queue, similar to a messaging system. For this method to be feasible, the application must be sure to deliver the data to the ingest service, which delivers it to each cluster asynchronously so that blocking does not occur. Additionally, for the application to know that the data will be safely delivered to each cluster (eventually), the ingest service must be durable. In some cases, this durability can be traded for speed, although that wouldn’t be appropriate for revenue-bearing events.

You could build this setup yourself; however, a number of open source systems in the Hadoop ecosystem provide exactly this functionality. One of these systems is Apache Flume. Flume can be configured to listen to various data sources (an RPC interface, for instance), writing received events to one or more channels, which are ultimately connected to sinks. Channels are effectively queues; they can be reliable, in which case they store data durably on disk before attempting delivery, or not, in which case they hold events only in memory while shuffling them along as fast as possible. Sinks take events from channels and do something with them. The most popular of the sinks is the HDFS sink, which writes events into HDFS in various configurable ways. Flume’s ability to “fan out” and duplicate events to separate channels, each of which may feed a separate cluster, is one way to deal with the problem of backup: namely, to avoid it as much as possible by rethinking the problem.
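A minimal fan-out configuration might look like the following sketch; the agent, source, channel, and path names are invented for illustration, and a real deployment would tune the channels and HDFS sink behavior:

# Hypothetical Flume agent that replicates every event to two durable file
# channels, each drained by an HDFS sink pointed at a different cluster.
agent1.sources  = src1
agent1.channels = chA chB
agent1.sinks    = hdfsA hdfsB

# Avro RPC source; the replicating selector copies each event to both channels.
agent1.sources.src1.type = avro
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 10000
agent1.sources.src1.channels = chA chB
agent1.sources.src1.selector.type = replicating

# File channels store events durably on local disk before delivery.
agent1.channels.chA.type = file
agent1.channels.chB.type = file

# One HDFS sink per cluster.
agent1.sinks.hdfsA.type = hdfs
agent1.sinks.hdfsA.channel = chA
agent1.sinks.hdfsA.hdfs.path = hdfs://A:8020/flume/incoming

agent1.sinks.hdfsB.type = hdfs
agent1.sinks.hdfsB.channel = chB
agent1.sinks.hdfsB.hdfs.path = hdfs://B:8020/flume/incoming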

Flume isn’t a perfect fit for all use cases. It is specifically built for streaming data sources that produce data as discrete events. Primarily, this means log files and things that resemble them: syslog events, log4j messages, and so forth. Refer to the Flume website and documentation for more information about its intended set of use cases.

When plain old files need to be uploaded to HDFS, the parallel write technique can still be used. Rather than have processes write files directly to HDFS or call the hdfs dfs -put command directly, write the file to a designated directory on a local filesystem somewhere. Use a polling process to scan this directory for new files and create hardlinks (not symlinks) to each new file in a set of directories that act as per-cluster queues. Once the hardlinks are all successfully created, the original entry in the first directory can be deleted. A poller for each cluster can now run asynchronously, scanning only its own directory for new files to upload; when a file is successfully uploaded, it is deleted. Deleting a hardlink decrements the reference count on the data on disk, and after all clusters have copied and deleted their respective pending hardlinks, the data is removed from the local disk. This approach makes sense for cases in which data should be uploaded to multiple HDFS clusters reliably and then removed from the staging or incoming area.
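As a rough sketch, a per-cluster poller might look something like this; the directory layout and paths are hypothetical, and a real implementation would need locking, logging, and retries:

#!/bin/bash
# Hypothetical poller for one cluster's queue directory. Each file here is a
# hardlink created by the ingest-side poller; deleting it after a successful
# upload releases this cluster's reference to the underlying data.
QUEUE=/data/queue/clusterA      # illustrative local queue directory
DEST=/incoming                  # illustrative HDFS destination path

for f in "$QUEUE"/*; do
  [ -e "$f" ] || continue               # nothing queued
  if hdfs dfs -put "$f" "$DEST"/; then
    rm "$f"                             # drop this queue's hardlink
  fi
done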

Using these techniques, the only remaining data that needs to be backed up in a traditional fashion is that which is generated as a result of MapReduce jobs within HDFS. Although some of this data is critical and should be backed up using distcp and other traditional cluster-to-cluster copy techniques, it’s possible that much of it can be regenerated from the raw source data that was replicated on ingestion. With the total cluster processing power of MapReduce, regenerating the data in the event of catastrophic failure may be possible in a reasonably small amount of time, and although it’s not the most fun way to spend an afternoon or two, it may be a calculated risk.

Parallel writes to multiple clusters have the added benefit of drastically reducing the window of time between discrete backup operations. Data is much fresher in both clusters, and this approach also opens the possibility of running read-only or ad hoc analysis jobs on the otherwise backup-only cluster. Another nice side effect is that by streaming data to both clusters throughout the day, the costly, large, batch network operation of a full or incremental backup is amortized over the day. This approach tends to be easier to manage and control, and it reduces the risk of a large backup “eating its tail” (meaning that the next backup is scheduled to start while the previous instance is still running). Recovering from the case in which one cluster falls behind happens implicitly, because this technique is based on the idea of queuing. Temporarily disabling backups for maintenance is unnecessary; data destined for a cluster in maintenance simply accumulates in the queue until the cluster returns to service.

Again, there’s no free lunch to be had here. The downside to this approach is that the idea of backups is something developers have to think about. Some (including myself) might argue that this is where the problem belongs, but there are plenty of reasons why this may not be feasible within an organization. Having additional infrastructure for data ingestion like Flume or file queuing means more to monitor.

Without the namenode metadata, there might as well be no data in HDFS. Historically, for mission-critical production clusters, it has been essential to script namenode metadata backup. In Apache Hadoop 1.0 and CDH3, this is done by using two different calls to the /getimage servlet included in the namenode’s embedded web server. The first call uses the query parameter getimage=1 and retrieves the fsimage file; the second uses getedit=1 and returns the edits file. In both cases, the data is retrieved at a consistent point from disk, which guarantees that it can be safely restored later. In Example 11-5, we fetch a copy of the fsimage and edits files via the servlet and then use the md5sum command to prove that they are identical to what’s on disk in the namenode’s metadata directory. This validation step is only for illustration purposes; on an active cluster, it is very likely that the edits file would have changed between the time we fetched it and the time we compared the files with md5sum.
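As a sketch, the two servlet calls might be scripted as follows (the host, output filenames, and metadata directory are illustrative; 50070 is the default namenode HTTP port):

# Fetch the current fsimage and edits files from the namenode's /getimage
# servlet (Apache Hadoop 1.0 / CDH3).
curl -sf -o fsimage.bak 'http://namenode01:50070/getimage?getimage=1'
curl -sf -o edits.bak   'http://namenode01:50070/getimage?getedit=1'

# Compare against the on-disk copies; on an active cluster the edits file
# will almost certainly differ by the time this runs.
md5sum fsimage.bak edits.bak /data/1/dfs/nn/current/fsimage /data/1/dfs/nn/current/edits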

Apache Hadoop 2.0 and CDH4 use a new metadata storage format designed to be more resilient to corruption, as described in Managing Filesystem Metadata. As a result, the /getimage servlet in these versions works a little differently, although it serves the same purpose. To retrieve the latest fsimage, you must additionally pass the txid=latest query parameter. Retrieving edit transactions is a bit more complicated because the new storage system uses multiple edits files. It is now necessary to pass a start and end transaction ID. Despite being more cumbersome, this approach allows for incremental transaction retrieval, which can make the backup process more efficient. Example 11-6 demonstrates retrieval of the latest fsimage followed by the transactions 17 through 109. If an attempt is made to fetch a transaction range that the namenode does not have, an HTTP 410 error is returned, along with an HTML document containing a stack trace.
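A sketch of the equivalent calls follows; the host and output names are illustrative, and the exact query string parameters (txid, startTxId, endTxId) should be verified against the version you run:

# Fetch the latest fsimage and then edit transactions 17 through 109
# (Apache Hadoop 2.0 / CDH4).
curl -sf -o fsimage.bak  'http://namenode01:50070/getimage?getimage=1&txid=latest'
curl -sf -o edits-17-109 'http://namenode01:50070/getimage?getedit=1&startTxId=17&endTxId=109'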