Chapter 2. HDFS

The first half of Apache Hadoop is a filesystem called the Hadoop Distributed Filesystem, or simply HDFS. HDFS was built to support high-throughput, streaming reads and writes of extremely large files. Traditional storage area networks (SANs) and network-attached storage (NAS) offer centralized, low-latency access to either a block device or a filesystem on the order of terabytes in size. These systems are fantastic as the backing store for relational databases, content delivery systems, and similar workloads because they support full-featured POSIX semantics, scale to meet the size requirements of these systems, and offer low-latency access to data. Imagine for a second, though, hundreds or thousands of machines all waking up at the same time and pulling hundreds of terabytes of data from a centralized storage system at once. This is where traditional storage doesn’t necessarily scale.

By creating a system composed of independent machines, each with its own I/O subsystem, disks, RAM, network interfaces, and CPUs, and relaxing (and sometimes removing) some of the POSIX requirements, it is possible to build a system optimized, in both performance and cost, for the specific type of workload we’re interested in. HDFS has a number of specific design goals: it favors high, sustained throughput over low-latency access, it stores very large files across a scale-out cluster of inexpensive commodity machines, and it tolerates the failure of individual disks and machines without losing data.

While it is true that HDFS can be used independently of MapReduce to store large datasets, it truly shines when they’re used together. MapReduce, for instance, takes advantage of how data in HDFS is split into blocks on ingestion and pushes computation to the machines where those blocks can be read locally.

HDFS, in many ways, follows traditional filesystem design. Files are stored as opaque blocks and metadata exists that keeps track of the filename to block mapping, directory tree structure, permissions, and so forth. This is similar to common Linux filesystems such as ext3. So what makes HDFS different?

Traditional filesystems are implemented as kernel modules (in Linux, at least) and, together with userland tools, can be mounted and made available to end users. HDFS is what’s called a userspace filesystem. This is a fancy way of saying that the filesystem code runs outside the kernel as OS processes and, by extension, is not registered with or exposed via the Linux VFS layer. While this is much simpler, more flexible, and arguably safer to implement, it means that you don’t mount HDFS as you would ext3, for instance, and that applications must be explicitly built to use it.

In addition to being a userspace filesystem, HDFS is a distributed filesystem. Distributed filesystems are used to overcome the limits of what an individual disk or machine is capable of supporting. Each machine in a cluster stores a subset of the data that makes up the complete filesystem with the idea being that, as we need to store more block data, we simply add more machines, each with multiple disks. Filesystem metadata is stored on a centralized server, acting as a directory of block data and providing a global picture of the filesystem’s state.

Another major difference between HDFS and other filesystems is its block size. General-purpose filesystems commonly use a 4 KB or 8 KB block size for data. Hadoop, on the other hand, uses the significantly larger block size of 64 MB by default. In fact, cluster administrators usually raise this to 128 MB, 256 MB, or even as high as 1 GB. Increasing the block size means data is written in larger contiguous chunks on disk, which in turn means data can be written and read in larger sequential operations. This minimizes drive seek operations—one of the slowest operations a mechanical disk can perform—and results in better performance when doing large streaming I/O operations.
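The block size can also be chosen per file. The following is a minimal sketch using the Java FileSystem API, assuming a reachable cluster whose configuration is on the classpath; the path is hypothetical, and the cluster-wide default is governed by the dfs.block.size property (dfs.blocksize in newer releases) in hdfs-site.xml.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockSizeExample {
      public static void main(String[] args) throws Exception {
        // Reads core-site.xml and hdfs-site.xml from the classpath.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        long blockSize = 128L * 1024 * 1024; // request 128 MB blocks for this file only
        short replication = 3;
        int bufferSize = 4096;

        // The hypothetical file below is created with the requested block size.
        FSDataOutputStream out = fs.create(
            new Path("/tmp/large-blocks.dat"), true, bufferSize, replication, blockSize);
        out.writeBytes("written with a 128 MB block size\n");
        out.close();
      }
    }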

Rather than rely on specialized storage subsystem data protection, HDFS replicates each block to multiple machines in the cluster. By default, each block in a file is replicated three times. Because files in HDFS are write-once, a replica cannot change after it is written. This obviates the need for complex reasoning about consistency between replicas and, as a result, applications can read any of the available replicas when accessing a file. Having multiple replicas means multiple machine failures are easily tolerated, but there are also more opportunities to read data from a machine close to an application on the network. HDFS actively tracks and manages the number of available replicas of a block as well. Should the number of copies of a block drop below the configured replication factor, the filesystem automatically makes a new copy from one of the remaining replicas. Throughout this book, we’ll frequently use the term replica to mean a copy of an HDFS block.

Applications, of course, don’t want to worry about blocks, metadata, disks, sectors, and other low-level details. Instead, developers want to perform I/O operations using higher level abstractions such as files and streams. HDFS presents the filesystem to developers as a high-level, POSIX-like API with familiar operations and concepts.

There are three daemons that make up a standard HDFS cluster, each of which serves a distinct role, shown in Table 2-1.

Blocks are nothing more than chunks of a file, binary blobs of data. In HDFS, the daemon responsible for storing and retrieving block data is called the datanode (DN). The datanode has direct local access to one or more disks—commonly called data disks—in a server on which it’s permitted to store block data. In production systems, these disks are usually reserved exclusively for Hadoop. Storage can be added to a cluster by adding more datanodes with additional disk capacity, or even adding disks to existing datanodes.

One of the most striking aspects of HDFS is that it is designed in such a way that it doesn’t require RAID storage for its block data. This is in keeping with the commodity hardware design goal and reduces cost as clusters grow in size. Rather than rely on a RAID controller for data safety, block data is simply written to multiple machines. This addresses the safety concern at the cost of raw storage consumed; however, there’s a performance aspect to this as well. Having multiple copies of each block on separate machines means that not only are we protected against data loss if a machine disappears, but during processing, any copy of this data can be used. By having more than one option, the scheduler that decides where to perform processing has a better chance of finding a machine with available compute resources and a copy of the data. This is covered in greater detail in Chapter 3.

The lack of RAID can be controversial. In fact, many believe RAID simply makes disks faster, akin to a magic go-fast turbo button. This, however, is not always the case. A very large number of independently spinning disks performing huge sequential I/O operations with independent I/O queues can actually outperform RAID in the specific use case of Hadoop workloads. Typically, datanodes have a large number of independent disks, each of which stores full blocks. For an expanded discussion of this and related topics, see Blades, SANs, and Virtualization.

While datanodes are responsible for storing block data, the namenode (NN) is the daemon that stores the filesystem metadata and maintains a complete picture of the filesystem. Clients connect to the namenode to perform filesystem operations, although, as we’ll see later, block data is streamed to and from datanodes directly, so bandwidth is not limited by a single node. Datanodes regularly report their status to the namenode in a heartbeat. This means that, at any given time, the namenode has a complete view of all datanodes in the cluster, their current health, and what blocks they have available. See Figure 2-1 for an example of HDFS architecture.

When a datanode initially starts up, as well as every hour thereafter, it sends what’s called a block report to the namenode. The block report is simply a list of all blocks the datanode currently has on its disks and allows the namenode to keep track of any changes. This is also necessary because, while the file-to-block mapping on the namenode is stored on disk, the locations of the blocks are not written to disk. This may seem counterintuitive at first, but it means a change in IP address or hostname of any of the datanodes does not impact the underlying storage of the filesystem metadata. Another nice side effect of this is that, should a datanode experience failure of a motherboard, administrators can simply remove its hard drives, place them into a new chassis, and start up the new machine. As far as the namenode is concerned, the blocks have simply moved to a new datanode. The downside is that, when initially starting a cluster (or restarting it, for that matter), the namenode must wait to receive block reports from all datanodes before it knows all blocks are present.

The namenode serves its filesystem metadata entirely from RAM for fast lookup and retrieval, which places a cap on how much metadata it can handle. A rough estimate is that the metadata for 1 million blocks occupies roughly 1 GB of heap (more on this in Hardware Selection). We’ll see later how you can overcome this limitation, even if it is encountered only at a very high scale (thousands of nodes).
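As a rough worked example under this rule of thumb, a filesystem holding 100 million blocks would need on the order of 100 GB of namenode heap for block metadata alone; at a 128 MB block size and assuming mostly full blocks, that corresponds to somewhere around 12 PB of pre-replication data.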

Finally, the third HDFS process is called the secondary namenode and performs some internal housekeeping for the namenode. Despite its name, the secondary namenode is not a backup for the namenode and performs a completely different function.

Clients can read and write to HDFS using various tools and APIs (see Access and Integration), but all of them follow the same process. The client always, at some level, uses a Hadoop library that is aware of HDFS and its semantics. This library encapsulates most of the gory details related to communicating with the namenode and datanodes when necessary, as well as dealing with the numerous failure cases that can occur when working with a distributed filesystem.

First, let’s walk through the logic of performing an HDFS read operation. For this, we’ll assume there’s a file /user/esammer/foo.txt already in HDFS. In addition to using Hadoop’s client library—usually a Java JAR file—each client must also have a copy of the cluster configuration data that specifies the location of the namenode (see Chapter 5). As shown in Figure 2-2, the client begins by contacting the namenode, indicating which file it would like to read. The client identity is first validated—either by trusting the client and allowing it to specify a username or by using a strong authentication mechanism such as Kerberos (see Chapter 6)—and then checked against the owner and permissions of the file. If the file exists and the user has access to it, the namenode responds to the client with the first block ID and the list of datanodes on which a copy of the block can be found, sorted by their distance to the client. Distance to the client is measured according to Hadoop’s rack topology—configuration data that indicates which hosts are located in which racks. (More on rack topology configuration is available in Rack Topology.)

With the block IDs and datanode hostnames, the client can now contact the most appropriate datanode directly and read the block data it needs. This process repeats until all blocks in the file have been read or the client closes the file stream.
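To make the read path concrete, here is a minimal sketch using the Java FileSystem API, assuming the cluster configuration is on the classpath and reusing the /user/esammer/foo.txt path from above. The namenode lookups, replica selection, and datanode connections described earlier all happen inside the library; the client simply sees a stream.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsRead {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);       // connects to the configured namenode

        // open() asks the namenode for block locations; the data itself is
        // streamed directly from datanodes as the stream is consumed.
        FSDataInputStream in = fs.open(new Path("/user/esammer/foo.txt"));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        String line;
        while ((line = reader.readLine()) != null) {
          System.out.println(line);
        }
        reader.close();
      }
    }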

It is also possible that, while reading from a datanode, the process or host on which it runs dies. Rather than give up, the library will automatically attempt to read another replica of the data from another datanode. If all replicas are unavailable, the read operation fails and the client receives an exception. Another corner case is that the information returned by the namenode about block locations can be outdated by the time the client attempts to contact a datanode, in which case either a retry occurs if there are other replicas or the read fails. While rare, these kinds of corner cases are what make troubleshooting a large distributed system such as Hadoop so complex. See Chapter 9 for a tour of what can go wrong and how to diagnose the problem.

Writing files to HDFS is a bit more complicated than performing reads. We’ll consider the simplest case, in which a client is creating a new file. Remember that clients need not actually implement this logic; this is simply an overview of how data is written to the cluster by the underlying Hadoop library. Application developers use (mostly) familiar APIs to open files, write to a stream, and close them, much as they would with traditional local files.

Initially, a client makes a request to open a named file for write using the Hadoop FileSystem APIs. A request is sent to the namenode to create the file metadata if the user has the necessary permissions to do so. The metadata entry for the new file is made; however, it initially has no associated blocks. A response to the client indicates the open request was successful and that it may now begin writing data. At the API level, a standard Java stream object is returned, although the implementation is HDFS-specific. As the client writes data to the stream, it is split into packets (not to be confused with TCP packets or HDFS blocks), which are queued in memory. A separate thread in the client consumes packets from this queue and, as necessary, contacts the namenode requesting a set of datanodes to which replicas of the next block should be written. The client then makes a direct connection to the first datanode in the list, which makes a connection to the second, which connects to the third. This forms the replication pipeline to be used for this block of data, as shown in Figure 2-3. Data packets are then streamed to the first datanode, which writes the data to disk and forwards it to the next datanode in the pipeline, which writes to its disk and forwards it on, and so on. Each datanode in the replication pipeline acknowledges each packet as it’s successfully written. The client application maintains a list of packets for which acknowledgments have not yet been received; when an acknowledgment arrives, the client knows the data has been written to all nodes in the pipeline. This process of writing packets to the pipeline continues until the block size is reached, at which point the client goes back to the namenode for the next set of datanodes to write to. Ultimately, the client indicates it’s finished sending data by closing the stream, which flushes any remaining packets out to disk and updates the namenode to indicate the file is now complete.
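From the application’s point of view, all of this machinery is hidden behind an ordinary output stream. Below is a minimal sketch of a write using the Java FileSystem API, again assuming cluster configuration on the classpath; the path is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWrite {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // create() asks the namenode to add the file metadata; the replication
        // pipeline, packet queue, and acknowledgments are all handled inside
        // the returned stream's implementation.
        FSDataOutputStream out = fs.create(new Path("/user/esammer/bar.txt"));
        for (int i = 0; i < 1000; i++) {
          out.writeBytes("line " + i + "\n");
        }
        // Closing flushes remaining packets and tells the namenode the file is complete.
        out.close();
      }
    }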

Of course, things are not always this simple, and failures can occur. The most common type of failure is that a datanode in the replication pipeline fails to write data for one reason or another—a disk dies or a datanode fails completely, for instance. When this happens, the pipeline is immediately closed and all packets that had been sent since the last acknowledgment are pushed back into the queue to be written so that any datanodes past the failed node in the pipeline will receive the data. The current block is given a new ID on the remaining healthy datanodes. This is done so that, should the failed datanode return, the abandoned block will appear to not belong to any file and be discarded automatically. A new replication pipeline containing the remaining datanodes is opened and the write resumes. At this point, things are mostly back to normal and the write operation continues until the file is closed. The namenode will notice that one of the blocks in the file is under-replicated and will arrange for a new replica to be created asynchronously. A client can recover from multiple failed datanodes provided at least a minimum number of replicas are written (by default, this is one).

The namenode stores its filesystem metadata on local filesystem disks in a few different files, the two most important of which are fsimage and edits. Much like a database checkpoint, fsimage contains a complete snapshot of the filesystem metadata, whereas edits contains only incremental modifications made to the metadata. Using a write ahead log (WAL) such as the edits file is a common practice for high-throughput data stores: because the namenode serves metadata directly from RAM, on-disk writes are reduced to sequential, append-only operations, which avoids costly seek operations and yields better overall performance. Upon namenode startup, the fsimage file is loaded into RAM and any changes in the edits file are replayed, bringing the in-memory view of the filesystem up to date.

In more recent versions of Hadoop (specifically, Apache Hadoop 2.0 and CDH4; more on the different versions of Hadoop in Picking a Distribution and Version of Hadoop), the underlying metadata storage was updated to be more resilient to corruption and to support namenode high availability. Conceptually, metadata storage is similar, although transactions are no longer stored in a single edits file. Instead, the namenode periodically rolls the edits file (closes one file and opens a new file), numbering each by transaction ID. The namenode can also now retain old copies of both fsimage and edits to better support the ability to roll back in time. Most of these changes won’t impact you, although it helps to understand the purpose of the files on disk. That being said, you should never make direct changes to these files unless you really know what you are doing. The rest of this book simply uses the base names fsimage and edits to refer generally to their function.

Recall from earlier that the namenode writes changes only to its write ahead log, edits. Over time, the edits file grows, and as with any log-based system, it would take a long time to replay in the event of server failure. Similar to a relational database, the edits file therefore needs to be periodically applied to the fsimage file. The problem is that the namenode may not have the available resources—CPU or RAM—to do this while continuing to provide service to the cluster. This is where the secondary namenode comes in.

The exact interaction that occurs between the namenode and the secondary namenode (shown in Figure 2-4) is as follows:[3]

  1. The secondary namenode instructs the namenode to roll its edits file and begin writing to edits.new.

  2. The secondary namenode copies the namenode’s fsimage and edits files to its local checkpoint directory.

  3. The secondary namenode loads fsimage, replays edits on top of it, and writes a new, compacted fsimage file to disk.

  4. The secondary namenode sends the new fsimage file to the namenode, which adopts it.

  5. The namenode renames edits.new to edits.

This process occurs every hour (by default) or whenever the namenode’s edits file reaches 64 MB (also the default). There isn’t usually a good reason to modify this, although we’ll explore that later. Newer versions of Hadoop use a defined number of transactions rather than file size to determine when to perform a checkpoint.

As administrators responsible for the health and service of large-scale systems, the notion of a single point of failure should make us a bit uneasy (or worse). Unfortunately, for a long time the HDFS namenode was exactly that: a single point of failure. Recently, the Hadoop community as a whole has invested heavily in making the namenode highly available, opening Hadoop to additional mission-critical deployments.

Namenode high availability (or HA) is deployed as an active/passive pair of namenodes. The edits write ahead log needs to be available to both namenodes, and therefore is stored on a shared storage device. Currently, an NFS filer is required as the shared storage, although there are plans to remove this dependency.[4] As the active namenode writes to the edits log, the standby namenode is constantly replaying transactions to ensure it is up to date and ready to take over in the case of failure. Datanodes are also aware of both namenodes in an HA configuration and send block reports to both servers.

A high-availability pair of namenodes can be configured for manual or automatic failover. In the default manual failover mode, a command must be sent to effect a state transition from one namenode to the other. When configured for automatic failover, each namenode runs an additional process called a failover controller that monitors the health of the process and coordinates state transitions. Just as in other HA systems, there are two primary types of failover: graceful failover, initiated by an administrator, and nongraceful failover, which is the result of a detected fault in the active process. In either case, it’s impossible to truly know if a namenode has relinquished active status or if it’s simply inaccessible from the standby. If both processes were allowed to continue running, they could both write to the shared state and corrupt the filesystem metadata. This is commonly called a split brain scenario. For this reason, the system can use a series of increasingly drastic techniques to ensure the failed node (which could still think it’s active) is actually stopped. This can start with something as simple as asking it to stop via RPC, but can end with the mother of all fencing techniques: STONITH, or “shoot the other node in the head.” STONITH can be implemented by issuing a reboot via IPMI, or even by programmatically cutting power to a machine for a short period of time if data center power distribution units (PDUs) support such functionality. Most administrators who want high availability will also want to configure automatic failover. See Figure 2-5 for an example of automatic failover.

When running with high availability, the standby namenode takes over the role of the secondary namenode, described earlier. In other words, there is no separate secondary namenode process in an HA cluster, only a pair of namenode processes. Those who already run Hadoop clusters with a dedicated machine for the secondary namenode process can, in most cases, repurpose that machine as the second namenode. The various configuration options for high availability are covered, in detail, in Namenode High Availability.

At the time of this writing, namenode high availability (sometimes abbreviated NN HA) is available in Apache Hadoop 2.0.0 and CDH4.

Large-scale users of Hadoop have had another obstacle with which to contend: the limit of how much metadata the namenode can store in memory. In order to scale the namenode beyond the amount of physical memory that could be stuffed into a single server, there needed to be a way to move from a scale-up to a scale-out approach. Just like we’ve seen with block storage in HDFS, it’s possible to spread the filesystem metadata over multiple machines. This technique is called namespace federation and refers to assembling one logical namespace from a number of autonomous systems. An example of a federated namespace is the Linux filesystem: many devices can be mounted at various points to form a single namespace that clients can address without concern for which underlying device actually contains the data.

Namenode federation (Figure 2-6) works around the memory limitation of the namenode by allowing the filesystem namespace to be broken up into slices and spread across multiple namenodes. Just as it sounds, this is really just like running a number of separate namenodes, each of which is responsible for a different part of the directory structure. The one major way in which namenode federation is different from running several discrete clusters is that each datanode stores blocks for multiple namenodes. More precisely, each datanode has a block pool for each namespace. While blocks from different pools are stored on the same disks (there is no physical separation), they are logically exclusive. Each datanode sends heartbeats and block reports to each namenode.

Clients often do not want to have to worry about multiple namenodes, so a special client API implementation called ViewFS can be used that maps slices of the filesystem to the proper namenode. This is, conceptually, almost identical to the Linux /etc/fstab file, except that rather than mapping paths to physical devices, ViewFS maps paths to HDFS namenodes. For instance, we can configure ViewFS to look at namenode1 for path /logs and namenode2 for path /hbase. Federation also allows us to use namespace partitioning to control the availability and fault tolerance of different slices of the filesystem. In our previous example, /hbase could be on a namenode that requires extremely high uptime, while /logs is perhaps used only by batch operations in MapReduce.
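To illustrate the fstab analogy, here is a minimal sketch of such a client-side mount table expressed through the Java Configuration API. In a real deployment these properties would live in core-site.xml; the host names are placeholders, and the exact property names should be verified against the release in use.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ViewFsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Client-side mount table mapping namespace slices to namenodes
        // (placeholder hosts; normally configured in core-site.xml).
        conf.set("fs.viewfs.mounttable.default.link./logs",
            "hdfs://namenode1.mycompany.com:8020/logs");
        conf.set("fs.viewfs.mounttable.default.link./hbase",
            "hdfs://namenode2.mycompany.com:8020/hbase");
        conf.set("fs.defaultFS", "viewfs:///");

        FileSystem fs = FileSystem.get(conf);
        // Resolved through the mount table to namenode1 behind the scenes.
        System.out.println(fs.exists(new Path("/logs/2012/01/25")));
      }
    }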

Lastly, it’s important to note that HA and federation are orthogonal features. That is, it is possible to enable them independently of each other, as they speak to two different problems. This means a namespace can be partitioned and some of those partitions (or all) may be served by an HA pair of namenodes.

The sole native method of access to HDFS is its Java API. All other access methods are built on top of this API and by definition, can expose only as much functionality as it permits. In an effort to ease adoption and development of applications, the HDFS API is simple and familiar to developers, piggybacking on concepts such as Java’s I/O streams. The API does differ where necessary in order to provide the features and guarantees it advertises, but most of these are obvious or documented.

In order to access HDFS, clients—applications that are written against the API—must have a copy of configuration data that tells them where the namenode is running. This is analogous to an Oracle client application requiring the tnsnames.ora file. Each application must also have access to the Hadoop library JAR file. Again, this is the equivalent of a database client application’s dependence on a JDBC driver JAR. Clients can be on the same physical machines as any of the Hadoop daemons, or they can be separate from the cluster proper. MapReduce tasks and HBase Region Servers, for example, access HDFS as any other normal client would. They just happen to be running on the same physical machines where HDFS stores its block data.
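As a minimal sketch of what such a client looks like, the following lists a directory using the FileSystem API. Normally the namenode location is picked up from core-site.xml on the classpath; here it is set explicitly using the placeholder host name used elsewhere in this chapter.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Usually supplied by core-site.xml; set explicitly here for illustration.
        conf.set("fs.default.name", "hdfs://mynamenode.mycompany.com:8020");

        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path("/user/esammer"))) {
          System.out.println(status.getPath() + "\t" + status.getLen());
        }
      }
    }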

It’s important to realize that, as a consequence of the direct client to datanode communication, network access between clients and all cluster nodes’ relevant ports must be unfettered. This has implications for network design, security, and bandwidth that are covered in Network Design.

Hadoop comes with a number of command-line tools that enable basic filesystem operations. Like all Hadoop tools, HDFS commands are subcommands of the hadoop command-line utility. Running hadoop fs will display basic usage information, as shown in Example 2-1.

Most of these commands will be immediately obvious to an administrator with basic shell experience. The major difference is that, because HDFS is a userspace filesystem, there’s no concept of a current working directory. All paths are either absolute (recommended) or relative to the user’s home directory within HDFS.[5] An absolute path can be of the form /logs/2012/01/25/, or it can include the full URL to specify the location of the namenode, such as hdfs://mynamenode.mycompany.com:8020/logs/2012/01/25/. If the full URL syntax is not used, the value is taken from the fs.default.name parameter in the core-site.xml configuration file (see Example 2-2).

To prove to ourselves that the HDFS namespace is entirely separate from the host OS, we can attempt to list the same path using the standard ls command (see Example 2-3).

In many ways, HDFS is more like a remote filesystem than a local OS filesystem. The act of copying files to or from HDFS is more like SCP or FTP than working with an NFS-mounted filesystem, for example. Files are uploaded using either -put or the synonym -copyFromLocal and are downloaded with -get or -copyToLocal. As a convenience, the -moveFromLocal command copies a local file into HDFS and then removes the local source, while -moveToLocal does the reverse (see Example 2-4).
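The same transfers can be performed programmatically. Here is a minimal sketch of the Java equivalents of -put and -get, assuming cluster configuration on the classpath; the paths are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CopyExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Equivalent of hadoop fs -put: upload a local file into HDFS.
        fs.copyFromLocalFile(new Path("/tmp/local-report.txt"),
            new Path("/user/esammer/report.txt"));

        // Equivalent of hadoop fs -get: download a copy back to local disk.
        fs.copyToLocalFile(new Path("/user/esammer/report.txt"),
            new Path("/tmp/report-copy.txt"));
      }
    }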

Also unique to HDFS is the ability to set the replication factor of a file. This can be done by using the -setrep command, which takes a replication factor and an optional flag (-R) to indicate it should operate recursively (see Example 2-5).

In Example 2-5, we’ve changed the replication factor of files a and b in the tmp directory to 5. Next, fsck, which is covered in Checking Filesystem Integrity with fsck, is used to inspect file health, but it has the nice side effect of displaying block location information for each file. Here, the five replicas of each block are spread over five different datanodes in the cluster, as expected. You may notice that only files have a block list. Directories in HDFS are purely metadata entries and have no block data.
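For completeness, the replication factor can also be changed through the Java API. A minimal sketch, reusing the hypothetical /tmp/a path from the command-line example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SetReplicationExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Programmatic equivalent of hadoop fs -setrep 5 /tmp/a: the namenode
        // records the new target and creates the extra replicas asynchronously.
        boolean changed = fs.setReplication(new Path("/tmp/a"), (short) 5);
        System.out.println("replication change accepted: " + changed);
      }
    }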

Over the past few years, Representational State Transfer (REST) has become an increasingly popular way to interact with services in a language-agnostic way. Hadoop’s native APIs are all Java-based, which presents a problem for non-Java clients. Applications have the option of shelling out and using the hadoop fs command, but that’s inefficient and error-prone (not to mention aesthetically displeasing). Starting with Apache Hadoop 1.0.0 and CDH4, WebHDFS, a RESTful API to HDFS, is a standard part of the software. WebHDFS makes use of the web server already embedded in each HDFS daemon to expose a set of REST APIs that mimic the Java FileSystem API, including read and write methods. Full authentication, including Kerberos SPNEGO, is supported by WebHDFS. See Example 2-6 for a sample invocation of the WebHDFS equivalent of the hadoop fs -ls /hbase command.
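Because WebHDFS is plain HTTP, any language with an HTTP client can use it. The following is a minimal Java sketch of the same LISTSTATUS call, assuming a placeholder namenode host, the default namenode HTTP port of these releases (50070), and simple (user.name) authentication rather than Kerberos.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class WebHdfsList {
      public static void main(String[] args) throws Exception {
        // LISTSTATUS against a namenode with WebHDFS enabled; host name is a placeholder.
        URL url = new URL("http://mynamenode.mycompany.com:50070"
            + "/webhdfs/v1/hbase?op=LISTSTATUS&user.name=esammer");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();

        BufferedReader reader =
            new BufferedReader(new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = reader.readLine()) != null) {
          System.out.println(line); // JSON FileStatuses response
        }
        reader.close();
        conn.disconnect();
      }
    }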

Around the same time, a standalone RESTful HDFS proxy service was created, called HttpFS. While at first glance both WebHDFS and HttpFS solve the same problem—in fact, HttpFS is 100% API-compatible with WebHDFS—they address two separate architectural problems. By using the embedded web server in each daemon, WebHDFS clients must be able to communicate with each node of the cluster, just like native Java clients. HttpFS primarily exists to solve this problem and instead acts as a gateway service that can span network segments. Clients require only connectivity to the HttpFS daemon, which in turn performs all communication with the HDFS cluster using the standard Java APIs. The upside to HttpFS is that it minimizes the footprint required to communicate with the cluster, but at the cost of total scale and capacity, because all data between clients and HDFS must now travel through a single node. Of course, it is perfectly fine to run multiple HttpFS proxies to overcome this problem. Further, because WebHDFS and HttpFS are fully API-compatible, developers writing client applications need not concern themselves with these details. The decision can be based exclusively on the required data throughput, network design, and security requirements.



[3] This process is slightly different for Apache Hadoop 2.0 and CDH4, but it is conceptually equivalent.

[4] See Apache JIRA HDFS-3077.

[5] User home directories in HDFS are located in /user/<username> by default.