How-to: Manage Permissions in Hue

Hue is a web interface for Apache Hadoop that makes common Hadoop tasks easier, such as running MapReduce jobs, browsing HDFS, and creating Apache Oozie workflows. (To learn more about the integration of Oozie and Hue, see this blog post.) In this post, we’re going to focus on how one of the fundamental components in Hue, Useradmin, has matured.

New User and Permission Features

User and permission management in Hue has changed drastically over the past year. Oozie workflows, Apache Hive queries, and MapReduce jobs can be shared with other users or kept private. Permissions exist at the app level: access to particular apps can be restricted, as can certain sections within an app. For instance, access to the Shell app can be restricted, as well as access to the Apache HBase, Apache Pig, and Apache Flume shells themselves. Access privileges are defined for groups, and users can be members of one or more groups.

Changes to Users, Groups, and Permissions

Hue now supports authentication against PAM, SPNEGO, and an LDAP server. Users and groups can be imported from LDAP and treated like their non-external counterparts. The import is manual and is done on a per-user/group basis. Users can authenticate using different backends, such as LDAP; using the LDAP authentication backend allows users to log in with their LDAP password. This can be configured in /etc/hue/hue.ini by changing the ‘desktop.auth.backend’ setting to ‘desktop.auth.backend.LdapBackend’. The LDAP server to authenticate against can be configured through the settings under ‘desktop.ldap’.
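For example, a minimal hue.ini fragment along these lines enables the LDAP backend (a sketch only: ldap.example.com and the base DN are placeholders, and the exact keys available under ‘desktop.ldap’ can vary between Hue versions):

[desktop]
  [[auth]]
  backend=desktop.auth.backend.LdapBackend

  [[ldap]]
  ldap_url=ldap://ldap.example.com
  base_dn="dc=example,dc=com"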

Here’s an example:

A company would like to use the following LDAP users and groups in Hue:

  1. John Smith belonging to team A
  2. Helen Taylor belonging to team B

Assuming the following access requirements:

  1. Team A should be able to use Beeswax, but nothing else.
  2. Team B should only be able to see the Oozie dashboard with readonly permissions.

In Hue 1, these scenarios cannot realistically be addressed, given the lack of groups.

In Hue 2 the scenarios can be addressed more appropriately. Users can be imported from LDAP by clicking “Add/Sync LDAP user” in Useradmin > Users:

Similarly, groups can be imported from LDAP by clicking “Add/Sync LDAP group” in Useradmin > Groups.

If a previously imported user’s information was updated recently, the information in Hue will need to be resynchronized. This can be achieved through the LDAP sync feature:

Part A of the example can be addressed by explicitly allowing Team A access to Beeswax. This is managed in the “Groups” tab of the Useradmin app:

The Team A group can be edited by clicking on its name, where access privileges for the group are selectable. Here, the “beeswax.access” permission would be selected and the others would be unselected:

Part B of the example can be handled by explicitly defining access for Team B. This can be accomplished by following the same steps in part A, except for Team B. Every permission would be unselected except “oozie.dashboard_jobs_access”:

By explicitly setting the app level permissions, the apps that these users will be able to see will change. For instance, Helen, who is a member of Team B, will only see the Oozie app available:

Summary

User management has been revamped, groups have been added, and various backends are exposed. One such backend, LDAP, facilitates synchronization of users and groups. App-level permissions allow administrators to control who can access certain apps and what documents can be shared.

Hue is maturing quickly and many more features are on their way. Hue will soon have document-level permissions (workflows, queries, and so on), trash functionality, and improvements to the existing editors.

Have any suggestions? Feel free to tell us what you think through hue-user.

How-to: Analyze Twitter Data with Hue (http://gethue.com/how-to-analyze-twitter-data-with-hue/)

Hue 2.2, the open source web-based interface that makes Apache Hadoop easier to use, lets you interact with Hadoop services from within your browser without having to go to a command-line interface. It features different applications, such as an Apache Hive editor and an Apache Oozie dashboard and workflow builder.

This post is based on our “Analyzing Twitter Data with Hadoop” sample app and details how the same results can be achieved more simply through Hue. Moreover, all the code and examples from the previous series have been updated to the recent CDH 4.2 release.

Collecting Data

The first step is to create the “flume” user and its home directory on HDFS, where the data will be stored. This can be done via the User Admin application.

The second step consists of collecting some tweet data from the live Twitter stream.

Apache Flume is an elegant solution for taking care of this. The configuration of Flume is detailed in the readme and previous blog post. However, if you want to skip this step, some data is available on GitHub. Just upload it as a zip file in the home directory of the flume user and the “tweets” directory will show up after a few seconds.

If you are not taking this shortcut, create the tweets directory in the File Browser with the New Folder action.

Then, when the Flume agent is started, the data will start appearing:

Clicking on a file will display its content in the built-in viewer:

Preparing Hive

It is time to prepare the analysis of the tweet data. We’ll use Apache Hive, which can query the data with SQL-like syntax in a scalable way. The Hive setup is described in detail in the readme.

When Hive is ready, the tweet table can be created in the query editor of Beeswax. Notice that the Hive SerDe (available to download or compile here) must be included as a jar in the query. You can read more about Hive SerDes in this previous post.

To do this, just click on “Add” > “File Resources”, click on the path chooser button, click on the “Home” button, and upload hive-serdes-1.0-SNAPSHOT.jar.

Then just enter the CREATE TABLE statement and execute it:

CREATE EXTERNAL TABLE tweets (
  id BIGINT,
  created_at STRING,
  source STRING,
  favorited BOOLEAN,
  retweet_count INT,
  retweeted_status STRUCT<
    text:STRING,
    user:STRUCT<screen_name:STRING,name:STRING>>,
  entities STRUCT<
    urls:ARRAY<STRUCT<expanded_url:STRING>>,
    user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
    hashtags:ARRAY<STRUCT<text:STRING>>>,
  text STRING,
  user STRUCT<
    screen_name:STRING,
    name:STRING,
    friends_count:INT,
    followers_count:INT,
    statuses_count:INT,
    verified:BOOLEAN,
    utc_offset:INT,
    time_zone:STRING>,
  in_reply_to_screen_name STRING
)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets'

Now that the table is created, let’s insert some data into it. First, select the table in the “Table” tab and click “Import data”. Enter the path “/user/flume/tweets/2013/02/25/17” and “201302251” as the key:

Depending on the partition picked, a query similar to this will be generated:

LOAD DATA INPATH '/user/flume/tweets/2013/02/25/16'
INTO TABLE `default.tweets`
PARTITION (datehour='2013022516')

After the query executes, the table ‘tweets’ will be available.
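As a quick, optional sanity check (assuming the partition loaded above), you can confirm that the partition is registered and that rows are readable:

SHOW PARTITIONS tweets;

SELECT COUNT(*) FROM tweets WHERE datehour = 2013022516;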

Beeswax can access the Hive metastore and its list of tables. A description of each table’s schema and partitions, with some examples of the data it contains, is helpful while designing your queries. Moreover, a wizard can guide you step-by-step through creating new tables.

Analysis with Beeswax

It is now possible to perform SELECT queries on the data. An example is shown below, but most of the interesting ones are described in Parts 1 and 3 of the “Analyzing Twitter Data with Hadoop” series.

SELECT
    t.retweeted_screen_name,
    sum(retweets) AS total_retweets,
    count(*) AS tweet_count
  FROM (SELECT
          retweeted_status.user.screen_name as retweeted_screen_name,
              retweeted_status.text,
              max(retweet_count) as retweets
        FROM tweets
        GROUP BY retweeted_status.user.screen_name,
                 retweeted_status.text) t
  GROUP BY t.retweeted_screen_name
  ORDER BY total_retweets DESC
  LIMIT 10;

Beeswax has multiple features that provide a better user experience than the command-line shell. For example, you can save queries and share them with other users. The result of a query can be exported into a new table or an HDFS file, or downloaded to your desktop. Some other good examples are:

  • Ajax refresh of the logs
  • Quick column navigation on the result page
  • MapReduce jobs listing with a direct access to their logs
  • ‘Email me on completion’ setting
  • Multi-database support

Example of the screen while running a query:

Seeing the result of the query:

Note: if your queries are failing and you are seeing an error like the one below, it means that you forgot to add ‘/user/flume/hive-serdes-1.0-SNAPSHOT.jar’ to the query:

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask

Conclusion

In this post we focused on how the Beeswax application can make it easy to execute Hive queries. New features such as multi-query (HUE-159), autocomplete, and syntax highlighting (HUE-1063) are going to improve the usability even more.

The next article in this series will elaborate on this topic and describe how Hue’s Apache Oozie application can be used for scheduling Hive queries in a few clicks.

Managing a Hadoop Cluster

 

  1. Introduction
  2. Goals for this Module
  3. Outline
  4. Basic Setup
    1. Java Requirements
    2. Operating System
    3. Downloading and Installing Hadoop
  5. Important Directories
  6. Selecting Machines
  7. Cluster Configurations
    1. Small Clusters: 2-10 Nodes
    2. Medium Clusters: 10-40 Nodes
    3. Large Clusters: Multiple Racks
  8. Performance Monitoring
    1. Ganglia
    2. Nagios
  9. Additional Tips
  10. References & Resources

Basic Setup

This section discusses the general platform requirements for Hadoop.

JAVA REQUIREMENTS

Hadoop is a Java-based system. Recent versions of Hadoop require Sun Java 1.6.

Compiling Java programs to run on Hadoop can be done with any number of commonly-used Java compilers. Sun’s compiler is fine, as is ecj, the Eclipse Compiler for Java. A bug in gcj, the GNU Compiler for Java, causes incompatibility between generated classes and Hadoop; it should not be used.

OPERATING SYSTEM

As Hadoop is written in Java, it is mostly portable between different operating systems. Developers can and do run Hadoop under Windows. The various scripts used to manage Hadoop clusters are written in a UNIX shell scripting language that assumes sh- or bash-like behavior, so running Hadoop under Windows requires Cygwin to be installed. The Hadoop documentation stresses that a Windows/Cygwin installation is for development only. The vast majority of server deployments today are on Linux. (Other POSIX-style operating systems such as BSD may also work. Some Hadoop users have reported successfully running the system on Solaris.) The instructions in this section assume a command syntax and system design similar to Linux, but they can be readily adapted to other systems.

DOWNLOADING AND INSTALLING HADOOP

Hadoop is available for download from the project homepage at http://hadoop.apache.org/core/releases.html. Here you will find several versions of Hadoop available.

The versioning strategy used is major.minor.revision. Increments to the major version number represent large differences in operation or interface and possibly significant incompatible changes. At the time of this writing (September 2008), there have been no major upgrades; all Hadoop versions have their major version set to 0. The minor version represents a large set of feature improvements and enhancements. Hadoop instances with different minor versions may use different versions of the HDFS file formats and protocols, requiring a DFS upgrade to migrate from one to the next. Revisions are used to provide bug fixes. Within a minor version, the most recent revision contains the most stable patches.

Within the releases page, two or three versions of Hadoop will be readily available, corresponding to the highest revision number in the most recent two or three minor version increments. The stable version is the highest revision number in the second most recent minor version. Production clusters should use this version. The most recent minor version may include improved performance or new features, but may also introduce regressions that will be fixed in ensuing revisions.

At the time of this writing, 0.18.0 is the most recent version, with 0.17.2 being the “stable” release. These example instructions assume that version 0.18.0 is being used; the directions will not change significantly for any other version, except by substituting the new version number where appropriate.

To install Hadoop, first download and install prerequisite software. This includes Java 6 or higher. Distributed operation requires ssh and sshd. Windows users must install and configure cygwin as well. Then download a Hadoop version using a web browser, wget, or curl, and then unzip the package:

gunzip hadoop-0.18.0.tar.gz
tar vxf hadoop-0.18.0.tar
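If you prefer to do the download step from the command line as well, a wget invocation like the following works (the mirror path shown is only illustrative; use the mirror and version listed on the releases page):

$ wget http://archive.apache.org/dist/hadoop/core/hadoop-0.18.0/hadoop-0.18.0.tar.gz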

Within the hadoop-0.18.0/ directory which results, there will be several subdirectories. The most interesting of these are bin/, where scripts to run the cluster are located, and conf/, where the cluster’s configuration is stored.

Enter the conf/ directory and modify hadoop-env.sh. The JAVA_HOME variable must be set to the base directory of your Java installation. It is recommended that you install Java in the same location on all machines in the cluster, so this file can be replicated to each machine without modification.
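For example, the relevant line in conf/hadoop-env.sh might read as follows (the JVM path is just an example; point it at wherever Java is actually installed on your machines):

# conf/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-6-sun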

The hadoop-site.xml file must also be modified to contain a number of configuration settings. The sections below address the settings which should be included here.

If you are interested in setting up a development installation, running Hadoop on a single machine, the Hadoop documentation includes getting started instructions which will configure Hadoop for standalone or “pseudo-distributed” operation.

Standalone installations run all of Hadoop and your application inside a single Java process. The distributed file system is not used; files are read from and written to the local file system. Such a setup can be helpful for debugging Hadoop applications.

Pseudo-distributed operation refers to the use of several separate processes representing the different daemons (NameNode, DataNode, JobTracker, TaskTracker) and a separate task process to perform a Hadoop job, but with all processes running on a single machine. A pseudo-distributed instance will have a functioning NameNode/DataNode managing a “DFS” of sorts. Files in HDFS are in a separate namespace from the local file system, and are stored as block objects in a Hadoop-managed directory. However, it is not truly distributed, as no processing or data storage is performed on remote nodes. A pseudo-distributed instance can be extended into a fully distributed cluster by adding more machines to function as Task/DataNodes, but more configuration settings are usually required to deploy a Hadoop cluster for multiple users.

The rest of this document deals with configuring Hadoop clusters of multiple nodes, intended for use by one or more developers.

After conf/hadoop-site.xml is configured according to one of the models in the getting started guide, the sections below, or your own settings, two more files must be written.

The conf/masters file contains the hostname of the SecondaryNameNode. This should be changed from “localhost” to the fully-qualified domain name of the node to run the SecondaryNameNode service. It does not need to contain the hostname of the JobTracker/NameNode machine; that service is instantiated on whichever node is used to run bin/start-all.sh, regardless of the masters file. The conf/slaves file should contain the hostname of every machine in the cluster which should start TaskTracker and DataNode daemons. One hostname should be written per line in each of these files, e.g.:

slave01
slave02
slave03
...

The master node does not usually also function as a slave node, except in installations across only 1 or 2 machines.

If the nodes on your cluster do not support passwordless ssh, you should configure this now:

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

This will enable passwordless ssh login to the local machine. (You can verify that this works by executing ssh localhost.) The ~/.ssh/id_dsa.pub and authorized_keys files should be replicated on all machines in the cluster.

At this point, the configuration must be replicated across all nodes in the cluster. Small clusters may use rsync or copy the configuration directory to each node. Larger clusters should use a configuration management system such as bcfg2, smartfrog, or puppet. NFS should be avoided as much as is possible, as it is a scalability bottleneck. DataNodes should never share block storage or other high-bandwidth responsibilities over NFS, and should avoid sharing configuration information over NFS if possible.
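For a small cluster, a simple rsync loop over the slaves file is enough; a sketch, assuming the same $HADOOP_HOME path on every node:

$ for host in $(cat ${HADOOP_HOME}/conf/slaves); do rsync -av ${HADOOP_HOME}/conf/ ${host}:${HADOOP_HOME}/conf/; done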

Various directories should be created on each node. The NameNode requires the NameNode metadata directory:

$ mkdir -p /home/hadoop/dfs/name

And every node needs the Hadoop tmp directory and DataNode directory created. Rather than logging in to each node and performing the steps multiple times manually, the file bin/slaves.sh allows a command to be executed on all nodes in the slaves file. For example, we can create these directories by executing the following commands on the NameNode:

$ mkdir -p /tmp/hadoop  # make the NameNode's tmp dir
$ export HADOOP_CONF_DIR=${HADOOP_HOME}/conf
$ export HADOOP_SLAVES=${HADOOP_CONF_DIR}/slaves
$ ${HADOOP_HOME}/bin/slaves.sh "mkdir -p /tmp/hadoop"
$ ${HADOOP_HOME}/bin/slaves.sh "mkdir -p /home/hadoop/dfs/data"

The environment variables $HADOOP_CONF_DIR and $HADOOP_SLAVES are used by the bin/slaves.sh script to find the slave machines list. The provided command is then executed over ssh. If you need particular ssh options, the contents of the $HADOOP_SSH_OPTS variable are passed to ssh as arguments.
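For example, to fail fast on unreachable hosts (illustrative options only):

$ export HADOOP_SSH_OPTS="-o ConnectTimeout=5 -o StrictHostKeyChecking=no"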

We then format HDFS by executing the following command on the NameNode:

$ bin/hadoop namenode -format

And finally, start the cluster:

$ bin/start-all.sh

Now it is time to load in data and start processing it with Hadoop! Good luck!

The remainder of this document discusses various trade-offs in cluster configurations for different sizes, and reviews the settings which may be placed in the hadoop-site.xml file.

Important Directories

One of the basic tasks involved in setting up a Hadoop cluster is determining where the several various Hadoop-related directories will be located. Where they go is up to you; in some cases, the default locations are inadvisable and should be changed. This section identifies these directories.

  • HADOOP_LOG_DIR: Output location for log files from daemons. Default: ${HADOOP_HOME}/logs. Suggested: /var/log/hadoop
  • hadoop.tmp.dir: A base for other temporary directories. Default: /tmp/hadoop-${user.name}. Suggested: /tmp/hadoop
  • dfs.name.dir: Where the NameNode metadata should be stored. Default: ${hadoop.tmp.dir}/dfs/name. Suggested: /home/hadoop/dfs/name
  • dfs.data.dir: Where DataNodes store their blocks. Default: ${hadoop.tmp.dir}/dfs/data. Suggested: /home/hadoop/dfs/data
  • mapred.system.dir: The in-HDFS path to shared MapReduce system files. Default: ${hadoop.tmp.dir}/mapred/system. Suggested: /hadoop/mapred/system

This table is not exhaustive; several other directories are listed in conf/hadoop-defaults.xml. The remaining directories, however, are initialized by default to reside under hadoop.tmp.dir, and are unlikely to be a concern.

It is critically important in a real cluster that dfs.name.dir and dfs.data.dir be moved out from hadoop.tmp.dir. A real cluster should never consider these directories temporary, as they are where all persistent HDFS data resides. Production clusters should have two paths listed for dfs.name.dir which are on two different physical file systems, to ensure that cluster metadata is preserved in the event of hardware failure.

A multi-user configuration should also definitely adjust mapred.system.dir. Hadoop’s default installation is designed to work for standalone operation, which does not use HDFS. Thus it conflates HDFS and local file system paths. When enabling HDFS, however, MapReduce will store shared information about jobs in mapred.system.dir on the DFS. If this path includes the current username (as the default hadoop.tmp.dir does), this will prevent proper operation. The current username on the submitting node will be that of the user who actually submits the job, e.g., “alex.” All other nodes will have the current username set to the username used to launch Hadoop itself (e.g., “hadoop”). If these do not match, the TaskTrackers will be unable to find the job information and run the MapReduce job.

For this reason, it is also advisable to remove ${user.name} from the general hadoop.tmp.dir.

While most of the directories listed above (all the ones with names in “foo.bar.baz” form) can be relocated via the conf/hadoop-site.xml file, the HADOOP_LOG_DIR directory is specified in conf/hadoop-env.sh as an environment variable. Relocating this directory requires editing this script.
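For example, using the suggested location from the table above:

# conf/hadoop-env.sh
export HADOOP_LOG_DIR=/var/log/hadoop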

Selecting Machines

Before diving into the details of configuring nodes, we include a brief word on choosing hardware for a cluster. While the processing demands of different organizations will dictate a different machine configuration for optimum efficiency, there are commonalities associated with most Hadoop-based tasks.

Hadoop is designed to take advantage of whatever hardware is available. Modest “beige box” PCs can be used to run small Hadoop setups for experimentation and debugging. Providing greater computational resources will, to a point, result in increased performance by your Hadoop cluster. Many existing Hadoop deployments include Xeon processors in the 1.8-2.0 GHz range. Hadoop jobs written in Java can consume between 1 and 2 GB of RAM per core. If you use Hadoop Streaming to write your jobs in a scripting language such as Python, more memory may be advisable. Due to the I/O-bound nature of Hadoop, adding higher-clocked CPUs may not be the most efficient use of resources, unless the intent is to run Hadoop Streaming. Big data clusters, of course, can use as many large and fast hard drives as are available. However, too many disks in a single machine will result in many disks not being used in parallel. It is better to have three machines with 4 hard disks each than one machine with 12 drives. The former configuration will be able to write to more drives in parallel and will provide greater throughput. Finally, gigabit Ethernet connections between machines will greatly improve performance over a cluster connected via a slower network interface.

It should be noted that the minimum requirements for running Hadoop are well below the specifications of modern desktop- or server-class machines. However, multiple pages on the Hadoop wiki suggest specifications similar to those posted here for high-performance cluster design. (See [1], [2].)

Cluster Configurations

This section provides cluster configuration advice and specific settings for clusters of varying sizes. These sizes were picked to demonstrate basic categories of clusters; your own installation may be a hybrid of different aspects of these profiles. Here we suggest various properties which should be included in the conf/hadoop-site.xml file to most effectively use a cluster of a given size, as well as other system configuration elements. The next section describes how to finish the installation after implementing the configurations described here. You should read through each of these configurations in order, as configuration suggestions for larger deployments are based on the preceding ones.

SMALL CLUSTERS: 2-10 NODES

Setting up a small cluster for development purposes is a very straightforward task. When using two nodes, one node will act as both NameNode/JobTracker and a DataNode/TaskTracker; the other node is only a DataNode/TaskTracker. Clusters of three or more machines typically use a dedicated NameNode/JobTracker, and all other nodes are workers.

A relatively minimalist configuration in conf/hadoop-site.xml will suffice for this installation:

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>head.server.node.com:9001</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://head.server.node.com:9000</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/home/hadoop/dfs/data</value>
<final>true</final>
</property>
<property>
<name>dfs.name.dir</name>
<value>/home/hadoop/dfs/name</value>
<final>true</final>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop</value>
<final>true</final>
</property>
<property>
<name>mapred.system.dir</name>
<value>/hadoop/mapred/system</value>
<final>true</final>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>

Clusters closer to the 8-10 node range may want to set dfs.replication to 3. Values higher than 3 are usually not necessary. Individual files which are heavily utilized by a large number of nodes may have their particular replication factor manually adjusted upward independent of the cluster default.
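For example, a heavily-read file can be bumped to five replicas with the setrep command (the path is hypothetical):

$ bin/hadoop dfs -setrep -w 5 /shared/lookup-table.dat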

MEDIUM CLUSTERS: 10-40 NODES

This category is for clusters that occupy the majority of a single rack. Additional considerations for high availability and reliability come into play at this level.

The single point of failure in a Hadoop cluster is the NameNode. While the loss of any other machine (intermittently or permanently) does not result in data loss, NameNode loss results in cluster unavailability. The permanent loss of NameNode data would render the cluster’s HDFS inoperable.

Therefore, another step should be taken in this configuration to back up the NameNode metadata. One machine in the cluster should be designated as the NameNode’s backup. This machine does not run the normal Hadoop daemons (i.e., the DataNode and TaskTracker). Instead, it exposes a directory via NFS which is only mounted on the NameNode (e.g., /mnt/namenode-backup/). The cluster’s hadoop-site.xml file should then instruct the NameNode to write to this directory as well:

  <property>
<name>dfs.name.dir</name>
<value>/home/hadoop/dfs/name,/mnt/namenode-backup</value>
<final>true</final>
</property>

The NameNode will write its metadata to each directory in the comma-separated list of dfs.name.dir. If /mnt/namenode-backup is NFS-mounted from the backup machine, this will ensure that a redundant copy of HDFS metadata is available. The backup node should serve /mnt/namenode-backup from /home/hadoop/dfs/name on its own drive. This way, if the NameNode hardware completely dies, the backup machine can be brought up as the NameNode with no reconfiguration of the backup machine’s software. To switch the NameNode and backup nodes, the backup machine should have its IP address changed to the original NameNode’s IP address, and the server daemons should be started on that machine. The IP address must be changed to allow the DataNodes to recognize it as the “original” NameNode for HDFS. (Individual DataNodes will cache the DNS entry associated with the NameNode, so just changing the hostname is insufficient; the name reassignment must be performed at the IP address level.)

The backup machine still has Hadoop installed and configured on it in the same way as every other node in the cluster, but it is not listed in the slaves file, so normal daemons are not started there.

One function that the backup machine can be used for is to serve as the SecondaryNameNode. Note that this is not a failover NameNode process. The SecondaryNameNode process connects to the NameNode and takes periodic snapshots of its metadata (though not in real time). The NameNode metadata consists of a snapshot of the file system called the fsimage and a series of deltas to this snapshot called the editlog. With these two files, the current state of the system can be determined exactly. The SecondaryNameNode merges the fsimage and editlog into a new fsimage file that is a more compact representation of the file system state. Because this process can be memory intensive, running it on the backup machine (instead of on the NameNode itself) can be advantageous.

To configure the SecondaryNameNode daemon to run on the backup machine instead of on the master machine, edit the conf/masters file so that it contains the name of the backup machine. The bin/start-dfs.sh and bin/start-mapred.sh (and by extension, bin/start-all.sh) scripts will actually always start the master daemons (NameNode and JobTracker) on the local machine. The slaves file is used for starting DataNodes and TaskTrackers. The masters file is used for starting the SecondaryNameNode. This filename is used despite the fact that the master node may not be listed in the file itself.

A cluster of this size may also require nodes to be periodically decommissioned. As noted in Module 2, several machines cannot be turned off simultaneously, or data loss may occur. Nodes must be decommissioned on a schedule that permits replication of blocks being decommissioned. To prepare for this eventuality in advance, an excludes file should be added to the conf/hadoop-site.xml:

  <property>
<name>dfs.hosts.exclude</name>
<value>/home/hadoop/excludes</value>
<final>true</final>
</property>
<property>
<name>mapred.hosts.exclude</name>
<value>/home/hadoop/excludes</value>
<final>true</final>
</property>

This property should provide the full path to the excludes file (the actual location of the file is up to you). You should then create an empty file with this name:

$ touch /home/hadoop/excludes
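Later, when a node actually needs to be decommissioned, append its hostname to the excludes file and tell the NameNode to re-read it; a sketch, with a hypothetical hostname:

$ echo "slave17.cluster.example.com" >> /home/hadoop/excludes
$ bin/hadoop dfsadmin -refreshNodes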

While the dfs.hosts.exclude property allows the definition of a list of machines which are explicitly barred from connecting to the NameNode (and similarly, mapred.hosts.exclude for the JobTracker), a large cluster may want to explicitly manage a list of machines which are approved to connect to a given JobTracker or NameNode.

The dfs.hosts and mapred.hosts properties allow an administrator to supply a file containing an approved list of hostnames. If a machine is not in this list, it will be denied access to the cluster. This can be used to enforce policies regarding which teams of developers have access to which MapReduce sub-clusters. These are configured in exactly the same way as the excludes file.
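For example (the includes file path is up to you, just as with the excludes file):

  <property>
<name>dfs.hosts</name>
<value>/home/hadoop/includes</value>
<final>true</final>
</property>
<property>
<name>mapred.hosts</name>
<value>/home/hadoop/includes</value>
<final>true</final>
</property>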

Of course, at this scale and above, 3 replicas of each block are advisable; the hadoop-site.xml file should contain:

  <property>
<name>dfs.replication</name>
<value>3</value>
</property>

By default, HDFS does not preserve any free space on the DataNodes; the DataNode service will continue to accept blocks until all free space on the disk is exhausted, which may cause problems. The following setting will require each DataNode to reserve at least 1 GB of space on the drive free before it writes more blocks, which helps preserve system stability:

  <property>
<name>dfs.datanode.du.reserved</name>
<value>1073741824</value>
<final>true</final>
</property>

Another parameter to watch is the heap size associated with each task. Hadoop caps the heap of each task process at 200 MB, which is too small for most data processing tasks. This cap is set as a parameter passed to the child Java process. It is common to override this with a higher cap by specifying:

  <property>
<name>mapred.child.java.opts</name>
<value>-Xmx512m</value>
</property>

This will provide each child task with 512 MB of heap. It is not unreasonable in some cases to specify -Xmx1024m instead. In the interest of providing only what is actually required, it may be better to leave this set to 512 MB by default and allow applications to manually configure a full GB of RAM per task themselves.

Using multiple drives per machine

While small clusters often have only one hard drive per machine, more high-performance configurations may include two or more disks per node. Slight configuration changes are required to make Hadoop take advantage of additional disks.

DataNodes can be configured to write blocks out to multiple disks via the dfs.data.dir property. It can take a comma-separated list of directories. Each block is written to one of these directories. E.g., assuming that there are four disks, mounted on /d1, /d2, /d3, and /d4, the following (or something like it) should be in the configuration for each DataNode:

  <property>
<name>dfs.data.dir</name>
<value>/d1/dfs/data,/d2/dfs/data,/d3/dfs/data,/d4/dfs/data</value>
<final>true</final>
</property>

MapReduce performance can also be improved by distributing the temporary data generated by MapReduce tasks across multiple disks on each machine:

  <property>
<name>mapred.local.dir</name>
<value>/d1/mapred/local,/d2/mapred/local,/d3/mapred/local,/d4/mapred/local</value>
<final>true</final>
</property>

Finally, if there are multiple drives available in the NameNode, they can be used to provide additional redundant copies of the NameNode metadata in the event of the failure of one drive. Unlike the above two properties, where one drive out of many is selected to write a piece of data, the NameNode writes to each comma-separated path in dfs.name.dir. Listing too many drives here may adversely affect the performance of the NameNode, as the probability of blocking on one or more I/O operations increases with the number of devices involved, but it is imperative that the sole copy of the metadata does not reside on a single drive.

LARGE CLUSTERS: MULTIPLE RACKS

Configuring multiple racks of machines for Hadoop requires further advance planning. The possibility of rack failure now exists, and operational racks should be able to continue even if entire other racks are disabled. Naive setups may result in large cross-rack data transfers which adversely affect performance. Furthermore, in a large cluster, the amount of metadata under the care of the NameNode increases. This section proposes configuring several properties to help Hadoop operate at very large scale, but the numbers used in this section are just guidelines. There is no single magic number which works for all deployments, and individual tuning will be necessary. These will, however, provide a starting point and alert you to settings which will be important.

The NameNode is responsible for managing metadata associated with each block in the HDFS. As the amount of information in the rack scales into the tens or hundreds of terabytes, this can grow to be quite sizable. The NameNode machine needs to keep the blockmap in RAM to work efficiently. Therefore, at large scale, this machine will require more RAM than other machines in the cluster. The amount of metadata can also be dropped almost in half by doubling the block size:

  <property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>

This changes the block size from 64MB (the default) to 128MB, which decreases pressure on the NameNode’s memory. On the other hand, this potentially decreases the amount of parallelism that can be achieved, as the number of blocks per file decreases. This means fewer hosts may have sections of a file to offer to MapReduce tasks without contending for disk access. The larger the individual files involved (or the more files involved in the average MapReduce job), the less of an issue this is.

In the medium configuration, the NameNode wrote HDFS metadata through to another machine on the rack via NFS. It also used that same machine to checkpoint the NameNode metadata and compact it in the SecondaryNameNode process. Using this same setup will result in the cluster being dependent on a single rack’s continued operation. The NFS-mounted write-through backup should be placed in a different rack from the NameNode, to ensure that the metadata for the file system survives the failure of an individual rack. For the same reason, the SecondaryNameNode should be instantiated on a separate rack as well.

With multiple racks of servers, RPC timeouts may become more frequent. The NameNode takes a continual census of DataNodes and their health via heartbeat messages sent every few seconds. A similar timeout mechanism exists on the MapReduce side with the JobTracker. With many racks of machines, they may force one another to time out because the master node is not handling them fast enough. The following options increase the number of threads on the master machine dedicated to handling RPCs from slave nodes:

  <property>
<name>dfs.namenode.handler.count</name>
<value>40</value>
</property>
<property>
<name>mapred.job.tracker.handler.count</name>
<value>40</value>
</property>

These settings were used in clusters of several hundred nodes. They should be scaled up accordingly with larger deployments.

The following settings provide additional starting points for optimization. These are based on the reported configurations of actual clusters from 250 to 2000 nodes.

  • io.file.buffer.size (range 32768-131072): Read/write buffer size used in SequenceFiles; should be a multiple of the hardware page size
  • io.sort.factor (range 50-200): Number of streams to merge concurrently when sorting files during shuffling
  • io.sort.mb (range 50-200): Amount of memory to use while sorting data
  • mapred.reduce.parallel.copies (range 20-50): Number of concurrent connections a reducer should use when fetching its input from mappers
  • tasktracker.http.threads (range 40-50): Number of threads each TaskTracker uses to provide intermediate map output to reducers
  • mapred.tasktracker.map.tasks.maximum (range 1/2 to 2 times the number of cores per node): Number of map tasks to deploy on each machine
  • mapred.tasktracker.reduce.tasks.maximum (range 1/2 to 2 times the number of cores per node): Number of reduce tasks to deploy on each machine

Rack awareness

In a multi-rack configuration, it is important to ensure that replicas of blocks are placed on multiple racks to minimize the possibility of data loss. Thus, a rack-aware placement policy should be used. The guidelines below suggest how to set up a basic rack awareness policy; due to the heterogeneity of network topologies, a definitive general-purpose solution cannot be provided here.

This tutorial targets Hadoop version 0.18.0. While most of the interfaces described will work on other, older versions of Hadoop, rack-awareness underwent a major overhaul in version 0.17. Thus, the following does not apply to version 0.16 and before.

One major consequence of the upgrade is that while rack-aware block replica placement has existed in Hadoop for some time, rack-aware task placement has only been added in version 0.17. If Hadoop MapReduce cannot place a task on the same node as the block of data which the task is scheduled to process, then it picks an arbitrary different node on which to schedule the task. Starting with 0.17.0, tasks will be placed (when possible) on the same rack as at least one replica of an input data block for a job, which should further minimize the amount of inter-rack data transfers required to perform a job.

Hadoop includes an interface called DNSToSwitchMapping which allows arbitrary Java code to be used to map servers onto a rack topology. The configuration key topology.node.switch.mapping.impl can be used to specify a class which meets this interface. More straightforward than writing a Java class for this purpose, however, is to use the default mapper, which executes a user-specified script (or other command) on each node of the cluster, which returns the rack id for that node. These rack ids are then aggregated and sent back to the NameNode.

Note that the rack mapping script used by this system is incompatible with the 0.16 method of using dfs.network.script. Whereas dfs.network.script runs on each DataNode, a new script specified by topology.script.file.name is run by the master node only. To set the rack mapping script, specify the key topology.script.file.name in conf/hadoop-site.xml.
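A minimal sketch of such a setup follows. The script path, IP ranges, and rack names are hypothetical; they only illustrate the expected contract, in which the script receives one or more hostnames or IP addresses as arguments and prints one rack id per argument:

  <property>
<name>topology.script.file.name</name>
<value>/home/hadoop/conf/rack-map.sh</value>
</property>

#!/bin/sh
# rack-map.sh: print one rack id per host/IP argument.
for host in "$@"; do
  case "$host" in
    10.1.1.*) echo "/rack1" ;;
    10.1.2.*) echo "/rack2" ;;
    *)        echo "/default-rack" ;;
  esac
done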

Cluster contention

If you are configuring a large number of machines, it is likely that you have a large number of users who wish to submit jobs to execute on it. Hadoop’s job scheduling algorithm is based on a simple FIFO scheduler. Using this in a large deployment without external controls or policies agreed upon by all users can lead to lots of contention for the JobTracker, causing short jobs to be delayed by other long-running tasks and frustrating users.

An advanced technique to combat this problem is to configure a single HDFS cluster which spans all available machines, and configure several separate MapReduce clusters with their own JobTrackers and pools of TaskTrackers. All MapReduce clusters are configured to use the same DFS and the same NameNode; but separate groups of machines have a different machine acting as JobTracker (i.e., subclusters have different settings for mapred.job.tracker). Breaking machines up into several smaller clusters, each of which contains 20-40 TaskTrackers, provides users with lower contention for the system. Users may be assigned to different clusters by policy, or they can use the JobTracker status web pages (a web page exposed on port 50030 of each JobTracker) to determine which is underutilized.
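Under this scheme, every subcluster’s hadoop-site.xml points at the same NameNode but at its own JobTracker; a sketch with hypothetical hostnames:

  <property>
<name>fs.default.name</name>
<value>hdfs://namenode.example.com:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>jobtracker-a.example.com:9001</value>
</property>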

Multiple strategies exist for this assignment process. It is considered best practice to stripe the TaskTrackers associated with each JobTracker across all racks. This maximizes the availability of each cluster (as they are all resistant to individual rack failure), and works with the HDFS replica placement policy to ensure that each MapReduce cluster can find rack-local replicas of all files used in any MapReduce jobs.

Performance Monitoring

Multiple tools exist to monitor large clusters for performance and troubleshooting. This section briefly highlights two such tools.

GANGLIA

Ganglia is a performance monitoring framework for distributed systems. Ganglia provides a distributed service which collects metrics on individual machines and forwards them to an aggregator which can report back to an administrator on the global state of a cluster.

Ganglia is designed to be integrated into other applications to collect statistics about their operation. Hadoop includes a performance monitoring framework which can use Ganglia as its backend. Instructions are available on the Hadoop wiki as to how to enable Ganglia metrics in Hadoop. Instructions are also included below.

After installing and configuring Ganglia on your cluster, to direct Hadoop to output its metric reports to Ganglia, create a file named hadoop-metrics.properties in the $HADOOP_HOME/conf directory. The file should have the following contents:

dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
dfs.period=10
dfs.servers=localhost:8649

mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
mapred.period=10
mapred.servers=localhost:8649

This assumes that gmond is running on each machine in the cluster. Instructions on the Hadoop wiki note that (in the experience of the wiki article author) this may result in all nodes reporting their results as “localhost” instead of with their individual hostnames. If this problem affects your cluster, an alternate configuration is proposed, in which all Hadoop instances speak directly with gmetad:

dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
dfs.period=10
dfs.servers=@GMETAD@:8650

mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
mapred.period=10
mapred.servers=@GMETAD@:8650

Where @GMETAD@ is the hostname of the server on which the gmetad service is running. If deploying Ganglia and Hadoop on a very large number of machines, the impact of this configuration (vs. the standard Ganglia configuration where individual services talk to gmond on localhost) should be evaluated.

NAGIOS

While Ganglia will monitor Hadoop-specific metrics, general information about the health of the cluster should be monitored with an additional tool.

Nagios is a machine and service monitoring system designed for large clusters. Nagios will provide useful diagnostic information for tuning your cluster, including network, disk, and CPU utilization across machines.

Additional Tips

The following are a few additional pieces of small advice:

  • Create a separate user named “hadoop” to run your instances; this will separate the Hadoop processes from any users on the system. Do not run Hadoop as root.
  • If Hadoop is installed in /home/hadoop/hadoop-0.18.0, link /home/hadoop/hadoop to /home/hadoop/hadoop-0.18.0, as sketched below. When upgrading to a newer version in the future, the link can be moved to make this process easier on other scripts that depend on the hadoop/bin directory.
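For example, assuming the installation path above:

$ ln -s /home/hadoop/hadoop-0.18.0 /home/hadoop/hadoop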

References & Resources

Hadoop Getting Started – Single-node configuration instructions
Hadoop Cluster Setup – Official Hadoop configuration instructions
Michael Noll’s Hadoop configuration tutorials for single and multiple node configurations.

The Hadoop Distributed File System

  1. Introduction
  2. Goals for this Module
  3. Outline
  4. Distributed File System Basics
  5. Configuring HDFS
  6. Interacting With HDFS
    1. Common Example Operations
    2. HDFS Command Reference
    3. DFSAdmin Command Reference
  7. Using HDFS in MapReduce
  8. Using HDFS Programmatically
  9. HDFS Permissions and Security
  10. Additional HDFS Tasks
    1. Rebalancing Blocks
    2. Copying Large Sets of Files
    3. Decommissioning Nodes
    4. Verifying File System Health
    5. Rack Awareness
  11. HDFS Web Interface
  12. References

Distributed File System Basics

A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. There are a number of distributed file systems that solve this problem in different ways.

NFS, the Network File System, is the most ubiquitous distributed file system. It is one of the oldest still in use. While its design is straightforward, it is also very constrained. NFS provides remote access to a single logical volume stored on a single machine. An NFS server makes a portion of its local file system visible to external clients. The clients can then mount this remote file system directly into their own Linux file system, and interact with it as though it were part of the local drive.

One of the primary advantages of this model is its transparency. Clients do not need to be particularly aware that they are working on files stored remotely. The existing standard library methods like open(), close(), fread(), etc. will work on files hosted over NFS.

But as a distributed file system, it is limited in its power. The files in an NFS volume all reside on a single machine. This means that it will only store as much information as can be stored in one machine, and does not provide any reliability guarantees if that machine goes down (e.g., by replicating the files to other servers). Finally, as all the data is stored on a single machine, all the clients must go to this machine to retrieve their data. This can overload the server if a large number of clients must be handled. Clients must also always copy the data to their local machines before they can operate on it.

HDFS is designed to be robust to a number of the problems that other DFSs such as NFS are vulnerable to. In particular:

  • HDFS is designed to store a very large amount of information (terabytes or petabytes). This requires spreading the data across a large number of machines. It also supports much larger file sizes than NFS.
  • HDFS should store data reliably. If individual machines in the cluster malfunction, data should still be available.
  • HDFS should provide fast, scalable access to this information. It should be possible to serve a larger number of clients by simply adding more machines to the cluster.
  • HDFS should integrate well with Hadoop MapReduce, allowing data to be read and computed upon locally when possible.

But while HDFS is very scalable, its high performance design also restricts it to a particular class of applications; it is not as general-purpose as NFS. There are a large number of additional decisions and trade-offs that were made with HDFS. In particular:

  • Applications that use HDFS are assumed to perform long sequential streaming reads from files. HDFS is optimized to provide streaming read performance; this comes at the expense of random seek times to arbitrary positions in files.
  • Data will be written to the HDFS once and then read several times; updates to files after they have already been closed are not supported. (An extension to Hadoop will provide support for appending new data to the ends of files; it is scheduled to be included in Hadoop 0.19 but is not available yet.)
  • Due to the large size of files, and the sequential nature of reads, the system does not provide a mechanism for local caching of data. The overhead of caching is great enough that data should simply be re-read from HDFS source.
  • Individual machines are assumed to fail on a frequent basis, both permanently and intermittently. The cluster must be able to withstand the complete failure of several machines, possibly many happening at the same time (e.g., if a rack fails all together). While performance may degrade proportional to the number of machines lost, the system as a whole should not become overly slow, nor should information be lost. Data replication strategies combat this problem.

The design of HDFS is based on the design of GFS, the Google File System. Its design was described in a paper published by Google.

HDFS is a block-structured file system: individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity. Individual machines in the cluster are referred to as DataNodes. A file can be made of several blocks, and they are not necessarily stored on the same machine; the target machines which hold each block are chosen randomly on a block-by-block basis. Thus access to a file may require the cooperation of multiple machines, but supports file sizes far larger than a single-machine DFS; individual files can require more space than a single hard drive could hold.

If several machines must be involved in the serving of a file, then a file could be rendered unavailable by the loss of any one of those machines. HDFS combats this problem by replicating each block across a number of machines (3, by default).

Figure 2.1: DataNodes holding blocks of multiple files with a replication factor of 2. The NameNode maps the filenames onto the block ids.

Most block-structured file systems use a block size on the order of 4 or 8 KB. By contrast, the default block size in HDFS is 64 MB, orders of magnitude larger. This allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk. The consequence of this decision is that HDFS expects to have very large files, and expects them to be read sequentially. Unlike file systems such as NTFS or EXT, which see many very small files, HDFS expects to store a modest number of very large files: hundreds of megabytes or gigabytes each. After all, a 100 MB file is not even two full blocks. Files on your computer may also frequently be accessed “randomly,” with applications cherry-picking small amounts of information from several different locations in a file which are not sequentially laid out. By contrast, HDFS expects to read a block start-to-finish for a program. Consequently, attempting to use HDFS as a general-purpose distributed file system for a diverse set of applications will be suboptimal.

Because HDFS stores files as a set of large blocks across several machines, these files are not part of the ordinary file system. Typing ls on a machine running a DataNode daemon will display the contents of the ordinary Linux file system being used to host the Hadoop services — but it will not include any of the files stored inside the HDFS. This is because HDFS runs in a separate namespace, isolated from the contents of your local files. The files inside HDFS (or more accurately: the blocks that make them up) are stored in a particular directory managed by the DataNode service, but the files are named only with block ids. You cannot interact with HDFS-stored files using ordinary Linux file modification tools (e.g., ls, cp, mv, etc.). However, HDFS does come with its own utilities for file management, which act very similarly to these familiar tools. A later section in this tutorial will introduce you to these commands and their operation.
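As a preview, these utilities are invoked through bin/hadoop and mirror the familiar tools (the paths here are only examples):

$ bin/hadoop dfs -ls /
$ bin/hadoop dfs -put somefile.txt /user/someone/somefile.txt
$ bin/hadoop dfs -cat /user/someone/somefile.txt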

It is important for this file system to store its metadata reliably. Furthermore, while the file data is accessed in a write once and read many model, the metadata structures (e.g., the names of files and directories) can be modified by a large number of clients concurrently. It is important that this information is never desynchronized. Therefore, it is all handled by a single machine, called the NameNode. The NameNode stores all the metadata for the file system. Because of the relatively low amount of metadata per file (it only tracks file names, permissions, and the locations of each block of each file), all of this information can be stored in the main memory of the NameNode machine, allowing fast access to the metadata.

To open a file, a client contacts the NameNode and retrieves a list of locations for the blocks that comprise the file. These locations identify the DataNodes which hold each block. Clients then read file data directly from the DataNode servers, possibly in parallel. The NameNode is not directly involved in this bulk data transfer, keeping its overhead to a minimum.

Of course, NameNode information must be preserved even if the NameNode machine fails; there are multiple redundant systems that allow the NameNode to preserve the file system’s metadata even if the NameNode itself crashes irrecoverably. NameNode failure is more severe for the cluster than DataNode failure. While individual DataNodes may crash and the entire cluster will continue to operate, the loss of the NameNode will render the cluster inaccessible until it is manually restored. Fortunately, as the NameNode’s involvement is relatively minimal, the odds of it failing are considerably lower than the odds of an arbitrary DataNode failing at any given point in time.

A more thorough overview of the architectural decisions involved in the design and implementation of HDFS is given in the official Hadoop HDFS documentation. Before continuing in this tutorial, it is advisable that you read and understand the information presented there.

Configuring HDFS

The HDFS for your cluster can be configured in a very short amount of time. First we will fill out the relevant sections of the Hadoop configuration file, then format the NameNode.

CLUSTER CONFIGURATION

These instructions for cluster configuration assume that you have already downloaded and unzipped a copy of Hadoop.

The HDFS configuration is located in a set of XML files in the Hadoop configuration directory; conf/ under the main Hadoop install directory (where you unzipped Hadoop to). The conf/hadoop-defaults.xml file contains default values for every parameter in Hadoop. This file is considered read-only. You override this configuration by setting new values in conf/hadoop-site.xml. This file should be replicated consistently across all machines in the cluster. (It is also possible, though not advisable, to host it on NFS.)

Configuration settings are a set of key-value pairs of the format:

  <property>
    <name>property-name</name>
    <value>property-value</value>
  </property>

Adding the line <final>true</final> inside the property body will prevent properties from being overridden by user applications. This is useful for most system-wide configuration options.

The following settings are necessary to configure HDFS:

  • fs.default.name: protocol://servername:port (example: hdfs://alpha.milkman.org:9000)
  • dfs.data.dir: pathname (example: /home/username/hdfs/data)
  • dfs.name.dir: pathname (example: /home/username/hdfs/name)

These settings are described individually below:

fs.default.name – This is the URI (protocol specifier, hostname, and port) that describes the NameNode for the cluster. Each node in the system on which Hadoop is expected to operate needs to know the address of the NameNode. The DataNode instances will register with this NameNode, and make their data available through it. Individual client programs will connect to this address to retrieve the locations of actual file blocks.

dfs.data.dir – This is the path on the local file system in which the DataNode instance should store its data. It is not necessary that all DataNode instances store their data under the same local path prefix, as they will all be on separate machines; it is acceptable that these machines are heterogeneous. However, it will simplify configuration if this directory is standardized throughout the system. By default, Hadoop will place this under /tmp. This is fine for testing purposes, but is an easy way to lose actual data in a production system, and thus must be overridden.

dfs.name.dir – This is the path on the local file system of the NameNode instance where the NameNode metadata is stored. It is only used by the NameNode instance to find its information, and does not exist on the DataNodes. The caveat above about /tmp applies to this as well; this setting must be overridden in a production system.

Another configuration parameter, not listed above, is dfs.replication. This is the default replication factor for each block of data in the file system. For a production cluster, this should usually be left at its default value of 3. (You are free to increase the replication factor, though this may be unnecessary and will use more space than required. Setting it below three reduces the availability of the data, and possibly the reliability of its storage.)
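
For the single-node setup shown next, it is common to drop the replication factor to 1, since there is only one DataNode available to hold replicas; a minimal sketch of the extra property:

  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>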

The following information can be pasted into the hadoop-site.xml file for a single-node configuration:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://your.server.name.com:9000</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/username/hdfs/data</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/username/hdfs/name</value>
  </property>
</configuration>

Of course, your.server.name.com needs to be changed, as does username. Using port 9000 for the NameNode is arbitrary.

After copying this information into your conf/hadoop-site.xml file, copy this to the conf/ directories on all machines in the cluster.

The master node needs to know the addresses of all the machines to use as DataNodes; the startup scripts depend on this. Also in the conf/ directory, edit the file slaves so that it contains a list of fully-qualified hostnames for the slave instances, one host per line. On a multi-node setup, the master node (e.g., localhost) is not usually present in this file.
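
As a sketch, a three-node cluster using hypothetical hostnames in the example domain from above might have a slaves file containing:

  datanode1.milkman.org
  datanode2.milkman.org
  datanode3.milkman.org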

Then make the directories necessary:

  user@EachMachine$ mkdir -p $HOME/hdfs/data

  user@namenode$ mkdir -p $HOME/hdfs/name

The user who owns the Hadoop instances will need to have read and write access to each of these directories. It is not necessary for all users to have access to these directories. Set permissions with chmod as appropriate. In a large-scale environment, it is recommended that you create a user named “hadoop” on each node for the express purpose of owning and running Hadoop tasks. For a single individual’s machine, it is perfectly acceptable to run Hadoop under your own username. It is not recommended that you run Hadoop as root.

STARTING HDFS

Now we must format the file system that we just configured:

  user@namenode:hadoop$ bin/hadoop namenode -format

This process should only be performed once. When it is complete, we are free to start the distributed file system:

  user@namenode:hadoop$ bin/start-dfs.sh

This command will start the NameNode server on the master machine (which is where the start-dfs.sh script was invoked). It will also start the DataNode instances on each of the slave machines. In a single-machine “cluster,” this is the same machine as the NameNode instance. On a real cluster of two or more machines, this script will ssh into each slave machine and start a DataNode instance.

Interacting With HDFS

This section will familiarize you with the commands necessary to interact with HDFS, loading and retrieving data, as well as manipulating files. This section makes extensive use of the command-line.

The bulk of commands that communicate with the cluster are performed by a monolithic script named bin/hadoop. This script launches a Java virtual machine, loads the Hadoop system, and executes a user command. The commands are specified in the following form:

  user@machine:hadoop$ bin/hadoop moduleName -cmd args...

The moduleName tells the program which subset of Hadoop functionality to use. -cmd is the name of a specific command within this module to execute. Its arguments follow the command name.

Two such modules are relevant to HDFS: dfs and dfsadmin. Their use is described in the sections below.

COMMON EXAMPLE OPERATIONS

The dfs module, also known as “FsShell,” provides basic file manipulation operations. Their usage is introduced here.

A cluster is only useful if it contains data of interest. Therefore, the first operation to perform is loading information into the cluster. For purposes of this example, we will assume an example user named “someone” — but substitute your own username where it makes sense. Also note that any operation on files in HDFS can be performed from any node that has access to the cluster and whose conf/hadoop-site.xml sets fs.default.name to your cluster’s NameNode. We will call the fictional machine on which we are operating anynode. Commands are run from the “hadoop” directory where you installed Hadoop. This may be /home/someone/src/hadoop on your machine, or /home/foo/hadoop on someone else’s. These initial commands are centered around loading information into HDFS, checking that it’s there, and getting information back out of HDFS.

Listing files

If we attempt to inspect HDFS, we will not find anything interesting there:

  someone@anynode:hadoop$ bin/hadoop dfs -ls
  someone@anynode:hadoop$

The “-ls” command returns silently. Without any arguments, -ls will attempt to show the contents of your “home” directory inside HDFS. Don’t forget, this is not the same as /home/$USER (e.g., /home/someone) on the host machine (HDFS keeps a separate namespace from the local files). There is no concept of a “current working directory” or cd command in HDFS.

If you provide -ls with an argument, you may see some initial directory contents:

  someone@anynode:hadoop$ bin/hadoop dfs -ls /
  Found 2 items
  drwxr-xr-x   - hadoop supergroup          0 2008-09-20 19:40 /hadoop
  drwxr-xr-x   - hadoop supergroup          0 2008-09-20 20:08 /tmp

These entries are created by the system. This example output assumes that “hadoop” is the username under which the Hadoop daemons (NameNode, DataNode, etc) were started. “supergroup” is a special group whose membership includes the username under which the HDFS instances were started (e.g., “hadoop”). These directories exist to allow the Hadoop MapReduce system to move necessary data to the different job nodes.

So we need to create our home directory, and then populate it with some files.

Inserting data into the cluster

Whereas a typical UNIX or Linux system stores individual users’ files in /home/$USER, the Hadoop DFS stores these in /user/$USER. For some commands like ls, if a directory name is required and is left blank, this is the default directory name assumed. (Other commands require explicit source and destination paths.) Any relative paths used as arguments to HDFS, Hadoop MapReduce, or other components of the system are assumed to be relative to this base directory.

Step 1: Create your home directory if it does not already exist.

  someone@anynode:hadoop$ bin/hadoop dfs -mkdir /user

If there is no /user directory, create that first. It will be automatically created later if necessary, but for instructive purposes, it makes sense to create it manually ourselves this time.

Then we are free to add our own home directory:

  someone@anynode:hadoop$ bin/hadoop dfs -mkdir /user/someone

Of course, replace /user/someone with /user/yourUserName.

Step 2: Upload a file. To insert a single file into HDFS, we can use the put command like so:

  someone@anynode:hadoop$ bin/hadoop dfs -put /home/someone/interestingFile.txt /user/yourUserName/

This copies /home/someone/interestingFile.txt from the local file system into /user/yourUserName/interestingFile.txt on HDFS.

Step 3: Verify the file is in HDFS. We can verify that the operation worked with either of the two following (equivalent) commands:

  someone@anynode:hadoop$ bin/hadoop dfs -ls /user/yourUserName
  someone@anynode:hadoop$ bin/hadoop dfs -ls

You should see a listing that starts with Found 1 items and then includes information about the file you inserted.

The following table demonstrates example uses of the put command, and their effects:

Command: bin/hadoop dfs -put foo bar
Assuming: No file/directory named /user/$USER/bar exists in HDFS
Outcome: Uploads local file foo to a file named /user/$USER/bar

Command: bin/hadoop dfs -put foo bar
Assuming: /user/$USER/bar is a directory
Outcome: Uploads local file foo to a file named /user/$USER/bar/foo

Command: bin/hadoop dfs -put foo somedir/somefile
Assuming: /user/$USER/somedir does not exist in HDFS
Outcome: Uploads local file foo to a file named /user/$USER/somedir/somefile, creating the missing directory

Command: bin/hadoop dfs -put foo bar
Assuming: /user/$USER/bar is already a file in HDFS
Outcome: No change in HDFS, and an error is returned to the user.

When the put command operates on a file, it is all-or-nothing. Uploading a file into HDFS first copies the data onto the DataNodes. When they all acknowledge that they have received all the data and the file handle is closed, it is then made visible to the rest of the system. Thus, based on the return value of the put command, you can be confident that a file has either been successfully uploaded or has “fully failed;” you will never get into a state where a file is partially uploaded and the partial contents are visible externally. If the upload disconnects before the entire file contents have been written, it will be as though no upload took place.

Step 4: Uploading multiple files at once. The put command is more powerful than moving a single file at a time. It can also be used to upload entire directory trees into HDFS.

Create a local directory and put some files into it using the cp command. Our example user may have a situation like the following:

  someone@anynode:hadoop$ ls -R myfiles
  myfiles:
  file1.txt  file2.txt  subdir/

  myfiles/subdir:
  anotherFile.txt
  someone@anynode:hadoop$

This entire myfiles/ directory can be copied into HDFS like so:

  someone@anynode:hadoop$ bin/hadoop dfs -put myfiles /user/myUsername
  someone@anynode:hadoop$ bin/hadoop dfs -ls
  Found 1 items
  /user/someone/myfiles   <dir>    2008-06-12 20:59    rwxr-xr-x    someone    supergroup
  someone@anynode:hadoop$ bin/hadoop dfs -ls myfiles
  Found 3 items
  /user/someone/myfiles/file1.txt   <r 1>   186731  2008-06-12 20:59        rw-r--r--       someone   supergroup
  /user/someone/myfiles/file2.txt   <r 1>   168     2008-06-12 20:59        rw-r--r--       someone   supergroup
  /user/someone/myfiles/subdir      <dir>           2008-06-12 20:59        rwxr-xr-x       someone   supergroup

This demonstrates that the tree was correctly uploaded recursively. You’ll note that, in addition to the file path, ls also reports the number of replicas of each file that exist (the “1” in <r 1>), the file size, upload time, permissions, and owner information.

Another synonym for -put is -copyFromLocal. The syntax and functionality are identical.

Retrieving data from HDFS

There are multiple ways to retrieve files from the distributed file system. One of the easiest is to use cat to display the contents of a file on stdout. (It can, of course, also be used to pipe the data into other applications or destinations.)

Step 1: Display data with cat.

If you have not already done so, upload some files into HDFS. In this example, we assume that a file named “foo” has been loaded into your home directory on HDFS.

  someone@anynode:hadoop$ bin/hadoop dfs -cat foo
  (contents of foo are displayed here)
  someone@anynode:hadoop$

Step 2: Copy a file from HDFS to the local file system.

The get command is the inverse operation of put; it will copy a file or directory (recursively) from HDFS into the target of your choosing on the local file system. A synonymous operation is called -copyToLocal.

  someone@anynode:hadoop$ bin/hadoop dfs -get foo localFoo
  someone@anynode:hadoop$ ls
  localFoo
  someone@anynode:hadoop$ cat localFoo
  (contents of foo are displayed here)

Like the put command, get will operate on directories in addition to individual files.

Shutting Down HDFS

If you want to shut down the HDFS functionality of your cluster (either because you do not want Hadoop occupying memory resources when it is not in use, or because you want to restart the cluster for upgrading, configuration changes, etc.), then this can be accomplished by logging in to the NameNode machine and running:

  someone@namenode:hadoop$ bin/stop-dfs.sh

This command must be performed by the same user who started HDFS with bin/start-dfs.sh.

HDFS COMMAND REFERENCE

There are many more commands in bin/hadoop dfs than were demonstrated here, although these basic operations will get you started. Running bin/hadoop dfs with no additional arguments will list all commands which can be run with the FsShell system. Furthermore, bin/hadoop dfs -help commandName will display a short usage summary for the operation in question, if you are stuck.

A table of all operations is reproduced below. The following conventions are used for parameters:

  • italics denote variables to be filled out by the user.
  • “path” means any file or directory name.
  • “path…” means one or more file or directory names.
  • “file” means any filename.
  • “src” and “dest” are path names in a directed operation.
  • “localSrc” and “localDest” are paths as above, but on the local file system. All other file and path names refer to objects inside HDFS.
  • Parameters in [brackets] are optional.
Command Operation
-ls path  Lists the contents of the directory specified by path, showing the names, permissions, owner, size, and modification date for each entry.
-lsr path  Behaves like -ls, but recursively displays entries in all subdirectories of path.
-du path  Shows disk usage, in bytes, for all files which match path; filenames are reported with the full HDFS protocol prefix.
-dus path  Like -du, but prints a summary of disk usage of all files/directories in the path.
-mv src dest  Moves the file or directory indicated by src to dest, within HDFS.
-cp src dest  Copies the file or directory identified by src to dest, within HDFS.
-rm path  Removes the file or empty directory identified by path.
-rmr path  Removes the file or directory identified by path. Recursively deletes any child entries (i.e., files or subdirectories of path).
-put localSrc dest  Copies the file or directory from the local file system identified by localSrc to dest within the DFS.
-copyFromLocal localSrc dest  Identical to -put.
-moveFromLocal localSrc dest  Copies the file or directory from the local file system identified by localSrc to dest within HDFS, then deletes the local copy on success.
-get [-crc] src localDest  Copies the file or directory in HDFS identified by src to the local file system path identified by localDest.
-getmerge src localDest [addnl]  Retrieves all files that match the path src in HDFS, and copies them to a single, merged file in the local file system identified by localDest.
-cat filename  Displays the contents of filename on stdout.
-copyToLocal [-crc] src localDest  Identical to -get.
-moveToLocal [-crc] src localDest  Works like -get, but deletes the HDFS copy on success.
-mkdir path  Creates a directory named path in HDFS. Creates any parent directories in path that are missing (e.g., like mkdir -p in Linux).
-setrep [-R] [-w] rep path  Sets the target replication factor for files identified by path to rep. (The actual replication factor will move toward the target over time.)
-touchz path  Creates a file at path containing the current time as a timestamp. Fails if a file already exists at path, unless the file is already size 0.
-test -[ezd] path  Returns 1 if path exists (-e), has zero length (-z), or is a directory (-d); 0 otherwise.
-stat [format] path  Prints information about path. format is a string which accepts file size in blocks (%b), filename (%n), block size (%o), replication (%r), and modification date (%y, %Y).
-tail [-f] file  Shows the last 1KB of file on stdout.
-chmod [-R] mode,mode,… path…  Changes the file permissions associated with one or more objects identified by path…. Performs changes recursively with -R. mode is a 3-digit octal mode, or {augo}+/-{rwxX}. Assumes a if no scope is specified and does not apply a umask.
-chown [-R] [owner][:[group]] path…  Sets the owning user and/or group for files or directories identified by path…. Sets owner recursively if -R is specified.
-chgrp [-R] group path…  Sets the owning group for files or directories identified by path…. Sets group recursively if -R is specified.
-help cmd  Returns usage information for one of the commands listed above. You must omit the leading ‘-‘ character in cmd.

DFSADMIN COMMAND REFERENCE

While the dfs module for bin/hadoop provides common file and directory manipulation commands, they all work with objects within the file system. The dfsadmin module manipulates or queries the file system as a whole. The operation of the commands in this module is described in this section.

Getting overall status: A brief status report for HDFS can be retrieved with bin/hadoop dfsadmin -report. This returns basic information about the overall health of the HDFS cluster, as well as some per-server metrics.

More involved status: If you need to know more details about what the state of the NameNode’s metadata is, the command bin/hadoop dfsadmin -metasave filename will record this information in filename. The metasave command will enumerate lists of blocks which are under-replicated, in the process of being replicated, and scheduled for deletion. NB: The help for this command states that it “saves NameNode’s primary data structures,” but this is a misnomer; the NameNode’s state cannot be restored from this information. However, it will provide good information about how the NameNode is managing HDFS’s blocks.

Safemode: Safemode is an HDFS state in which the file system is mounted read-only; no replication is performed, nor can files be created or deleted. This is automatically entered as the NameNode starts, to allow all DataNodes time to check in with the NameNode and announce which blocks they hold, before the NameNode determines which blocks are under-replicated, etc. The NameNode waits until a specific percentage of the blocks are present and accounted-for; this is controlled in the configuration by the dfs.safemode.threshold.pct parameter. After this threshold is met, safemode is automatically exited, and HDFS allows normal operations. The bin/hadoop dfsadmin -safemode what command allows the user to manipulate safemode based on the value of what, described below:

  • enter – Enters safemode
  • leave – Forces the NameNode to exit safemode
  • get – Returns a string indicating whether safemode is ON or OFF
  • wait – Waits until safemode has exited and returns
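
For example, a job-submission or maintenance script could block until the NameNode has left safemode and then confirm its state; a minimal sketch using the commands listed above:

  user@namenode:hadoop$ bin/hadoop dfsadmin -safemode wait
  user@namenode:hadoop$ bin/hadoop dfsadmin -safemode get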

Changing HDFS membership – When decommissioning nodes, it is important to disconnect nodes from HDFS gradually to ensure that data is not lost. See the “Decommissioning Nodes” section below for the procedure.

Upgrading HDFS versions – When upgrading from one version of Hadoop to the next, the file formats used by the NameNode and DataNodes may change. When you first start the new version of Hadoop on the cluster, you need to tell Hadoop to change the HDFS version (or else it will not mount), using the command: bin/start-dfs.sh -upgrade. It will then begin upgrading the HDFS version. The status of an ongoing upgrade operation can be queried with the bin/hadoop dfsadmin -upgradeProgress status command. More verbose information can be retrieved with bin/hadoop dfsadmin -upgradeProgress details. If the upgrade is blocked and you would like to force it to continue, use the command: bin/hadoop dfsadmin -upgradeProgress force. (Note: be sure you know what you are doing if you use this last command.)

When HDFS is upgraded, Hadoop retains backup information allowing you to downgrade to the original HDFS version in case you need to revert Hadoop versions. To back out the changes, stop the cluster, re-install the older version of Hadoop, and then use the command: bin/start-dfs.sh -rollback. It will restore the previous HDFS state.

Only one such archival copy can be kept at a time. Thus, after a few days of operation with the new version (when it is deemed stable), the archival copy can be removed with the command bin/hadoop dfsadmin -finalizeUpgrade. The rollback command cannot be issued after this point. This must be performed before a second Hadoop upgrade is allowed.

Getting help – As with the dfs module, typing bin/hadoop dfsadmin -help cmd will provide more usage information about the particular command.

Using HDFS in MapReduce

The HDFS is a powerful companion to Hadoop MapReduce. By setting the fs.default.name configuration option to point to the NameNode (as was done above), Hadoop MapReduce jobs will automatically draw their input files from HDFS. Using the regular FileInputFormat subclasses, Hadoop will automatically draw its input data sources from file paths within HDFS, and will distribute the work over the cluster in an intelligent fashion to exploit block locality where possible.
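
As a brief illustration (a sketch only, using the older org.apache.hadoop.mapred API; the driver class name and output path are hypothetical), a job driver simply names paths, and because fs.default.name points at the NameNode they are resolved inside HDFS:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;

  public class HdfsInputExample {
    public static void main(String[] args) throws Exception {
      // Job configuration; picks up conf/hadoop-site.xml, including fs.default.name
      JobConf conf = new JobConf(HdfsInputExample.class);
      conf.setJobName("hdfs-input-example");

      // These paths are resolved inside HDFS because fs.default.name points at the NameNode
      FileInputFormat.setInputPaths(conf, new Path("/user/someone/myfiles"));
      FileOutputFormat.setOutputPath(conf, new Path("/user/someone/myfiles-out"));

      // A real job would also set its Mapper, Reducer, and output key/value classes here.
      JobClient.runJob(conf);
    }
  }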

Using HDFS Programmatically

While HDFS can be manipulated explicitly through user commands, or implicitly as the input to or output from a Hadoop MapReduce job, you can also work with HDFS inside your own Java applications. (A JNI-based wrapper, libhdfs, also provides this functionality in C/C++ programs.)

This section provides a short tutorial on using the Java-based HDFS API. It will be based on the following code listing:

1:  import java.io.File;
2:  import java.io.IOException;
3:
4:  import org.apache.hadoop.conf.Configuration;
5:  import org.apache.hadoop.fs.FileSystem;
6:  import org.apache.hadoop.fs.FSDataInputStream;
7:  import org.apache.hadoop.fs.FSDataOutputStream;
8:  import org.apache.hadoop.fs.Path;
9:
10: public class HDFSHelloWorld {
11:
12:   public static final String theFilename = "hello.txt";
13:   public static final String message = "Hello, world!\n";
14:
15:   public static void main (String [] args) throws IOException {
16:
17:     Configuration conf = new Configuration();
18:     FileSystem fs = FileSystem.get(conf);
19:
20:     Path filenamePath = new Path(theFilename);
21:
22:     try {
23:       if (fs.exists(filenamePath)) {
24:         // remove the file first
25:         fs.delete(filenamePath);
26:       }
27:
28:       FSDataOutputStream out = fs.create(filenamePath);
29:       out.writeUTF(message);
30:       out.close();
31:
32:       FSDataInputStream in = fs.open(filenamePath);
33:       String messageIn = in.readUTF();
34:       System.out.print(messageIn);
35:       in.close();
36:     } catch (IOException ioe) {
37:       System.err.println("IOException during operation: " + ioe.toString());
38:       System.exit(1);
39:     }
40:   }
41: }

This program creates a file named hello.txt, writes a short message into it, then reads it back and prints it to the screen. If the file already existed, it is deleted first.

First we get a handle to an abstract FileSystem object, as specified by the application configuration. The Configuration object created uses the default parameters.

17:     Configuration conf = new Configuration();
18:     FileSystem fs = FileSystem.get(conf);

The FileSystem interface actually provides a generic abstraction suitable for use in several file systems. Depending on the Hadoop configuration, this may use HDFS or the local file system or a different one altogether. If this test program is launched via the ordinary ‘java classname’ command line, it may not find conf/hadoop-site.xml and will use the local file system. To ensure that it uses the proper Hadoop configuration, launch this program through Hadoop by putting it in a jar and running:

$HADOOP_HOME/bin/hadoop jar yourjar HDFSHelloWorld

Regardless of how you launch the program and which file system it connects to, writing to a file is done in the same way:

28:       FSDataOutputStream out = fs.create(filenamePath);
29:       out.writeUTF(message);
30:       out.close();

First we create the file with the fs.create() call, which returns an FSDataOutputStream used to write data into the file. We then write the information using ordinary stream writing functions; FSDataOutputStream extends the java.io.DataOutputStream class. When we are done with the file, we close the stream with out.close().

This call to fs.create() will overwrite the file if it already exists, but for sake of example, this program explicitly removes the file first anyway (note that depending on this explicit prior removal is technically a race condition). Testing for whether a file exists and removing an existing file are performed by lines 23-26:

23:       if (fs.exists(filenamePath)) {
24:         // remove the file first
25:         fs.delete(filenamePath);
26:       }

Other operations such as copying, moving, and renaming are equally straightforward operations on Path objects performed by the FileSystem.
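
For instance, a hypothetical continuation of the listing above (not part of the original program) could rename the file with a single call on the same FileSystem handle; fs, filenamePath, and theFilename are the variables from the listing:

      // Rename hello.txt to hello-renamed.txt within HDFS; rename() returns true on success.
      Path renamedPath = new Path("hello-renamed.txt");
      if (fs.rename(filenamePath, renamedPath)) {
        System.out.println("Renamed " + theFilename + " to " + renamedPath);
      }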

Finally, we re-open the file for read, and pull the bytes from the file, converting them to a UTF-8 encoded string in the process, and print to the screen:

32:       FSDataInputStream in = fs.open(filenamePath);
33:       String messageIn = in.readUTF();
34:       System.out.print(messageIn);
35:       in.close();

The fs.open() method returns an FSDataInputStream, which subclasses java.io.DataInputStream. Data can be read from the stream using the readUTF() operation, as on line 33. When we are done with the stream, we call close() to free the handle associated with the file.

More information:

Complete JavaDoc for the HDFS API is provided at http://hadoop.apache.org/common/docs/r0.20.2/api/index.html.

A direct link to the FileSystem interface is: http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/fs/FileSystem.html.

Another example HDFS application is available on the Hadoop wiki. This implements a file copy operation.

HDFS Permissions and Security

Starting with Hadoop 0.16.1, HDFS has included a rudimentary file permissions system. This permission system is based on the POSIX model, but does not provide strong security for HDFS files. The HDFS permissions system is designed to prevent accidental corruption of data or casual misuse of information within a group of users who share access to a cluster. It is not a strong security model that guarantees denial of access to unauthorized parties.

HDFS security is based on the POSIX model of users and groups. Each file or directory has 3 permissions (read, write and execute) associated with it at three different granularities: the file’s owner, users in the same group as the owner, and all other users in the system. As the HDFS does not provide the full POSIX spectrum of activity, some combinations of bits will be meaningless. For example, no file can be executed; the +x bits cannot be set on files (only directories). Nor can an existing file be written to, although the +w bits may still be set.

Security permissions and ownership can be modified using the bin/hadoop dfs -chmod, -chown, and -chgrp operations described earlier in this document; they work in a similar fashion to the POSIX/Linux tools of the same name.
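
For example (a sketch; the paths and the “analysts” group are hypothetical), the HDFS superuser could lock down one directory and hand a shared one to a project group:

  hadoop@anynode:hadoop$ bin/hadoop dfs -chmod -R 700 /user/someone/private
  hadoop@anynode:hadoop$ bin/hadoop dfs -chown -R someone:analysts /user/someone/shared
  hadoop@anynode:hadoop$ bin/hadoop dfs -chmod -R 750 /user/someone/shared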

Determining identity – Identity is not authenticated formally with HDFS; it is taken from an extrinsic source. The Hadoop system is programmed to use the user’s current login as their Hadoop username (i.e., the equivalent of whoami). The user’s current working group list (i.e., the output of groups) is used as the group list in Hadoop. HDFS itself does not verify that this username is genuine to the actual operator.

Superuser status – The username which was used to start the Hadoop process (i.e., the username who actually ran bin/start-all.sh or bin/start-dfs.sh) is acknowledged to be the superuser for HDFS. If this user interacts with HDFS, he does so with a special username superuser. This user’s operations on HDFS never fail, regardless of permission bits set on the particular files he manipulates. If Hadoop is shutdown and restarted under a different username, that username is then bound to the superuser account.

Supergroup – There is also a special group named supergroup, whose membership is controlled by the configuration parameter dfs.permissions.supergroup.

Disabling permissions – By default, permissions are enabled on HDFS. The permission system can be disabled by setting the configuration option dfs.permissions to false. The owner, group, and permissions bits associated with each file and directory will still be preserved, but the HDFS process does not enforce them, except when using permissions-related operations such as -chmod.
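
A minimal sketch of the corresponding conf/hadoop-site.xml entry:

  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>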

Additional HDFS Tasks

REBALANCING BLOCKS

New nodes can be added to a cluster in a straightforward manner. On the new node, the same Hadoop version and configuration (conf/hadoop-site.xml) as on the rest of the cluster should be installed. Starting the DataNode daemon on the machine will cause it to contact the NameNode and join the cluster. (The new node should be added to the slaves file on the master server as well, to inform the master how to invoke script-based commands on the new node.)

But the new DataNode will have no data on board initially; it is therefore not alleviating space concerns on the existing nodes. New files will be stored on the new DataNode in addition to the existing ones, but for optimum usage, storage should be evenly balanced across all nodes.

This can be achieved with the automatic balancer tool included with Hadoop. The Balancer class will intelligently balance blocks across the nodes to achieve an even distribution of blocks within a given threshold, expressed as a percentage. (The default is 10%.) Smaller percentages make nodes more evenly balanced, but may require more time to achieve this state. Perfect balancing (0%) is unlikely to actually be achieved.

The balancer script can be run by starting bin/start-balancer.sh in the Hadoop directory. The script can be provided a balancing threshold percentage with the -threshold parameter; e.g., bin/start-balancer.sh -threshold 5. The balancer will automatically terminate when it achieves its goal, or when an error occurs, or it cannot find more candidate blocks to move to achieve better balance. The balancer can always be terminated safely by the administrator by running bin/stop-balancer.sh.

The balancing script can be run when nobody else is using the cluster (e.g., overnight), but it can also be run in an “online” fashion while many other jobs are ongoing. To prevent the rebalancing process from consuming large amounts of bandwidth and significantly degrading the performance of other processes on the cluster, the dfs.balance.bandwidthPerSec configuration parameter can be used to limit the number of bytes/sec each node may devote to rebalancing its data store.
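
For example, to cap each node’s rebalancing traffic at roughly 1 MB/sec (the value is in bytes per second; the figure here is only an illustration), something like the following could be added to conf/hadoop-site.xml:

  <property>
    <name>dfs.balance.bandwidthPerSec</name>
    <value>1048576</value>
  </property>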

COPYING LARGE SETS OF FILES

When migrating a large number of files from one location to another (either from one HDFS cluster to another, from S3 into HDFS or vice versa, etc), the task should be divided between multiple nodes to allow them all to share in the bandwidth required for the process. Hadoop includes a tool called distcp for this purpose.

By invoking bin/hadoop distcp src dest, Hadoop will start a MapReduce task to distribute the burden of copying a large number of files from src to dest. These two parameters may specify a full URL for the path to copy. e.g., "hdfs://SomeNameNode:9000/foo/bar/" and "hdfs://OtherNameNode:2000/baz/quux/" will copy the children of /foo/bar on one cluster to the directory tree rooted at /baz/quux on the other. The paths are assumed to be directories, and are copied recursively. S3 URLs can be specified with s3://bucket-name/key.

DECOMMISSIONING NODES

In addition to allowing nodes to be added to the cluster on the fly, nodes can also be removed from a cluster while it is running, without data loss. But if nodes are simply shut down “hard,” data loss may occur as they may hold the sole copy of one or more file blocks.

Nodes must be retired on a schedule that allows HDFS to ensure that no blocks are entirely replicated within the to-be-retired set of DataNodes.

HDFS provides a decommissioning feature which ensures that this process is performed safely. To use it, follow the steps below:

Step 1: Cluster configuration. If it is assumed that nodes may be retired in your cluster, then before it is started, an excludes file must be configured. Add a key named dfs.hosts.exclude to your conf/hadoop-site.xml file. The value associated with this key provides the full path to a file on the NameNode’s local file system which contains a list of machines which are not permitted to connect to HDFS.
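
A minimal sketch of the entry, assuming a hypothetical excludes file path on the NameNode:

  <property>
    <name>dfs.hosts.exclude</name>
    <value>/home/hadoop/excludes</value>
  </property>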

Step 2: Determine hosts to decommission. Each machine to be decommissioned should be added to the file identified by dfs.hosts.exclude, one per line. This will prevent them from connecting to the NameNode.

Step 3: Force configuration reload. Run the command bin/hadoop dfsadmin -refreshNodes. This will force the NameNode to reread its configuration, including the newly-updated excludes file. It will decommission the nodes over a period of time, allowing time for each node’s blocks to be replicated onto machines which are scheduled to remain active.

Step 4: Shutdown nodes. After the decommission process has completed, the decommissioned hardware can be safely shutdown for maintenance, etc. The bin/hadoop dfsadmin -report command will describe which nodes are connected to the cluster.

Step 5: Edit excludes file again. Once the machines have been decommissioned, they can be removed from the excludes file. Running bin/hadoop dfsadmin -refreshNodes again will read the excludes file back into the NameNode, allowing the DataNodes to rejoin the cluster after maintenance has been completed, or additional capacity is needed in the cluster again, etc.

VERIFYING FILE SYSTEM HEALTH

After decommissioning nodes, restarting a cluster, or periodically during its lifetime, you may want to ensure that the file system is healthy–that files are not corrupted or under-replicated, and that blocks are not missing.

Hadoop provides an fsck command to do exactly this. It can be launched at the command line like so:

  bin/hadoop fsck [path] [options]

If run with no arguments, it will print usage information and exit. If run with the argument /, it will check the health of the entire file system and print a report. If provided with a path to a particular directory or file, it will only check files under that path. If an option argument is given but no path, it will start from the file system root (/). The options may include two different types of options:

Action options specify what action should be taken when corrupted files are found. This can be -move, which moves corrupt files to /lost+found, or -delete, which deletes corrupted files.

Information options specify how verbose the tool should be in its report. The -files option will list all files it checks as it encounters them. This information can be further expanded by adding the -blocks option, which prints the list of blocks for each file. Adding -locations to these two options will then print the addresses of the DataNodes holding these blocks. Still more information can be retrieved by adding -racks to the end of this list, which then prints the rack topology information for each location. (See the next subsection for more information on configuring network rack awareness.) Note that the later options do not imply the former; you must use them in conjunction with one another. Also, note that the Hadoop program uses -files in a “common argument parser” shared by the different commands such as dfsadmin, fsck, dfs, etc. This means that if you omit a path argument to fsck, it will not receive the -files option that you intend. You can separate common options from fsck-specific options by using -- as an argument, like so:

  bin/hadoop fsck -- -files -blocks

The -- is not required if you provide a path to start the check from, or if you specify another argument first such as -move.

By default, fsck will not operate on files still open for write by another client. A list of such files can be produced with the -openforwrite option.
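
Putting these options together, a typical full health check and a scan for open files might look like the following sketch (output omitted):

  bin/hadoop fsck / -files -blocks -locations -racks
  bin/hadoop fsck / -openforwrite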

Rack Awareness

For small clusters in which all servers are connected by a single switch, there are only two levels of locality: “on-machine” and “off-machine.” When loading data from a DataNode’s local drive into HDFS, the NameNode will schedule one copy to go into the local DataNode, and will pick two other machines at random from the cluster.

For larger Hadoop installations which span multiple racks, it is important to ensure that replicas of data exist on multiple racks. This way, the loss of a switch does not render portions of the data unavailable due to all replicas being underneath it.

HDFS can be made rack-aware by the use of a script which allows the master node to map the network topology of the cluster. While alternate configuration strategies can be used, the default implementation allows you to provide an executable script which returns the “rack address” of each of a list of IP addresses.

The network topology script receives as arguments one or more IP addresses of nodes in the cluster. It returns on stdout a list of rack names, one for each input. The input and output order must be consistent.

To set the rack mapping script, specify the key topology.script.file.name in conf/hadoop-site.xml. This provides a command to run to return a rack id; it must be an executable script or program. By default, Hadoop will attempt to send a set of IP addresses to the file as several separate command line arguments. You can control the maximum acceptable number of arguments with the topology.script.number.args key.
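
A sketch of the two keys in conf/hadoop-site.xml, assuming the example script shown below is saved at a hypothetical location such as /home/hadoop/topology.sh:

  <property>
    <name>topology.script.file.name</name>
    <value>/home/hadoop/topology.sh</value>
  </property>
  <property>
    <name>topology.script.number.args</name>
    <value>1</value>
  </property>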

Rack ids in Hadoop are hierarchical and look like path names. By default, every node has a rack id of /default-rack. You can set rack ids for nodes to any arbitrary path, e.g., /foo/bar-rack. Path elements further to the left are higher up the tree. Thus a reasonable structure for a large installation may be /top-switch-name/rack-name.

Hadoop rack ids are not currently expressive enough to handle an unusual routing topology such as a 3-d torus; they assume that each node is connected to a single switch which in turn has a single upstream switch. This is not usually a problem, however. Actual packet routing will be directed using the topology discovered by or set in switches and routers. The Hadoop rack ids will be used to find “near” and “far” nodes for replica placement (and in 0.17, MapReduce task placement).

The following example script performs rack identification based on IP addresses given a hierarchical IP addressing scheme enforced by the network administrator. This may work directly for simple installations; more complex network configurations may require a file- or table-based lookup process. Care should be taken in that case to keep the table up-to-date as nodes are physically relocated, etc. This script requires that the maximum number of arguments be set to 1.

#!/bin/bash
# Set rack id based on IP address.
# Assumes network administrator has complete control
# over IP addresses assigned to nodes and they are
# in the 10.x.y.z address space. Assumes that
# IP addresses are distributed hierarchically. e.g.,
# 10.1.y.z is one data center segment and 10.2.y.z is another;
# 10.1.1.z is one rack, 10.1.2.z is another rack in
# the same segment, etc.)
#
# This is invoked with an IP address as its only argument

# get the IP address from the first (and only) command-line argument
ipaddr=$1

# select "x.y" and convert it to "x/y"
segments=`echo $ipaddr | cut --delimiter=. --fields=2-3 --output-delimiter=/`
echo /${segments}

HDFS Web Interface

HDFS exposes a web server which is capable of performing basic status monitoring and file browsing operations. By default this is exposed on port 50070 on the NameNode. Accessing http://namenode:50070/ with a web browser will return a page containing overview information about the health, capacity, and usage of the cluster (similar to the information returned by bin/hadoop dfsadmin -report).

The address and port where the web interface listens can be changed by setting dfs.http.address in conf/hadoop-site.xml. It must be of the form address:port. To accept requests on all addresses, use 0.0.0.0.
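
For instance, to serve the interface on port 8070 on all addresses (the port choice here is arbitrary, as a sketch):

  <property>
    <name>dfs.http.address</name>
    <value>0.0.0.0:8070</value>
  </property>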

From this interface, you can browse HDFS itself with a basic file-browser interface. Each DataNode exposes its file browser interface on port 50075. You can override this by setting the dfs.datanode.http.address configuration key to a setting other than 0.0.0.0:50075. Log files generated by the Hadoop daemons can be accessed through this interface, which is useful for distributed debugging and troubleshooting.

References

Ghemawat, S., Gobioff, H., and Leung, S.-T. The Google File System. Proceedings of the 19th ACM Symposium on Operating Systems Principles, pp. 29–43. Bolton Landing, NY, USA, 2003. © 2003, ACM.
Borthakur, Dhruba. The Hadoop Distributed File System: Architecture and Design. © 2007, The Apache Software Foundation.
Hadoop DFS User Guide. © 2007, The Apache Software Foundation.
HDFS: Permissions User and Administrator Guide. © 2007, The Apache Software Foundation.
HDFS API Javadoc © 2008, The Apache Software Foundation.

The Hadoop Approach

Hadoop is designed to efficiently process large volumes of information by connecting many commodity computers together to work in parallel. The theoretical 1000-CPU machine described earlier would cost a very large amount of money, far more than 1,000 single-CPU or 250 quad-core machines. Hadoop will tie these smaller and more reasonably priced machines together into a single cost-effective compute cluster.

COMPARISON TO EXISTING TECHNIQUES

Performing computation on large volumes of data has been done before, usually in a distributed setting. What makes Hadoop unique is its simplified programming model, which allows the user to quickly write and test distributed systems, and its efficient, automatic distribution of data and work across machines, which in turn exploits the underlying parallelism of the CPU cores.

Grid scheduling of computers can be done with existing systems such as Condor. But Condor does not automatically distribute data: a separate SAN must be managed in addition to the compute cluster. Furthermore, collaboration between multiple compute nodes must be managed with a communication system such as MPI. This programming model is challenging to work with and can lead to the introduction of subtle errors.

DATA DISTRIBUTION

In a Hadoop cluster, data is distributed to all the nodes of the cluster as it is being loaded in. The Hadoop Distributed File System (HDFS) will split large data files into chunks which are managed by different nodes in the cluster. In addition to this each chunk is replicated across several machines, so that a single machine failure does not result in any data being unavailable. An active monitoring system then re-replicates the data in response to system failures which can result in partial storage. Even though the file chunks are replicated and distributed across several machines, they form a single namespace, so their contents are universally accessible.

Data is conceptually record-oriented in the Hadoop programming framework. Individual input files are broken into lines or into other formats specific to the application logic. Each process running on a node in the cluster then processes a subset of these records. The Hadoop framework schedules these processes in proximity to the location of data/records using knowledge from the distributed file system. Since files are spread across the distributed file system as chunks, each compute process running on a node operates on a subset of the data. Which data a node operates on is chosen based on its locality to the node: most data is read from the local disk straight into the CPU, alleviating strain on network bandwidth and preventing unnecessary network transfers. This strategy of moving computation to the data, instead of moving the data to the computation, allows Hadoop to achieve high data locality, which in turn results in high performance.

Figure 1.1: Data is distributed across nodes at load time.

MAPREDUCE: ISOLATED PROCESSES

Hadoop limits the amount of communication which can be performed by the processes, as each individual record is processed by a task in isolation from one another. While this sounds like a major limitation at first, it makes the whole framework much more reliable. Hadoop will not run just any program and distribute it across a cluster. Programs must be written to conform to a particular programming model, named “MapReduce.”

In MapReduce, records are processed in isolation by tasks called Mappers. The output from the Mappers is then brought together into a second set of tasks called Reducers, where results from different mappers can be merged together.

Figure 1.2: Mapping and reducing tasks run on nodes where individual records of data are already present.

Separate nodes in a Hadoop cluster still communicate with one another. However, in contrast to more conventional distributed systems where application developers explicitly marshal byte streams from node to node over sockets or through MPI buffers, communication in Hadoop is performed implicitly. Pieces of data can be tagged with key names which inform Hadoop how to send related bits of information to a common destination node. Hadoop internally manages all of the data transfer and cluster topology issues.

By restricting the communication between nodes, Hadoop makes the distributed system much more reliable. Individual node failures can be worked around by restarting tasks on other machines. Since user-level tasks do not communicate explicitly with one another, no messages need to be exchanged by user programs, nor do nodes need to roll back to pre-arranged checkpoints to partially restart the computation. The other workers continue to operate as though nothing went wrong, leaving the challenging aspects of partially restarting the program to the underlying Hadoop layer.

FLAT SCALABILITY

One of the major benefits of using Hadoop in contrast to other distributed systems is its flat scalability curve. Executing Hadoop on a limited amount of data on a small number of nodes may not demonstrate particularly stellar performance as the overhead involved in starting Hadoop programs is relatively high. Other parallel/distributed programming paradigms such as MPI (Message Passing Interface) may perform much better on two, four, or perhaps a dozen machines. Though the effort of coordinating work among a small number of machines may be better-performed by such systems, the price paid in performance and engineering effort (when adding more hardware as a result of increasing data volumes) increases non-linearly.

A program written in distributed frameworks other than Hadoop may require large amounts of refactoring when scaling from ten to one hundred or one thousand machines. This may involve having the program be rewritten several times; fundamental elements of its design may also put an upper bound on the scale to which the application can grow.

Hadoop, however, is specifically designed to have a very flat scalability curve. After a Hadoop program is written and functioning on ten nodes, very little–if any–work is required for that same program to run on a much larger amount of hardware. Orders of magnitude of growth can be managed with little re-work required for your applications. The underlying Hadoop platform will manage the data and hardware resources and provide dependable performance growth proportionate to the number of machines available.