After accumulating a few petabytes of data or so, someone inevitably asks how all this data is going to be backed up. It’s a deceptively difficult problem when working with such a large repository of data. Even simple tasks, such as knowing what has changed since the last backup, can prove difficult when new data arrives at a high rate in a sufficiently large cluster. All backup solutions need to deal explicitly with a few key concerns. Selecting the data that should be backed up is a two-dimensional problem, in that the critical datasets must be chosen, as must, within each dataset, the subset of the data that has not yet been backed up. The timeliness of backups is another important question. Data can be backed up less frequently, in larger batches, but this affects the window of possible data loss. Ratcheting up the frequency of a backup may not be feasible due to the incurred overhead. Finally, one of the most difficult problems to tackle is backup consistency. Copying the data as it’s changing can result in an invalid backup. For this reason, some knowledge of how the application functions with respect to the underlying filesystem is necessary. Those with experience administering relational databases are intimately aware of the problems with simply copying data out from under a running system.
The act of taking a backup implies the execution of a batch operation that (usually) marks a specific point in time, copies a subset of the total data to a second location, and records the success or failure of the process. Whether applications can continue working during the copy phase depends on how they manage data and the features offered by the filesystem. Many enterprise filesystems support advanced features such as snapshots to minimize the window of time required to get a consistent capture of the data on disk and to decouple that capture from the time required to copy the data elsewhere. Today, HDFS doesn’t support snapshots, although the community is working on this feature as part of the Apache HDFS JIRA HDFS-233. Working with HDFS is a little simpler, however, because of the write-once semantics of files: short of replacing a file, it is possible only to append to an existing file or write a new file altogether.
There are two primary approaches to backup today. The first is the distributed copy tool, or distcp for short, which copies HDFS data in parallel either to another location within the same cluster, or between clusters. Another common approach is an architectural solution to the backup problem that involves writing incoming data to two clusters from an application. Each has advantages and disadvantages, and they lend themselves to different situations.
Hadoop includes the distcp utility, which is, effectively, a MapReduce job that performs parallel copies of HDFS data either to the local cluster or to a remote cluster. Users of the rsync utility will notice similarities in the feature set of distcp and should feel at home with how it works. For others, much of the help text is self-explanatory (see Example 11-1). In Apache Hadoop 1.x and CDH3 releases, distcp is a subcommand of the hadoop command, whereas later versions include it as a subcommand of mapred.
Example 11-1. DistCp help text
[esammer@hadoop01 ~]$ hadoop distcp
java.lang.IllegalArgumentException: Missing dst path
        at org.apache.hadoop.tools.DistCp$Arguments.valueOf(DistCp.java:830)
        at org.apache.hadoop.tools.DistCp.run(DistCp.java:881)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.tools.DistCp.main(DistCp.java:908)

distcp [OPTIONS] <srcurl>* <desturl>

OPTIONS:
-p[rbugp]              Preserve status
                       r: replication number
                       b: block size
                       u: user
                       g: group
                       p: permission
                       -p alone is equivalent to -prbugp
-i                     Ignore failures
-log <logdir>          Write logs to <logdir>
-m <num_maps>          Maximum number of simultaneous copies
-overwrite             Overwrite destination
-update                Overwrite if src size different from dst size
-skipcrccheck          Do not use CRC check to determine if src is
                       different from dest. Relevant only if -update
                       is specified
-f <urilist_uri>       Use list at <urilist_uri> as src list
-filelimit <n>         Limit the total number of files to be <= n
-sizelimit <n>         Limit the total size to be <= n bytes
-delete                Delete the files existing in the dst but not in src
-mapredSslConf <f>     Filename of SSL configuration for mapper task

NOTE 1: if -overwrite or -update are set, each source URI is
      interpreted as an isomorphic update to an existing directory.
      For example:
      hadoop distcp -p -update "hdfs://A:8020/user/foo/bar" "hdfs://B:8020/user/foo/baz"
      would update all descendants of 'baz' also in 'bar'; it would
      *not* update /user/foo/baz/bar

NOTE 2: The parameter <n> in -filelimit and -sizelimit can be
      specified with symbolic representation. For examples,
      1230k = 1230 * 1024 = 1259520
      891g = 891 * 1024^3 = 956703965184
The URLs referred to by the distcp help text (seen in the previous extract as <srcurl> and <desturl>) are the same as one would use when referring to an HDFS or local filesystem path. That is, to copy data from the HDFS cluster managed by namenode A to the HDFS cluster managed by namenode B, we’d use the command hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two, as in Example 11-2.
Example 11-2. Performing a simple copy with distcp between clusters
[esammer@hadoop01 ~]$ hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two
Note that it’s perfectly valid to copy data to the same cluster, but under a different path, as shown in Example 11-3.
Example 11-3. Performing a copy with distcp within the same cluster
[esammer@hadoop01 ~]$ hadoop distcp hdfs://A:8020/path/one hdfs://A:8020/path/two
In fact, it’s possible to omit the scheme and host portions of the URLs if the default from the client’s fs.default.name parameter is sufficient. In Example 11-4, we assume that fs.default.name is set to hdfs://A:8020/.
Example 11-4. Performing a copy with distcp within the same cluster using the default scheme
[esammer@hadoop01 ~]$ hadoop distcp /path/one /path/two
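For reference, a minimal sketch of how that default is typically set in the client’s core-site.xml; the value here simply mirrors the assumption used in Example 11-4:

<!-- core-site.xml: default filesystem used when no scheme/host is given -->
<property>
  <name>fs.default.name</name>
  <value>hdfs://A:8020/</value>
</property>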
When the utility runs, it creates a list of files and directories to copy based on the source URL and creates a MapReduce job with a fixed number of map tasks, each of which works on a set of files. The -m option controls the number of mappers to spawn for a given job. By default, distcp assumes that you want to perform a basic copy for which the destination doesn’t exist. If the destination does exist, you must tell distcp how you want it to behave with respect to existing data. You may optionally overwrite any existing files with the -overwrite option or attempt to update or freshen the data with -update.
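For instance, the following sketches show the two behaviors, reusing the cluster names from the earlier examples; the map count and paths are purely illustrative:

[esammer@hadoop01 ~]$ hadoop distcp -m 20 -overwrite hdfs://A:8020/path/one hdfs://B:8020/path/one
[esammer@hadoop01 ~]$ hadoop distcp -m 20 -update hdfs://A:8020/path/one hdfs://B:8020/path/one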
Updating existing data uses the size of the file, but also a CRC32 checksum, to decide which files have changed. This can take some time, so it’s possible to disable the checksum comparison with the -skipcrccheck option. The -delete option, like rsync’s --delete option, attempts to delete any files in the destination that do not exist in the source path. Combined with the -update option, this option makes distcp behave like a file synchronization utility. In most cases, it’s desirable to preserve the owner, group, permissions, and other file attributes when performing a copy (the -p option), although doing so requires superuser privileges on the destination cluster because you’re writing files as users other than the effective user of your process. As for the permissions required to read the files on the source cluster, you must, of course, have at least read access to the files you wish to back up or be the superuser.
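Putting those options together, an rsync-like synchronization between the two clusters might look like the following sketch. The cluster names and paths are the illustrative ones used above, and the user running the job is assumed to hold superuser privileges on the destination cluster:

[esammer@hadoop01 ~]$ hadoop distcp -p -update -delete hdfs://A:8020/path/one hdfs://B:8020/path/one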
There are, however, a few limitations to using distcp for backups. Because it runs as a MapReduce job, distcp does chew up map slots and has all the same associated overhead therein. Each map task in the job is essentially an HDFS client of the remote cluster, and all the same HDFS write path semantics apply. Notably, each client (which in this case is any worker node in the cluster) must be able to write to each datanode in the remote cluster directly. This setup creates an N x M communication pattern requirement between the two clusters, where N is the number of source cluster nodes and M is the number of destination cluster nodes. Environments that do not permit full N x M communication between clusters either will not be able to use distcp or will experience a ton of pain trying to configure SOCKS proxies appropriately (which, ultimately, may not be able to withstand the amount of data transfer). Depending on the network infrastructure, this may or may not be a problem. It’s also important to control the number of map tasks and data transferred appropriately, lest you saturate any private data center interconnect.
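To keep a copy job from monopolizing that link, the map count and total transfer size can be capped using the flags from the help text above; the numbers in this sketch are purely illustrative:

[esammer@hadoop01 ~]$ hadoop distcp -m 10 -sizelimit 500g -update hdfs://A:8020/path/one hdfs://B:8020/path/one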
When using the hdfs:// scheme to connect to clusters, each cluster must be running the same version of the Hadoop software. If this is not the case, webhdfs:// should be used instead; it uses an HTTP-based, version-agnostic protocol to transfer data. This benefit does come at some cost to performance, but presumably being able to transfer the data more slowly is better than not being able to transfer it at all. One final option is to use the httpfs:// protocol, which, like webhdfs://, uses HTTP, but instead of using the embedded web server in each datanode, it uses a dedicated HDFS HttpFS proxy daemon to communicate with the cluster. Both webhdfs:// and httpfs:// require some additional, minor configuration.
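As a sketch, a cross-version copy that reads the source cluster over WebHDFS while writing to the destination over the native protocol might look like the following, assuming namenode A’s embedded web server listens on the default port of 50070:

[esammer@hadoop01 ~]$ hadoop distcp webhdfs://A:50070/path/one hdfs://B:8020/path/one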
Traditionally, we think of the data flow as application to master storage, and then master storage to slave or backup storage as a batch job run at a fixed interval. In fact, that’s what the preceding section describes, and it makes sense for a few reasons. This model is simple for application developers to deal with and allows backup to be controlled in a central manner. Backup is outside of the developer domain, a problem for someone else to contend with, and largely transparent. For operations staff, this is the source of many consistency questions: where exactly is the opportunity to take a consistent picture of the data, given different applications? A different perspective on the backup problem is to instead handle it explicitly at the application level, where the consistency semantics are well known. Treating backup as a first-class problem to be handled by the application forces the problem into the spotlight, where it can be dealt with explicitly and controlled.
So what does that mean? Trivially, have each application responsible for data ingestion write to two discrete clusters, completely eliminating the separate backup process. Although it’s possible to make two write operations, normally this is done by having applications write to a durable service that in turn delivers events to two different places. This approach decouples the application write from the cluster writes, acting as a queue, similar to a messaging system. For this method to be feasible, the application must be sure to deliver to the ingest service, which delivers to each cluster asynchronously so that blocking does not occur. Additionally, for the application to know that the data will be safely delivered to each cluster (eventually), the ingest service must be durable. In some cases, this durability can be traded for speed, although that wouldn’t be appropriate for revenue-bearing events.
You could build this setup yourself; however, a number of open source systems in the Hadoop ecosystem provide exactly this functionality. One of these systems is Apache Flume. Flume can be configured to listen to various data sources (an RPC interface, for instance), writing received events to one or more channels, which are ultimately connected to sinks. Channels are effectively queues, and can be reliable, in which case they store data durably on disk before attempting delivery, or not, in which case they hold events only in memory while shuffling them along as fast as possible. Sinks take events from channels and do something with them. The most popular of the sinks is the HDFS sink, which writes events into HDFS in various configurable ways. Flume’s ability to “fan out” and duplicate events to separate channels, each of which may go to a separate cluster, is one way to deal with the problem of backup; that is, avoiding it as much as possible by rethinking the problem.
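As an illustration of that fan-out pattern, here is a minimal sketch of a Flume agent configuration that replicates events from a single RPC source into two durable file channels, each drained by an HDFS sink pointed at a different cluster. The agent name, port, and HDFS paths are assumptions made for the example, not values prescribed by the text.

# Hypothetical agent "agent1"; component names, port, and paths are illustrative.
agent1.sources = src1
agent1.channels = ch1 ch2
agent1.sinks = sink1 sink2

# Avro RPC source; the replicating selector (the default) copies each
# event to every channel listed here.
agent1.sources.src1.type = avro
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 4141
agent1.sources.src1.channels = ch1 ch2
agent1.sources.src1.selector.type = replicating

# Durable file channels, so a slow or offline cluster only queues data.
agent1.channels.ch1.type = file
agent1.channels.ch2.type = file

# One HDFS sink per cluster.
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink1.hdfs.path = hdfs://A:8020/flume/events

agent1.sinks.sink2.type = hdfs
agent1.sinks.sink2.channel = ch2
agent1.sinks.sink2.hdfs.path = hdfs://B:8020/flume/events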
Flume isn’t a perfect fit for all use cases. It is specifically built for streaming data sources that produce data as discrete events. Primarily, this means log files or things that look like them: syslog events, log4j messages, and so forth. Refer to the Flume website and documentation for more information about its intended set of use cases.
When plain old files need to be uploaded to HDFS, the parallel write technique can still be used. Rather than have processes write files directly to HDFS or call the hdfs dfs -put command directly, write the file to a designated directory on a local filesystem somewhere. Use a polling process to scan this directory for new files and create hardlinks (not symlinks) into a set of directories that act as a queue for each cluster. When the hardlinks have all been successfully created, the original entry in the first directory can be deleted. A poller for each cluster can now run asynchronously, scanning only its directory for new files to upload. When a file is successfully uploaded, its hardlink can be deleted, which decrements the reference count on the data on disk; after all clusters have copied and deleted their respective pending hardlinks, the data is removed from the local disk. This approach makes sense for cases in which data should be uploaded to multiple HDFS clusters reliably and removed from the staging or incoming area.
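A minimal shell sketch of this queue follows, assuming /data/incoming as the staging directory, /data/queue/a and /data/queue/b as the per-cluster queues, and /incoming as the HDFS target path; all of these names are illustrative.

# Intake poller: hardlink each new file into every cluster queue, then
# remove the staging entry once all links have been created.
for f in /data/incoming/*; do
  [ -f "$f" ] || continue
  ln "$f" /data/queue/a/ && ln "$f" /data/queue/b/ && rm "$f"
done

# Per-cluster poller (one of these runs for each cluster): upload the
# file, then delete the hardlink to release the local copy.
for f in /data/queue/a/*; do
  [ -f "$f" ] || continue
  hdfs dfs -put "$f" hdfs://A:8020/incoming/ && rm "$f"
done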
Using these techniques, the only remaining data that needs to be
backed up in a traditional fashion is that which is generated as a
result of MapReduce jobs within HDFS. Although some of this data is
critical and should be backed up using distcp
and
other traditional cluster-to-cluster copy techniques, it’s possible that
much of it can be regenerated from the raw source data that was
replicated on ingestion. With the total cluster processing power of
MapReduce, regenerating the data in the event of catastrophic failure
may be possible in a reasonably small amount of time, and although it’s
not the most fun way to spend an afternoon or two, it may be a
calculated risk.
Parallel writes to multiple clusters have the added benefit of drastically reducing the window of time between discrete backup operations. Data is much fresher in both clusters, and this approach also opens the possibility of running read-only or ad hoc analysis jobs on the otherwise backup-only cluster. Another nice side effect is that by streaming data throughout the day to two clusters, the costly, large, batch network operation of a full or incremental backup is amortized over the day. This approach tends to be easier to manage and control, and it reduces the risk of a large backup “eating its tail” (meaning that the next backup is scheduled to start while the previous instance is still running). Recovering from the case in which one cluster falls behind happens implicitly, because this technique is based on the idea of queuing. Temporarily disabling backups for maintenance is unnecessary; data destined for a cluster in maintenance accumulates in the queue until the cluster returns to service.
Again, there’s no free lunch to be had here. The downside to this approach is that the idea of backups is something developers have to think about. Some (including myself) might argue that this is where the problem belongs, but there are plenty of reasons why this may not be feasible within an organization. Having additional infrastructure for data ingestion like Flume or file queuing means more to monitor.
Without the namenode metadata, there might as well be no data in HDFS. Historically, for mission-critical production clusters, it has been essential to script namenode metadata backup. In Apache Hadoop 1.0 and CDH3, this is done by using two different calls to the /getimage servlet included in the namenode’s embedded web server. The first call uses the query parameter getimage=1 and retrieves the fsimage file; the second uses getedit=1 and returns the edits file. In both cases, the data is retrieved at a consistent point from disk, which guarantees that it can be safely restored later. In Example 11-5, we fetch a copy of the fsimage and edits files via the servlet and then use the md5sum command to prove that they are identical to what’s on disk in the namenode’s metadata directory. This validation step is only for illustration purposes; on an active cluster, it is very likely that the edits file would have changed between the time we fetched it and the time we compared the files with md5sum.
Example 11-5. Backing up the namenode metadata (Apache Hadoop 1.0 and CDH3)
[root@hadoop01 ~]# curl -o fsimage.20120801 'http://hadoop01:50070/getimage?getimage=1' \
  2>/dev/null
[root@hadoop01 ~]# curl -o edits.20120801 'http://hadoop01:50070/getimage?getedit=1' \
  2>/dev/null
[root@hadoop01 ~]# md5sum fsimage.20120801 /data/1/dfs/nn/current/fsimage \
  edits.20120801 /data/1/dfs/nn/current/edits
d04d6b0f60cf9603fcc7ff45b620d341  fsimage.20120801
d04d6b0f60cf9603fcc7ff45b620d341  /data/1/dfs/nn/current/fsimage
d944934c10b4f6b5ac8ba5f0490a759b  edits.20120801
d944934c10b4f6b5ac8ba5f0490a759b  /data/1/dfs/nn/current/edits
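Scripting this for regular execution (from cron, for example) is straightforward. The following is a minimal sketch for the Apache Hadoop 1.0/CDH3 servlet shown above; the backup directory, hostname, port, and retention period are all illustrative assumptions:

#!/bin/sh
# Fetch the current fsimage and edits files into date-stamped copies.
# BACKUP_DIR, the namenode host/port, and the retention window are
# assumptions made for this sketch.
BACKUP_DIR=/backup/namenode
STAMP=$(date +%Y%m%d%H%M)
mkdir -p "$BACKUP_DIR"
curl -s -o "$BACKUP_DIR/fsimage.$STAMP" \
  'http://hadoop01:50070/getimage?getimage=1'
curl -s -o "$BACKUP_DIR/edits.$STAMP" \
  'http://hadoop01:50070/getimage?getedit=1'
# Remove copies older than 14 days.
find "$BACKUP_DIR" -type f -mtime +14 -delete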
Apache Hadoop 2.0 and CDH4 use a new metadata storage format designed to be more resilient to corruption, as described in Managing Filesystem Metadata. As a result, the /getimage servlet in these versions works a little differently, although it serves the same purpose. To retrieve the latest fsimage, you must additionally pass the txid=latest query parameter. Retrieving edit transactions is a bit more complicated because the new storage system uses multiple edits files. It is now necessary to pass a start and end transaction ID. Despite being more cumbersome, this approach allows for incremental transaction retrieval, which can make the backup process more efficient. Example 11-6 demonstrates retrieval of the latest fsimage followed by transactions 17 through 109. If an attempt is made to fetch a transaction range that the namenode does not have, an HTTP 410 error is returned, along with an HTML document containing a stack trace.
Example 11-6. Backing up the namenode metadata (Apache Hadoop 2.0 and CDH4)
[root@hadoop01 ~]# curl -o fsimage.20120801 \
  'http://hadoop01:50070/getimage?getimage=1&txid=latest' 2>/dev/null
[root@hadoop01 ~]# curl -o edits-17-109.20120801 \
  'http://hadoop01:50070/getimage?getedit=1&startTxId=17&endTxId=109' \
  2>/dev/null