
How-to: Analyze Twitter Data with Apache Hadoop (http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/)

Social media has gained immense popularity with marketing teams, and Twitter is an effective tool for a company to get people excited about its products. Twitter makes it easy to engage users and communicate directly with them, and in turn, users can provide word-of-mouth marketing for companies by discussing the products. Given limited resources, and knowing we may not be able to talk directly to everyone we want to target, marketing departments can be more efficient by being selective about whom they reach out to.

In this post, we’ll learn how we can use Apache Flume, Apache HDFS, Apache Oozie, and Apache Hive to design an end-to-end data pipeline that will enable us to analyze Twitter data. This will be the first post in a series. The posts to follow will describe, in more depth, how each component is involved and how the custom code operates. All the code and instructions necessary to reproduce this pipeline are available on the Cloudera Github.

Who is Influential?

To understand whom we should target, let’s take a step back and try to understand the mechanics of Twitter. A user – let’s call him Joe – follows a set of people, and has a set of followers. When Joe sends an update out, that update is seen by all of his followers. Joe can also retweet other users’ updates. A retweet is a repost of an update, much like you might forward an email. If Joe sees a tweet from Sue, and retweets it, all of Joe’s followers see Sue’s tweet, even if they don’t follow Sue. Through retweets, messages can get passed much further than just the followers of the person who sent the original tweet. Knowing that, we can try to engage users whose updates tend to generate lots of retweets. Since Twitter tracks retweet counts for all tweets, we can find the users we’re looking for by analyzing Twitter data.

Now we know the question we want to ask: Which Twitter users get the most retweets? Who is influential within our industry?

How Do We Answer These Questions?

SQL queries can be used to answer this question: we want to look at which users are responsible for the most retweets, in descending order of retweet count. However, querying Twitter data in a traditional RDBMS is inconvenient, since the Twitter Streaming API outputs tweets in a JSON format which can be arbitrarily complex. In the Hadoop ecosystem, the Hive project provides a query interface that can be used to query data residing in HDFS. The query language looks very similar to SQL and allows us to easily model complex types, so we can query the kind of data we have. Seems like a good place to start. So how do we get Twitter data into Hive? First, we need to get Twitter data into HDFS, and then we’ll be able to tell Hive where the data resides and how to read it.

The diagram above shows a high-level view of how some of the CDH (Cloudera’s Distribution Including Apache Hadoop) components can be pieced together to build the data pipeline we need to answer the questions we have. The rest of this post will describe how these components interact and the purposes they each serve.

Gathering Data with Apache Flume

The Twitter Streaming API will give us a constant stream of tweets coming from the service. One option would be to use a simple utility like curl to access the API and then periodically load the files. However, this would require us to write code to control where the data goes in HDFS, and if we have a secure cluster, we will have to integrate with security mechanisms. It will be much simpler to use components within CDH to automatically move the files from the API to HDFS, without our manual intervention.

Apache Flume is a data ingestion system that is configured by defining endpoints in a data flow called sources and sinks. In Flume, each individual piece of data (tweets, in our case) is called an event; sources produce events, and send the events through a channel, which connects the source to the sink. The sink then writes the events out to a predefined location. Flume supports some standard data sources, such as syslog or netcat. For this use case, we’ll need to design a custom source that accesses the Twitter Streaming API, and sends the tweets through a channel to a sink that writes to HDFS files. Additionally, we can use the custom source to filter the tweets on a set of search keywords to help identify relevant tweets, rather than a pure sample of the entire Twitter firehose. The custom Flume source code can be found here.
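To make the flow concrete, a Flume agent for this pipeline might be configured roughly as follows. This is only a sketch: the agent and component names, the source class name, the keyword list, and the HDFS path are assumptions and would need to match the actual custom source from the repository and your own cluster (the Twitter API credentials are omitted entirely).

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Hypothetical class name for the custom Twitter source described above
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://namenode:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream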

Partition Management with Oozie

Once we have the Twitter data loaded into HDFS, we can stage it for querying by creating an external table in Hive. Using an external table will allow us to query the table without moving the data from the location where it ends up in HDFS. To ensure scalability, as we add more and more data, we’ll need to also partition the table. A partitioned table allows us to prune the files that we read when querying, which results in better performance when dealing with large data sets. However, the Twitter API will continue to stream tweets and Flume will perpetually create new files. We can automate the periodic process of adding partitions to our table as the new data comes in.

Apache Oozie is a workflow coordination system that can be used to solve this problem. Oozie is an extremely flexible system for designing job workflows, which can be scheduled to run based on a set of criteria. We can configure the workflow to run an ALTER TABLE command that adds a partition containing the last hour’s worth of data into Hive, and we can instruct the workflow to occur every hour. This will ensure that we’re always looking at up-to-date data.
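For example, the Hive action inside the workflow could run a statement shaped like the one below; the table name, partition column, and directory layout here are illustrative assumptions rather than the exact values used in the accompanying code.

ALTER TABLE tweets ADD IF NOT EXISTS PARTITION (datehour = 2012092801)
LOCATION '/user/flume/tweets/2012/09/28/01';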

The configuration files for the Oozie workflow are located here.

Querying Complex Data with Hive

Before we can query the data, we need to ensure that the Hive table can properly interpret the JSON data. By default, Hive expects that input files use a delimited row format, but our Twitter data is in a JSON format, which will not work with the defaults. This is actually one of Hive’s biggest strengths. Hive allows us to flexibly define, and redefine, how the data is represented on disk. The schema is only really enforced when we read the data, and we can use the Hive SerDe interface to specify how to interpret what we’ve loaded.

SerDe stands for Serializer and Deserializer, which are interfaces that tell Hive how it should translate the data into something that Hive can process. In particular, the Deserializer interface is used when we read data off of disk, and converts the data into objects that Hive knows how to manipulate. We can write a custom SerDe that reads the JSON data in and translates the objects for Hive. Once that’s put into place, we can start querying. The JSON SerDe code can be found here. The SerDe will take a tweet in JSON form, like the following:
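(The full tweet shown in the original post is not reproduced here; the heavily trimmed sketch below only illustrates the shape of the JSON. The field names follow the Twitter Streaming API, and the values are placeholders.)

{
  "created_at": "Mon Sep 24 03:35:21 +0000 2012",
  "text": "Example tweet text ...",
  "retweet_count": 4,
  "entities": { "hashtags": [ { "text": "hadoop" } ] },
  "user": { "screen_name": "example_user", "followers_count": 1234 },
  "retweeted_status": { "user": { "screen_name": "original_author" } }
}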

and translate the JSON entities into queryable columns:
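(The DDL below is a simplified sketch rather than the exact table definition from the repository; the SerDe class name, column list, and partition column are assumptions.)

CREATE EXTERNAL TABLE tweets (
  created_at STRING,
  text STRING,
  retweet_count INT,
  entities STRUCT<hashtags:ARRAY<STRUCT<text:STRING>>>,
  user STRUCT<screen_name:STRING, followers_count:INT>,
  retweeted_status STRUCT<user:STRUCT<screen_name:STRING>>
)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';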

which will result in columns that we can select, filter, and aggregate just like those of any other Hive table.

We’ve now managed to put together an end-to-end system that gathers data from the Twitter Streaming API, sends the tweets to files on HDFS through Flume, and uses Oozie to periodically load the files into Hive, where we can query the raw JSON data through the use of a Hive SerDe.

Some Results

In my own testing, I let Flume collect data for about three days, filtering on a set of keywords:

hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

The collected data was about half a GB of JSON data; the sketch earlier in this post gives an idea of what a tweet looks like. The data has some structure, but certain fields may or may not exist. The retweeted_status field, for example, will only be present if the tweet was a retweet. Additionally, some of the fields may be arbitrarily complex. The hashtags field is an array of all the hashtags present in the tweet, but most RDBMSs do not support arrays as a column type. This semi-structured quality of the data makes it very difficult to query in a traditional RDBMS. Hive can handle this data much more gracefully.

The query below finds usernames and the number of retweets they have generated across all the tweets for which we have data:
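A query of roughly this shape does the job. It is a sketch that assumes the table and column names from the DDL sketch above and simplifies the aggregation used in the original post; on newer Hive versions the user column may need backtick quoting.

SELECT
  retweeted_status.user.screen_name AS retweeted_user,
  SUM(retweet_count) AS total_retweets
FROM tweets
WHERE retweeted_status.user.screen_name IS NOT NULL
GROUP BY retweeted_status.user.screen_name
ORDER BY total_retweets DESC
LIMIT 10;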

For the few days of data, I found that these were the most retweeted users for the industry:

From these results, we can see whose tweets are getting heard by the widest audience, and also determine whether these people are communicating on a regular basis or not. We can use this information to more carefully target our messaging in order to get them talking about our products, which, in turn, will get other people talking about our products.

Conclusion

In this post we’ve seen how we can take some of the components of CDH and combine them to create an end-to-end data management system. This same architecture could be used for a variety of applications designed to look at Twitter data, such as identifying spam accounts or identifying clusters of keywords. Taking the system even further, the general architecture can be reused across numerous applications: by plugging in different Flume sources and Hive SerDes, it can be customized for other problems, such as analyzing web logs. Grab the code, and give it a shot yourself.


CDH 5.3: Apache Sentry Integration with HDFS

Starting in CDH 5.3, Apache Sentry integration with HDFS saves admins a lot of work by centralizing access control permissions across components that utilize HDFS.

It’s been more than a year and a half since a couple of my colleagues here at Cloudera shipped the first version of Sentry (now Apache Sentry (incubating)). The project filled a huge security gap in the Apache Hadoop ecosystem by bringing truly secure and dependable fine-grained authorization to it, and provided out-of-the-box integration for Apache Hive. Since then the project has grown significantly, adding support for Impala, Search, and the wonderful Hue App, to name a few significant additions.

In order to provide a truly secure and centralized authorization mechanism, Sentry deployments have historically been set up so that all of Hive’s data and metadata are accessible only to HiveServer2, and every other user is cut out. This has been a pain point for Sqoop users, as Sqoop does not use the HiveServer2 interface. Hence users with a Sentry-secured Hive deployment were forced to split the import task into two steps: a simple HDFS import followed by manually loading the data into Hive.

With the inclusion of HDFS ACLs and the integration of Sentry into the Hive metastore in CDH 5.1, users were able to improve this situation and get the direct Hive import working again. However, this approach required manual administrator intervention to configure HDFS ACLs according to the Sentry configuration and needed a manual refresh to keep both systems in sync.

One of the major features included in the recently released CDH 5.3 is Sentry integration with HDFS, which enables customers to easily share data between Hive, Impala, and all the other Hadoop components that interact with HDFS (MapReduce, Spark, Pig, Sqoop, and so on), while ensuring that user access permissions only need to be set once and are uniformly enforced.

The rest of this post focuses on the example of using Sqoop together with this Sentry feature. Sqoop data can now be imported into Hive without any additional administrator intervention. By exposing Sentry policies (which tables a user can select from and which tables they can insert into) directly in HDFS, Sqoop will reuse the same policies that have been configured via GRANT/REVOKE statements or the Hue Sentry App, and will import data into Hive without any trouble.

Configuration

In order for Sqoop to seamlessly import into a Sentry-secured Hive instance, the Hadoop administrator needs to follow a few configuration steps to enable all the necessary features. First, your cluster needs to be using the Sentry Service as the backend for storing authorization metadata, rather than relying on the older policy files.

If you are already using the Sentry Service and GRANT/REVOKE statements, you can jump directly to step 3.

  1. Make sure that you have the Sentry service running on your cluster; you should see it in the service list.
  2. Confirm that Hive is configured to use this service as a backend for Sentry metadata.
  3. Finally, enable HDFS integration with Sentry.

Example Sqoop Import

Let’s assume that we have a user jarcec who needs to import data into a Hive database named default. User jarcec is part of a group that is also called jarcec; in real life the name of the group doesn’t have to be the same as the username, and that is fine.

With an unsecured Hive installation, the Hadoop administrator would have to step in and grant write privileges to user jarcec on the directory /user/hive/warehouse or one of its subdirectories. With Sentry and HDFS integration, the Hadoop administrator no longer needs to get involved. Instead, Sqoop will reuse the same authorization policies that have been configured through Hive SQL or via the Sentry Hue Application. Let’s assume that user bc is jarcec’s manager and already has privileges to grant privileges in the default database.

    1. bc starts by invoking beeline and connecting to HiveServer2:

[bc@sqoopsentry-1 ~]$ beeline
1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> !connect jdbc:hive2://sqoopsentry-1.vpc.cloudera.com:10000/default;principal=hive/sqoopsentry-1.vpc.cloudera.com@ENT.CLOUDERA.COM

    2. In case user jarcec is not part of any role yet, we need to create a role for him:

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> CREATE ROLE jarcec_role;
No rows affected (0.769 seconds)

    3. This new role jarcec_role then needs to be granted to jarcec‘s group jarcec:

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> GRANT ROLE jarcec_role to GROUP jarcec;
No rows affected (0.651 seconds)

    4. And finally bc can grant access to database default (or any other) to the role jarcec_role:

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> GRANT ALL ON DATABASE default TO ROLE jarcec_role;
No rows affected (0.16 seconds)

By executing the steps above, user jarcec has been given the privilege to perform any action (insert or select) on all objects inside database default. That includes the ability to create new tables, insert data, or simply query existing tables. With those privileges, user jarcec can run the following Sqoop command as he is used to:

[jarcec@sqoopsentry-1 ~]$ sqoop import --connect jdbc:mysql://mysql.ent.cloudera.com/sqoop --username sqoop --password sqoop --table text --hive-import

14/12/14 15:37:38 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.3.0

 …

14/12/14 15:38:58 INFO mapreduce.ImportJobBase: Transferred 249.7567 MB in 75.8448 seconds (3.293 MB/sec) 

14/12/14 15:38:58 INFO mapreduce.ImportJobBase: Retrieved 1000000 records.

14/12/14 15:38:58 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `text` AS t LIMIT 1

14/12/14 15:38:58 INFO hive.HiveImport: Loading uploaded data into Hive

14/12/14 15:39:09 INFO hive.HiveImport: 14/12/14 15:39:09 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.

14/12/14 15:39:09 INFO hive.HiveImport:

14/12/14 15:39:09 INFO hive.HiveImport: Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.26/jars/hive-common-0.13.1-cdh5.3.0.jar!/hive-log4j.properties 

14/12/14 15:39:12 INFO hive.HiveImport: OK

14/12/14 15:39:12 INFO hive.HiveImport: Time taken: 1.079 seconds

14/12/14 15:39:12 INFO hive.HiveImport: Loading data to table default.text

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00000

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00001

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00002

14/12/14 15:39:13 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00003

14/12/14 15:39:13 INFO hive.HiveImport: Table default.text stats: [numFiles=4, numRows=0, totalSize=261888896, rawDataSize=0]

14/12/14 15:39:13 INFO hive.HiveImport: OK

14/12/14 15:39:13 INFO hive.HiveImport: Time taken: 0.719 seconds

14/12/14 15:39:13 INFO hive.HiveImport: Hive import complete.

14/12/14 15:39:13 INFO hive.HiveImport: Export directory is not empty, keeping it.

And jarcec can easily confirm in beeline that the data has indeed been imported into Hive:

0: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> show tables from default;
+-----------+--+
| tab_name  |
+-----------+--+
| text      |
+-----------+--+
1 row selected (0.177 seconds)

0: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> select count(*) from text;
+----------+--+
|   _c0    |
+----------+--+
| 1000000  |
+----------+--+
1 row selected (72.188 seconds)

If Hive is configured to inherit permissions, you might notice that Sqoop will print out several warnings similar to this one:

 

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00000

As there is no need to inherit HDFS permissions when Sentry is enabled in HDFS, you can safely ignore such messages.

 

Managing a Hadoop Cluster

 

  1. Introduction
  2. Goals for this Module
  3. Outline
  4. Basic Setup
    1. Java Requirements
    2. Operating System
    3. Downloading and Installing Hadoop
  5. Important Directories
  6. Selecting Machines
  7. Cluster Configurations
    1. Small Clusters: 2-10 Nodes
    2. Medium Clusters: 10-40 Nodes
    3. Large Clusters: Multiple Racks
  8. Performance Monitoring
    1. Ganglia
    2. Nagios
  9. Additional Tips
  10. References & Resources

Basic Setup

This section discusses the general platform requirements for Hadoop.

JAVA REQUIREMENTS

Hadoop is a Java-based system. Recent versions of Hadoop require Sun Java 1.6.

Compiling Java programs to run on Hadoop can be done with any number of commonly-used Java compilers. Sun’s compiler is fine, as is ecj, the Eclipse Compiler for Java. A bug in gcj, the GNU Compiler for Java, causes incompatibility between generated classes and Hadoop; it should not be used.

OPERATING SYSTEM

As Hadoop is written in Java, it is mostly portable between different operating systems. Developers can and do run Hadoop under Windows. The various scripts used to manage Hadoop clusters are written in a UNIX shell scripting language that assumes sh- or bash-like behavior. Thus running Hadoop under Windows requires cygwin to be installed. The Hadoop documentation stresses that a Windows/cygwin installation is for development only. The vast majority of server deployments today are on Linux. (Other POSIX-style operating systems such as BSD may also work. Some Hadoop users have reported successfully running the system on Solaris.) The instructions on this page assume a command syntax and system design similar to Linux, but can be readily adapted to other systems.

DOWNLOADING AND INSTALLING HADOOP

Hadoop is available for download from the project homepage at http://hadoop.apache.org/core/releases.html. Here you will find several versions of Hadoop available.

The versioning strategy used is major.minor.revision. Increments to the major version number represent large differences in operation or interface and possibly significant incompatible changes. At the time of this writing (September 2008), there have been no major upgrades; all Hadoop versions have their major version set to 0. The minor version represents a large set of feature improvements and enhancements. Hadoop instances with different minor versions may use different versions of the HDFS file formats and protocols, requiring a DFS upgrade to migrate from one to the next. Revisions are used to provide bug fixes. Within a minor version, the most recent revision contains the most stable patches.

Within the releases page, two or three versions of Hadoop will be readily available, corresponding to the highest revision number in the most recent two or three minor version increments. The stable version is the highest revision number in the second most recent minor version. Production clusters should use this version. The most recent minor version may include improved performance or new features, but may also introduce regressions that will be fixed in ensuing revisions.

At the time of this writing, 0.18.0 is the most recent version, with 0.17.2 being the “stable” release. These example instructions assume that version 0.18.0 is being used; the directions will not change significantly for any other version, except by substituting the new version number where appropriate.

To install Hadoop, first download and install prerequisite software. This includes Java 6 or higher. Distributed operation requires ssh and sshd. Windows users must install and configure cygwin as well. Then download a Hadoop version using a web browser, wget, or curl, and then unzip the package:

gunzip hadoop-0.18.0.tar.gz
tar vxf hadoop-0.18.0.tar

Within the hadoop-0.18.0/ directory which results, there will be several subdirectories. The most interesting of these are bin/, where scripts to run the cluster are located, and conf/ where the cluster’s configuration is stored.

Enter the conf/ directory and modify hadoop-env.sh. The JAVA_HOME variable must be set to the base directory of your Java installation. It is recommended that you install Java in the same location on all machines in the cluster, so this file can be replicated to each machine without modification.
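For example, the relevant line in conf/hadoop-env.sh might look like the following; the JDK path shown is only an example and should point at your own installation.

# conf/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-6-sun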

The hadoop-site.xml file must also be modified to contain a number of configuration settings. The sections below address the settings which should be included here.

If you are interested in setting up a development installation, running Hadoop on a single machine, the Hadoop documentation includes getting started instructions which will configure Hadoop for standalone or “pseudo-distributed” operation.

Standalone installations run all of Hadoop and your application inside a single Java process. The distributed file system is not used; files are read from and written to the local file system. Such a setup can be helpful for debugging Hadoop applications.

Pseudo-distributed operation refers to the use of several separate processes representing the different daemons (NameNode, DataNode, JobTracker, TaskTracker) and a separate task process to perform a Hadoop job, but with all processes running on a single machine. A pseudo-distributed instance will have a functioning NameNode/DataNode managing a “DFS” of sorts. Files in HDFS are in a separate namespace from the local file system, and are stored as block objects in a Hadoop-managed directory. However, it is not truly distributed, as no processing or data storage is performed on remote nodes. A pseudo-distributed instance can be extended into a fully distributed cluster by adding more machines to function as Task/DataNodes, but more configuration settings are usually required to deploy a Hadoop cluster for multiple users.

The rest of this document deals with configuring Hadoop clusters of multiple nodes, intended for use by one or more developers.

After the conf/hadoop-site.xml file is configured according to one of the models in the getting started instructions, the sections below, or your own settings, two more files must be written.

The conf/masters file contains the hostname of the SecondaryNameNode. This should be changed from “localhost” to the fully-qualified domain name of the node to run the SecondaryNameNode service. It does not need to contain the hostname of the JobTracker/NameNode machine; that service is instantiated on whichever node is used to run bin/start-all.sh, regardless of the masters file. The conf/slaves file should contain the hostname of every machine in the cluster which should start TaskTracker and DataNode daemons. One hostname should be written per line in each of these files, e.g.:

slave01
slave02
slave03
...

The master node does not usually also function as a slave node, except in installations across only 1 or 2 machines.

If the nodes on your cluster do not support passwordless ssh, you should configure this now:

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

This will enable passwordless ssh login to the local machine. (You can verify that this works by executing ssh localhost.) The ~/.ssh/id_dsa.pub and authorized_keys files should be replicated on all machines in the cluster.

At this point, the configuration must be replicated across all nodes in the cluster. Small clusters may use rsync or copy the configuration directory to each node. Larger clusters should use a configuration management system such as bcfg2, smartfrog, or puppet. NFS should be avoided as much as is possible, as it is a scalability bottleneck. DataNodes should never share block storage or other high-bandwidth responsibilities over NFS, and should avoid sharing configuration information over NFS if possible.

Various directories should be created on each node. The NameNode requires the NameNode metadata directory:

$ mkdir -p /home/hadoop/dfs/name

And every node needs the Hadoop tmp directory and DataNode directory created. Rather than logging in to each node and performing the steps multiple times manually, the file bin/slaves.sh allows a command to be executed on all nodes in the slaves file. For example, we can create these directories by executing the following commands on the NameNode:

$ mkdir -p /tmp/hadoop  # make the NameNode's tmp dir
$ export HADOOP_CONF_DIR=${HADOOP_HOME}/conf
$ export HADOOP_SLAVES=${HADOOP_CONF_DIR}/slaves
$ ${HADOOP_HOME}/bin/slaves.sh "mkdir -p /tmp/hadoop"
$ ${HADOOP_HOME}/bin/slaves.sh "mkdir -p /home/hadoop/dfs/data"

The environment variables $HADOOP_CONF_DIR and $HADOOP_SLAVES are used by the bin/slaves.sh script to find the slave machines list. The provided command is then executed over ssh. If you need particular ssh options, the contents of the $HADOOP_SSH_OPTS variable are passed to ssh as arguments.

We then format HDFS by executing the following command on the NameNode:

$ bin/hadoop namenode -format

And finally, start the cluster:

$ bin/start-all.sh

Now it is time to load in data and start processing it with Hadoop! Good luck!

The remainder of this document discusses various trade-offs in cluster configurations for different sizes, and reviews the settings which may be placed in the hadoop-site.xml file.

Important Directories

One of the basic tasks involved in setting up a Hadoop cluster is determining where the several various Hadoop-related directories will be located. Where they go is up to you; in some cases, the default locations are inadvisable and should be changed. This section identifies these directories.

Directory          | Description                                        | Default location                 | Suggested location
HADOOP_LOG_DIR     | Output location for log files from daemons         | ${HADOOP_HOME}/logs              | /var/log/hadoop
hadoop.tmp.dir     | A base for other temporary directories             | /tmp/hadoop-${user.name}         | /tmp/hadoop
dfs.name.dir       | Where the NameNode metadata should be stored       | ${hadoop.tmp.dir}/dfs/name       | /home/hadoop/dfs/name
dfs.data.dir       | Where DataNodes store their blocks                 | ${hadoop.tmp.dir}/dfs/data       | /home/hadoop/dfs/data
mapred.system.dir  | The in-HDFS path to shared MapReduce system files  | ${hadoop.tmp.dir}/mapred/system  | /hadoop/mapred/system

This table is not exhaustive; several other directories are listed in conf/hadoop-defaults.xml. The remaining directories, however, are initialized by default to reside under hadoop.tmp.dir, and are unlikely to be a concern.

It is critically important in a real cluster that dfs.name.dir and dfs.data.dir be moved out from hadoop.tmp.dir. A real cluster should never consider these directories temporary, as they are where all persistent HDFS data resides. Production clusters should have two paths listed for dfs.name.dir which are on two different physical file systems, to ensure that cluster metadata is preserved in the event of hardware failure.

A multi-user configuration should also definitely adjust mapred.system.dir. Hadoop’s default installation is designed to work for standalone operation, which does not use HDFS. Thus it conflates HDFS and local file system paths. When enabling HDFS, however, MapReduce will store shared information about jobs in mapred.system.dir on the DFS. If this path includes the current username (as the default hadoop.tmp.dir does), this will prevent proper operation. The current username on the submitting node will be the username who actually submits the job, e.g., “alex.” All other nodes will have the current username set to the username used to launch Hadoop itself (e.g., “hadoop”). If these do not match, the TaskTrackers will be unable to find the job information and run the MapReduce job.

For this reason, it is also advisable to remove ${user.name} from the general hadoop.tmp.dir.

While most of the directories listed above (all the ones with names in “foo.bar.baz” form) can be relocated via the conf/hadoop-site.xml file, the HADOOP_LOG_DIR directory is specified in conf/hadoop-env.sh as an environment variable. Relocating this directory requires editing this script.

Selecting Machines

Before diving into the details of configuring nodes, we include a brief word on choosing hardware for a cluster. While the processing demands of different organizations will dictate a different machine configuration for optimum efficiency, there are commonalities associated with most Hadoop-based tasks.

Hadoop is designed to take advantage of whatever hardware is available. Modest “beige box” PCs can be used to run small Hadoop setups for experimentation and debugging. Providing greater computational resources will, to a point, result in increased performance by your Hadoop cluster. Many existing Hadoop deployments include Xeon processors in the 1.8-2.0GHz range. Hadoop jobs written in Java can consume between 1 and 2 GB of RAM per core. If you use HadoopStreaming to write your jobs in a scripting language such as Python, more memory may be advisable. Due to the I/O-bound nature of Hadoop, adding higher-clocked CPUs may not be the most efficient use of resources, unless the intent is to run HadoopStreaming. Big data clusters, of course, can use as many large and fast hard drives as are available. However, too many disks in a single machine will result in many disks not being used in parallel. It is better to have three machines with 4 hard disks each than one machine with 12 drives. The former configuration will be able to write to more drives in parallel and will provide greater throughput. Finally, gigabit Ethernet connections between machines will greatly improve performance over a cluster connected via a slower network interface.

It should be noted that the lower limit on minimum requirements for running Hadoop is well below the specifications for modern desktop or server class machines. However, multiple pages on the Hadoop wiki suggest similar specifications to those posted here for high-performance cluster design. (See [1],  [2].)

Cluster Configurations

This section provides cluster configuration advice and specific settings for clusters of varying sizes. These sizes were picked to demonstrate basic categories of clusters; your own installation may be a hybrid of different aspects of these profiles. Here we suggest various properties which should be included in the conf/hadoop-site.xml file to most effectively use a cluster of a given size, as well as other system configuration elements. The next section describes how to finish the installation after implementing the configurations described here. You should read through each of these configurations in order, as configuration suggestions for larger deployments are based on the preceding ones.

SMALL CLUSTERS: 2-10 NODES

Setting up a small cluster for development purposes is a very straightforward task. When using two nodes, one node will act as both NameNode/JobTracker and a DataNode/TaskTracker; the other node is only a DataNode/TaskTracker. Clusters of three or more machines typically use a dedicated NameNode/JobTracker, and all other nodes are workers.

A relatively minimalist configuration in conf/hadoop-site.xml will suffice for this installation:

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>head.server.node.com:9001</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://head.server.node.com:9000</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/home/hadoop/dfs/data</value>
<final>true</final>
</property>
<property>
<name>dfs.name.dir</name>
<value>/home/hadoop/dfs/name</value>
<final>true</final>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop</value>
<final>true</final>
</property>
<property>
<name>mapred.system.dir</name>
<value>/hadoop/mapred/system</value>
<final>true</final>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>

Clusters closer to the 8-10 node range may want to set dfs.replication to 3. Values higher than 3 are usually not necessary. Individual files which are heavily utilized by a large number of nodes may have their particular replication factor manually adjusted upward independent of the cluster default.

MEDIUM CLUSTERS: 10-40 NODES

This category is for clusters that occupy the majority of a single rack. Additional considerations for high availability and reliability come into play at this level.

The single point of failure in a Hadoop cluster is the NameNode. While the loss of any other machine (intermittently or permanently) does not result in data loss, NameNode loss results in cluster unavailability. The permanent loss of NameNode data would render the cluster’s HDFS inoperable.

Therefore, another step should be taken in this configuration to back up the NameNode metadata. One machine in the cluster should be designated as the NameNode’s backup. This machine does not run the normal Hadoop daemons (i.e., the DataNode and TaskTracker). Instead, it exposes a directory via NFS which is only mounted on the NameNode (e.g., /mnt/namenode-backup/). The cluster’s hadoop-site.xml file should then instruct the NameNode to write to this directory as well:

  <property>
<name>dfs.name.dir</name>
<value>/home/hadoop/dfs/name,/mnt/namenode-backup</value>
<final>true</final>
</property>

The NameNode will write its metadata to each directory in the comma-separated list of dfs.name.dir. If /mnt/namenode-backup is NFS-mounted from the backup machine, this will ensure that a redundant copy of HDFS metadata is available. The backup node should serve /mnt/namenode-backup from /home/hadoop/dfs/name on its own drive. This way, if the NameNode hardware completely dies, the backup machine can be brought up as the NameNode with no reconfiguration of the backup machine’s software. To switch the NameNode and backup nodes, the backup machine should have its IP address changed to the original NameNode’s IP address, and the server daemons should be started on that machine. The IP address must be changed to allow the DataNodes to recognize it as the “original” NameNode for HDFS. (Individual DataNodes will cache the DNS entry associated with the NameNode, so just changing the hostname is insufficient; the name reassignment must be performed at the IP address level.)

The backup machine still has Hadoop installed and configured on it in the same way as every other node in the cluster, but it is not listed in the slaves file, so normal daemons are not started there.

One function that the backup machine can be used for is to serve as the SecondaryNameNode. Note that this is not a failover NameNode process. The SecondaryNameNode process connects to the NameNode and takes periodic snapshots of its metadata (though not in real time). The NameNode metadata consists of a snapshot of the file system called the fsimage and a series of deltas to this snapshot called the editlog. With these two files, the current state of the system can be determined exactly. The SecondaryNameNode merges the fsimage and editlog into a new fsimage file that is a more compact representation of the file system state. Because this process can be memory intensive, running it on the backup machine (instead of on the NameNode itself) can be advantageous.

To configure the SecondaryNameNode daemon to run on the backup machine instead of on the master machine, edit the conf/masters file so that it contains the name of the backup machine. The bin/start-dfs.sh and bin/start-mapred.sh (and by extension, bin/start-all.sh) scripts will actually always start the master daemons (NameNode and JobTracker) on the local machine. The slaves file is used for starting DataNodes and TaskTrackers. The masters file is used for starting the SecondaryNameNode. This filename is used despite the fact that the master node may not be listed in the file itself.

A cluster of this size may also require nodes to be periodically decommissioned. As noted in Module 2, several machines cannot be turned off simultaneously, or data loss may occur. Nodes must be decommissioned on a schedule that permits replication of blocks being decommissioned. To prepare for this eventuality in advance, an excludes file should be added to the conf/hadoop-site.xml:

  <property>
<name>dfs.hosts.exclude</name>
<value>/home/hadoop/excludes</value>
<final>true</final>
</property>
<property>
<name>mapred.hosts.exclude</name>
<value>/home/hadoop/excludes</value>
<final>true</final>
</property>

This property should provide the full path to the excludes file (the actual location of the file is up to you). You should then create an empty file with this name:

$ touch /home/hadoop/excludes

While the dfs.hosts.exclude property allows the definition of a list of machines which are explicitly barred from connecting to the NameNode (and similarly, mapred.hosts.exclude for the JobTracker), a large cluster may want to explicitly manage a list of machines which are approved to connect to a given JobTracker or NameNode.

The dfs.hosts and mapred.hosts properties allow an administrator to supply a file containing an approved list of hostnames. If a machine is not in this list, it will be denied access to the cluster. This can be used to enforce policies regarding which teams of developers have access to which MapReduce sub-clusters. These are configured in exactly the same way as the excludes file.
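For example (a sketch; the location of the includes file, like that of the excludes file, is up to you):

  <property>
<name>dfs.hosts</name>
<value>/home/hadoop/includes</value>
<final>true</final>
</property>
<property>
<name>mapred.hosts</name>
<value>/home/hadoop/includes</value>
<final>true</final>
</property>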

Of course, at this scale and above, 3 replicas of each block are advisable; the hadoop-site.xml file should contain:

  <property>
<name>dfs.replication</name>
<value>3</value>
</property>

By default, HDFS does not preserve any free space on the DataNodes; the DataNode service will continue to accept blocks until all free space on the disk is exhausted, which may cause problems. The following setting will require each DataNode to reserve at least 1 GB of space on the drive free before it writes more blocks, which helps preserve system stability:

  <property>
<name>dfs.datanode.du.reserved</name>
<value>1073741824</value>
<final>true</final>
</property>

Another parameter to watch is the heap size associated with each task. Hadoop caps the heap of each task process at 200 MB, which is too small for most data processing tasks. This cap is set as a parameter passed to the child Java process. It is common to override this with a higher cap by specifying:

  <property>
<name>mapred.child.java.opts</name>
<value>-Xmx512m</value>
</property>

This will provide each child task with 512 MB of heap. It is not unreasonable in some cases to specify -Xmx1024m instead. In the interest of providing only what is actually required, it may be better to leave this set to 512 MB by default and allow applications to manually configure a full GB of RAM per task themselves.

Using multiple drives per machine

While small clusters often have only one hard drive per machine, more high-performance configurations may include two or more disks per node. Slight configuration changes are required to make Hadoop take advantage of additional disks.

DataNodes can be configured to write blocks out to multiple disks via the dfs.data.dir property. It can take on a comma-separated list of directories. Each block is written to one of these directories. E.g., assuming that there are four disks, mounted on /d1, /d2, /d3, and /d4, the following (or something like it) should be in the configuration for each DataNode:

  <property>
<name>dfs.data.dir</name>
<value>/d1/dfs/data,/d2/dfs/data,/d3/dfs/data,/d4/dfs/data</value>
<final>true</final>
</property>

MapReduce performance can also be improved by distributing the temporary data generated by MapReduce tasks across multiple disks on each machine:

  <property>
<name>mapred.local.dir</name>
<value>/d1/mapred/local,/d2/mapred/local,/d3/mapred/local,/d4/mapred/local</value>
<final>true</final>
</property>

Finally, if there are multiple drives available in the NameNode, they can be used to provide additional redundant copies of the NameNode metadata in the event of the failure of one drive. Unlike the above two properties, where one drive out of many is selected to write a piece of data, the NameNode writes to each comma-separated path in dfs.name.dir. If too many drives are listed here it may adversely affect the performance of the NameNode, as the probability of blocking on one or more I/O operations increases with the number of devices involved, but it is imperative that the sole copy of the metadata does not reside on a single drive.

LARGE CLUSTERS: MULTIPLE RACKS

Configuring multiple racks of machines for Hadoop requires further advance planning. The possibility of rack failure now exists, and operational racks should be able to continue even if entire other racks are disabled. Naive setups may result in large cross-rack data transfers which adversely affect performance. Furthermore, in a large cluster, the amount of metadata under the care of the NameNode increases. This section proposes configuring several properties to help Hadoop operate at very large scale, but the numbers used in this section are just guidelines. There is no single magic number which works for all deployments, and individual tuning will be necessary. These will, however, provide a starting point and alert you to settings which will be important.

The NameNode is responsible for managing metadata associated with each block in the HDFS. As the amount of information in the rack scales into the 10’s or 100’s of TB, this can grow to be quite sizable. The NameNode machine needs to keep the blockmap in RAM to work efficiently. Therefore, at large scale, this machine will require more RAM than other machines in the cluster. The amount of metadata can also be dropped almost in half by doubling the block size:

  <property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>

This changes the block size from 64MB (the default) to 128MB, which decreases pressure on the NameNode’s memory. On the other hand, this potentially decreases the amount of parallelism that can be achieved, as the number of blocks per file decreases. This means fewer hosts may have sections of a file to offer to MapReduce tasks without contending for disk access. The larger the individual files involved (or the more files involved in the average MapReduce job), the less of an issue this is.

In the medium configuration, the NameNode wrote HDFS metadata through to another machine on the rack via NFS. It also used that same machine to checkpoint the NameNode metadata and compact it in the SecondaryNameNode process. Using this same setup will result in the cluster being dependent on a single rack’s continued operation. The NFS-mounted write-through backup should be placed in a different rack from the NameNode, to ensure that the metadata for the file system survives the failure of an individual rack. For the same reason, the SecondaryNameNode should be instantiated on a separate rack as well.

With multiple racks of servers, RPC timeouts may become more frequent. The NameNode takes a continual census of DataNodes and their health via heartbeat messages sent every few seconds. A similar timeout mechanism exists on the MapReduce side with the JobTracker. With many racks of machines, they may force one another to timeout because the master node is not handling them fast enough. The following options increase the number of threads on the master machine dedicated to handling RPC’s from slave nodes:

  <property>
<name>dfs.namenode.handler.count</name>
<value>40</value>
</property>
<property>
<name>mapred.job.tracker.handler.count</name>
<value>40</value>
</property>

These settings were used in clusters of several hundred nodes. They should be scaled up accordingly with larger deployments.

The following settings provide additional starting points for optimization. These are based on the reported configurations of actual clusters from 250 to 2000 nodes.

Property                                 | Range                                   | Description
io.file.buffer.size                      | 32768-131072                            | Read/write buffer size used in SequenceFiles (should be in multiples of the hardware page size)
io.sort.factor                           | 50-200                                  | Number of streams to merge concurrently when sorting files during shuffling
io.sort.mb                               | 50-200                                  | Amount of memory to use while sorting data
mapred.reduce.parallel.copies            | 20-50                                   | Number of concurrent connections a reducer should use when fetching its input from mappers
tasktracker.http.threads                 | 40-50                                   | Number of threads each TaskTracker uses to provide intermediate map output to reducers
mapred.tasktracker.map.tasks.maximum     | 1/2 * (cores/node) to 2 * (cores/node)  | Number of map tasks to deploy on each machine
mapred.tasktracker.reduce.tasks.maximum  | 1/2 * (cores/node) to 2 * (cores/node)  | Number of reduce tasks to deploy on each machine
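As with the other settings in this module, these properties go in conf/hadoop-site.xml. Purely as a sketch, a starting point for an 8-core node (which would still need tuning against your own workload) might be:

  <property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>8</value>
</property>
<property>
<name>io.sort.mb</name>
<value>100</value>
</property>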

Rack awareness

In a multi-rack configuration, it is important to ensure that replicas of blocks are placed on multiple racks to minimize the possibility of data loss. Thus, a rack-aware placement policy should be used. The guidelines below suggest how to set up a basic rack awareness policy; due to the heterogeneity of network topologies, a definitive general-purpose solution cannot be provided here.

This tutorial targets Hadoop version 0.18.0. While most of the interfaces described will work on other, older versions of Hadoop, rack-awareness underwent a major overhaul in version 0.17. Thus, the following does not apply to version 0.16 and before.

One major consequence of the upgrade is that while rack-aware block replica placement has existed in Hadoop for some time, rack-aware task placement has only been added in version 0.17. If Hadoop MapReduce cannot place a task on the same node as the block of data which the task is scheduled to process, then it picks an arbitrary different node on which to schedule the task. Starting with 0.17.0, tasks will be placed (when possible) on the same rack as at least one replica of an input data block for a job, which should further minimize the amount of inter-rack data transfers required to perform a job.

Hadoop includes an interface called DNSToSwitchMapping which allows arbitrary Java code to be used to map servers onto a rack topology. The configuration key topology.node.switch.mapping.impl can be used to specify a class which meets this interface. More straightforward than writing a Java class for this purpose, however, is to use the default mapper, which executes a user-specified script (or other command) on each node of the cluster, which returns the rack id for that node. These rack ids are then aggregated and sent back to the NameNode.

Note that the rack mapping script used by this system is incompatible with the 0.16 method of using dfs.network.script. Whereas dfs.network.script runs on each DataNode, a new script specified by topology.script.file.name is run by the master node only. To set the rack mapping script, specify the key topology.script.file.name in conf/hadoop-site.xml.
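As a rough sketch (the script path, hostname patterns, and rack names below are purely illustrative), a minimal topology script prints one rack id per hostname or IP address passed to it as an argument, and is then registered in conf/hadoop-site.xml:

#!/bin/bash
# /home/hadoop/topology.sh: echo a rack id for each host passed as an argument
for host in "$@"; do
  case "$host" in
    rack1-node*) echo "/rack1" ;;
    rack2-node*) echo "/rack2" ;;
    *)           echo "/default-rack" ;;
  esac
done

  <property>
<name>topology.script.file.name</name>
<value>/home/hadoop/topology.sh</value>
</property>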

Cluster contention

If you are configuring a large number of machines, it is likely that you have a large number of users who wish to submit jobs to execute on it. Hadoop’s job scheduling algorithm is based on a simple FIFO scheduler. Using this in a large deployment without external controls or policies agreed upon by all users can lead to lots of contention for the JobTracker, causing short jobs to be delayed by other long-running tasks and frustrating users.

An advanced technique to combat this problem is to configure a single HDFS cluster which spans all available machines, and configure several separate MapReduce clusters with their own JobTrackers and pools of TaskTrackers. All MapReduce clusters are configured to use the same DFS and the same NameNode; but separate groups of machines have a different machine acting as JobTracker (i.e., subclusters have different settings for mapred.job.tracker). Breaking machines up into several smaller clusters, each of which contains 20-40 TaskTrackers, provides users with lower contention for the system. Users may be assigned to different clusters by policy, or they can use the JobTracker status web pages (a web page exposed on port 50030 of each JobTracker) to determine which is underutilized.
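Concretely, each MapReduce sub-cluster’s conf/hadoop-site.xml would point at the shared NameNode but at its own JobTracker; the hostnames below are placeholders.

  <property>
<name>fs.default.name</name>
<value>hdfs://shared-namenode.example.com:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>jobtracker-a.example.com:9001</value>
</property>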

Multiple strategies exist for this assignment process. It is considered best practice to stripe the TaskTrackers associated with each JobTracker across all racks. This maximizes the availability of each cluster (as they are all resistant to individual rack failure), and works with the HDFS replica placement policy to ensure that each MapReduce cluster can find rack-local replicas of all files used in any MapReduce jobs.

Performance Monitoring

Multiple tools exist to monitor large clusters for performance and troubleshooting. This section briefly highlights two such tools.

GANGLIA

Ganglia is a performance monitoring framework for distributed systems. Ganglia provides a distributed service which collects metrics on individual machines and forwards them to an aggregator which can report back to an administrator on the global state of a cluster.

Ganglia is designed to be integrated into other applications to collect statistics about their operation. Hadoop includes a performance monitoring framework which can use Ganglia as its backend. Instructions are available on the Hadoop wiki as to how to enable Ganglia metrics in Hadoop. Instructions are also included below.

After installing and configuring Ganglia on your cluster, to direct Hadoop to output its metric reports to Ganglia, create a file named hadoop-metrics.properties in the $HADOOP_HOME/conf directory. The file should have the following contents:

dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
dfs.period=10
dfs.servers=localhost:8649

mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
mapred.period=10
mapred.servers=localhost:8649

This assumes that gmond is running on each machine in the cluster. Instructions on the Hadoop wiki note that (in the experience of the wiki article author) this may result in all nodes reporting their results as “localhost” instead of with their individual hostnames. If this problem affects your cluster, an alternate configuration is proposed, in which all Hadoop instances speak directly with gmetad:

dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
dfs.period=10
dfs.servers=@GMETAD@:8650

mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
mapred.period=10
mapred.servers=@GMETAD@:8650

Where @GMETAD@ is the hostname of the server on which the gmetad service is running. If deploying Ganglia and Hadoop on a very large number of machines, the impact of this configuration (vs. the standard Ganglia configuration where individual services talk to gmond on localhost) should be evaluated.

NAGIOS

While Ganglia will monitor Hadoop-specific metrics, general information about the health of the cluster should be monitored with an additional tool.

Nagios is a machine and service monitoring system designed for large clusters. Nagios will provide useful diagnostic information for tuning your cluster, including network, disk, and CPU utilization across machines.

Additional Tips

The following are a few additional pieces of small advice:

  • Create a separate user named “hadoop” to run your instances; this will separate the Hadoop processes from any users on the system. Do not run Hadoop as root.
  • If Hadoop is installed in /home/hadoop/hadoop-0.18.0, link /home/hadoop/hadoop to /home/hadoop/hadoop-0.18.0 (see the sketch below). When upgrading to a newer version in the future, the link can be moved to make this process easier on other scripts that depend on the hadoop/bin directory.
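A minimal sketch of that symlink step and a later upgrade (the version numbers are just examples):

$ ln -s /home/hadoop/hadoop-0.18.0 /home/hadoop/hadoop
$ # after installing a newer release, repoint the link:
$ rm /home/hadoop/hadoop
$ ln -s /home/hadoop/hadoop-0.19.0 /home/hadoop/hadoop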

References & Resources

Hadoop Getting Started – Single-node configuration instructions
Hadoop Cluster Setup – Official Hadoop configuration instructions
Michael Noll’s Hadoop configuration tutorials for single and multiple node configurations.

The Hadoop Approach

Hadoop is designed to efficiently process large volumes of information by connecting many commodity computers together to work in parallel. The theoretical 1000-CPU machine described earlier would cost a very large amount of money, far more than 1,000 single-CPU or 250 quad-core machines. Hadoop will tie these smaller and more reasonably priced machines together into a single cost-effective compute cluster.

COMPARISON TO EXISTING TECHNIQUES

Performing computation on large volumes of data has been done before, usually in a distributed setting. What makes Hadoop unique is its simplified programming model, which allows the user to quickly write and test distributed systems, and its efficient, automatic distribution of data and work across machines, which in turn utilizes the underlying parallelism of the CPU cores.

Grid scheduling of computers can be done with existing systems such as Condor. But Condor does not automatically distribute data: a separate SAN must be managed in addition to the compute cluster. Furthermore, collaboration between multiple compute nodes must be managed with a communication system such as MPI. This programming model is challenging to work with and can lead to the introduction of subtle errors.

DATA DISTRIBUTION

In a Hadoop cluster, data is distributed to all the nodes of the cluster as it is being loaded in. The Hadoop Distributed File System (HDFS) will split large data files into chunks which are managed by different nodes in the cluster. In addition to this each chunk is replicated across several machines, so that a single machine failure does not result in any data being unavailable. An active monitoring system then re-replicates the data in response to system failures which can result in partial storage. Even though the file chunks are replicated and distributed across several machines, they form a single namespace, so their contents are universally accessible.

Data is conceptually record-oriented in the Hadoop programming framework. Individual input files are broken into lines or into other formats specific to the application logic. Each process running on a node in the cluster then processes a subset of these records. The Hadoop framework schedules these processes in proximity to the location of data/records using knowledge from the distributed file system. Since files are spread across the distributed file system as chunks, each compute process running on a node operates on a subset of the data. Which data a node operates on is chosen based on its locality to the node: most data is read from the local disk straight into the CPU, alleviating strain on network bandwidth and preventing unnecessary network transfers. This strategy of moving computation to the data, instead of moving the data to the computation, allows Hadoop to achieve high data locality, which in turn results in high performance.

Figure 1.1: Data is distributed across nodes at load time.

MAPREDUCE: ISOLATED PROCESSES

Hadoop limits the amount of communication which can be performed by the processes, as each individual record is processed by a task in isolation from one another. While this sounds like a major limitation at first, it makes the whole framework much more reliable. Hadoop will not run just any program and distribute it across a cluster. Programs must be written to conform to a particular programming model, named “MapReduce.”

In MapReduce, records are processed in isolation by tasks called Mappers. The output from the Mappers is then brought together into a second set of tasks called Reducers, where results from different mappers can be merged together.

Figure 1.2: Mapping and reducing tasks run on nodes where individual records of data are already present.

Separate nodes in a Hadoop cluster still communicate with one another. However, in contrast to more conventional distributed systems where application developers explicitly marshal byte streams from node to node over sockets or through MPI buffers, communication in Hadoop is performed implicitly. Pieces of data can be tagged with key names which inform Hadoop how to send related bits of information to a common destination node. Hadoop internally manages all of the data transfer and cluster topology issues.

By restricting the communication between nodes, Hadoop makes the distributed system much more reliable. Individual node failures can be worked around by restarting tasks on other machines. Since user-level tasks do not communicate explicitly with one another, no messages need to be exchanged by user programs, nor do nodes need to roll back to pre-arranged checkpoints to partially restart the computation. The other workers continue to operate as though nothing went wrong, leaving the challenging aspects of partially restarting the program to the underlying Hadoop layer.

FLAT SCALABILITY

One of the major benefits of using Hadoop in contrast to other distributed systems is its flat scalability curve. Executing Hadoop on a limited amount of data on a small number of nodes may not demonstrate particularly stellar performance as the overhead involved in starting Hadoop programs is relatively high. Other parallel/distributed programming paradigms such as MPI (Message Passing Interface) may perform much better on two, four, or perhaps a dozen machines. Though the effort of coordinating work among a small number of machines may be better-performed by such systems, the price paid in performance and engineering effort (when adding more hardware as a result of increasing data volumes) increases non-linearly.

A program written in distributed frameworks other than Hadoop may require large amounts of refactoring when scaling from ten to one hundred or one thousand machines. This may involve having the program be rewritten several times; fundamental elements of its design may also put an upper bound on the scale to which the application can grow.

Hadoop, however, is specifically designed to have a very flat scalability curve. After a Hadoop program is written and functioning on ten nodes, very little–if any–work is required for that same program to run on a much larger amount of hardware. Orders of magnitude of growth can be managed with little re-work required for your applications. The underlying Hadoop platform will manage the data and hardware resources and provide dependable performance growth proportionate to the number of machines available.

What is MapReduce? & the Role of MapReduce in Big Data (with Hadoop)

  1. Introduction
  2. Goals for this Module
  3. Outline
  4. Prerequisites
  5. MapReduce Basics
    1. Functional Programming Concepts
    2. List Processing
    3. Mapping Lists
    4. Reducing Lists
    5. Putting them Together in MapReduce
    6. An Example Application: Word Count
    7. The Driver Method
  6. MapReduce Data Flow
    1. A Closer Look
    2. Additional MapReduce Functionality
    3. Fault Tolerance
  7. Checkpoint
  8. More Tips
    1. Chaining Jobs
    2. Troubleshooting: Debugging MapReduce
    3. Listing and Killing Jobs
  9. Additional Language Support
    1. Pipes
    2. Hadoop Streaming
  10. Conclusions
  11. Solution to Inverted Index Code

Prerequisites

This module requires that you have set up a build environment. If you have not already configured Hadoop and successfully run the example applications, go back and do so now.

MapReduce Basics

FUNCTIONAL PROGRAMMING CONCEPTS

MapReduce programs are designed to process large volumes of data in a parallel fashion. This requires dividing the workload across a large number of machines. This model would not scale to large clusters (hundreds or thousands of nodes) if the components were allowed to share data arbitrarily. The communication overhead required to keep the data on the nodes synchronized at all times would prevent the system from performing reliably or efficiently at large scale.

Instead, all data elements in MapReduce are immutable, meaning that they cannot be updated. If in a mapping task you change an input (key, value) pair, it does not get reflected back in the input files; communication occurs only by generating new output (key, value) pairs which are then forwarded by the Hadoop system into the next phase of execution.

LIST PROCESSING

Conceptually, MapReduce programs transform lists of input data elements into lists of output data elements. A MapReduce program will do this twice, using two different list processing idioms: map, and reduce. These terms are taken from several list processing languages such as LISP, Scheme, or ML.

MAPPING LISTS

The first phase of a MapReduce program is called mapping. A list of data elements is provided, one at a time, to a function called the Mapper, which transforms each element individually into an output data element.

Figure 4.1: Mapping creates a new output list by applying a function to individual elements of an input list.

As an example of the utility of map: Suppose you had a function toUpper(str) which returns an uppercase version of its input string. You could use this function with map to turn a list of strings into a list of uppercase strings. Note that we are not modifying the input string here: we are returning a new string that will form part of a new output list.
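
As a rough, non-Hadoop sketch of this idiom in Java (the class and method names here are purely illustrative), the following program builds a new uppercase list from an input list without ever modifying the input elements:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapIdiom {

  // Applies toUpper to each element of the input, producing a new output list.
  // The input list and its strings are left untouched.
  static List<String> mapToUpper(List<String> input) {
    List<String> output = new ArrayList<String>();
    for (String s : input) {
      output.add(s.toUpperCase());
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(mapToUpper(Arrays.asList("sweet", "foo", "bar")));
    // prints: [SWEET, FOO, BAR]
  }
}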

REDUCING LISTS

Reducing lets you aggregate values together. A reducer function receives an iterator of input values from an input list. It then combines these values together, returning a single output value.

Figure 4.2: Reducing a list iterates over the input values to produce an aggregate value as output.

Reducing is often used to produce “summary” data, turning a large volume of data into a smaller summary of itself. For example, “+” can be used as a reducing function, to return the sum of a list of input values.
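
A matching non-Hadoop sketch of the reduce idiom, again with illustrative names only, folds a list of numbers into their sum using “+” as the reducing function:

import java.util.Arrays;
import java.util.Iterator;

public class ReduceIdiom {

  // Combines all values from the iterator into a single output value.
  static int sum(Iterator<Integer> values) {
    int total = 0;
    while (values.hasNext()) {
      total += values.next();
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println(sum(Arrays.asList(1, 2, 3, 4).iterator()));
    // prints: 10
  }
}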

PUTTING THEM TOGETHER IN MAPREDUCE:

The Hadoop MapReduce framework takes these concepts and uses them to process large volumes of information. A MapReduce program has two components: one that implements the mapper, and another that implements the reducer. The Mapper and Reducer idioms described above are extended slightly to work in this environment, but the basic principles are the same.

Keys and values: In MapReduce, no value stands on its own. Every value has a key associated with it. Keys identify related values. For example, a log of time-coded speedometer readings from multiple cars could be keyed by license-plate number; it would look like:



AAA-123   65mph, 12:00pm
ZZZ-789   50mph, 12:02pm
AAA-123   40mph, 12:05pm
CCC-456   25mph, 12:15pm
...

The mapping and reducing functions receive not just values, but (key, value) pairs. The output of each of these functions is the same: both a key and a value must be emitted to the next list in the data flow.

MapReduce is also less strict than formal functional languages about how the Mapper and Reducer work. In more formal functional mapping and reducing settings, a mapper must produce exactly one output element for each input element, and a reducer must produce exactly one output element for each input list. In MapReduce, an arbitrary number of values can be output from each phase; a mapper may map one input into zero, one, or one hundred outputs. A reducer may compute over an input list and emit one or a dozen different outputs.

Keys divide the reduce space: A reducing function turns a large list of values into one (or a few) output values. In MapReduce, all of the output values are not usually reduced together. All of the values with the same key are presented to a single reducer together. This is performed independently of any reduce operations occurring on other lists of values, with different keys attached.

Figure 4.3: Different colors represent different keys. All values with the same key are presented to a single reduce task.

AN EXAMPLE APPLICATION: WORD COUNT

A simple MapReduce program can be written to determine how many times different words appear in a set of files. For example, if we had the files:

foo.txt: Sweet, this is the foo file

bar.txt: This is the bar file

We would expect the output to be:

sweet 1
this  2
is    2
the   2
foo   1
bar   1
file  2

Naturally, we can write a program in MapReduce to compute this output. The high-level structure would look like this:

mapper (filename, file-contents):
  for each word in file-contents:
    emit (word, 1)

reducer (word, values):
  sum = 0
  for each value in values:
    sum = sum + value
  emit (word, sum)

Listing 4.1: High-Level MapReduce Word Count

Several instances of the mapper function are created on the different machines in our cluster. Each instance receives a different input file (it is assumed that we have many such files). The mappers output (word, 1) pairs which are then forwarded to the reducers. Several instances of the reducer method are also instantiated on the different machines. Each reducer is responsible for processing the list of values associated with a different word. The list of values will be a list of 1’s; the reducer sums up those ones into a final count associated with a single word. The reducer then emits the final (word, count) output which is written to an output file.

We can write a very similar program to this in Hadoop MapReduce; it is included in the Hadoop distribution in src/examples/org/apache/hadoop/examples/WordCount.java. It is partially reproduced below:

  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

Listing 4.2: Hadoop MapReduce Word Count Source

There are some minor differences between this actual Java implementation and the pseudo-code shown above. First, Java has no native emit keyword; the OutputCollector object you are given as an input will receive values to emit to the next stage of execution. Second, the default input format used by Hadoop presents each line of an input file as a separate input to the mapper function, not the entire file at a time. The mapper also uses a StringTokenizer object to break up the line into words. This does not perform any normalization of the input, so “cat”, “Cat” and “cat,” are all regarded as different strings. Note that the class-level word object is reused each time the mapper outputs another (word, 1) pairing; this saves time by not allocating a new object for each output. The output.collect() method will copy the values it receives as input, so you are free to overwrite the variables you use.
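
If you did want case- and punctuation-insensitive counts, the tokenization in the map() method could be adjusted; a minimal sketch of such a map() body is shown below (the one and word fields are unchanged from Listing 4.2, and the exact normalization rule is just one possibility):

public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
  // Lower-case the line and replace punctuation with spaces so that
  // "cat", "Cat" and "cat," are all counted as the same word.
  String line = value.toString().toLowerCase().replaceAll("[^a-z0-9\\s]", " ");
  StringTokenizer itr = new StringTokenizer(line);
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    output.collect(word, one);
  }
}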

THE DRIVER METHOD

There is one final component of a Hadoop MapReduce program, called the Driver. The driver initializes the job and instructs the Hadoop platform to execute your code on a set of input files, and controls where the output files are placed. A cleaned-up version of the driver from the example Java implementation that comes with Hadoop is presented below:

  public void run(String inputPath, String outputPath) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setReducerClass(Reduce.class);

    FileInputFormat.addInputPath(conf, new Path(inputPath));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));

    JobClient.runJob(conf);
  }

Listing 4.3: Hadoop MapReduce Word Count Driver

This method sets up a job to execute the word count program across all the files in a given input directory (the inputPath argument). The output from the reducers is written into files in the directory identified by outputPath. The configuration information to run the job is captured in the JobConf object. The mapping and reducing functions are identified by the setMapperClass() and setReducerClass() methods. The data types emitted by the reducer are identified by setOutputKeyClass() and setOutputValueClass(). By default, it is assumed that these are the output types of the mapper as well. If this is not the case, the setMapOutputKeyClass() and setMapOutputValueClass() methods of the JobConf class will override these. The input types fed to the mapper are controlled by the InputFormat used. Input formats are discussed in more detail below. The default input format, “TextInputFormat,” will load data in as (LongWritable, Text) pairs. The long value is the byte offset of the line in the file. The Text object holds the string contents of the line of the file.

The call to JobClient.runJob(conf) will submit the job to MapReduce. This call will block until the job completes. If the job fails, it will throw an IOException. JobClient also provides a non-blocking version called submitJob().
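
For instance, if your mapper emitted different types than the reducer, or you wanted to poll for completion rather than block, the end of the driver could be adjusted along these lines. This is only a sketch: the five-second polling interval is arbitrary, RunningJob comes from org.apache.hadoop.mapred, and the enclosing method is assumed to declare throws Exception like run() above.

// Declare the mapper's output types when they differ from the reducer's.
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);

// Submit without blocking, then poll the RunningJob handle for completion.
JobClient client = new JobClient(conf);
RunningJob running = client.submitJob(conf);
while (!running.isComplete()) {
  Thread.sleep(5000);
}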

MapReduce Data Flow

Now that we have seen the components that make up a basic MapReduce job, we can see how everything works together at a higher level:

Figure 4.4: High-level MapReduce pipeline

MapReduce inputs typically come from input files loaded onto our processing cluster in HDFS. These files are evenly distributed across all our nodes. Running a MapReduce program involves running mapping tasks on many or all of the nodes in our cluster. Each of these mapping tasks is equivalent: no mappers have particular “identities” associated with them. Therefore, any mapper can process any input file. Each mapper loads the set of files local to that machine and processes them.

When the mapping phase has completed, the intermediate (key, value) pairs must be exchanged between machines to send all values with the same key to a single reducer. The reduce tasks are spread across the same nodes in the cluster as the mappers. This is the only communication step in MapReduce. Individual map tasks do not exchange information with one another, nor are they aware of one another’s existence. Similarly, different reduce tasks do not communicate with one another. The user never explicitly marshals information from one machine to another; all data transfer is handled by the Hadoop MapReduce platform itself, guided implicitly by the different keys associated with values. This is a fundamental element of Hadoop MapReduce’s reliability. If nodes in the cluster fail, tasks must be able to be restarted. If they have been performing side-effects, e.g., communicating with the outside world, then the shared state must be restored in a restarted task. By eliminating communication and side-effects, restarts can be handled more gracefully.

A CLOSER LOOK

The previous figure described the high-level view of Hadoop MapReduce. From this diagram, you can see where the mapper and reducer components of the Word Count application fit in, and how it achieves its objective. We will now examine this system in a bit closer detail.

Figure 4.5: Detailed Hadoop MapReduce data flow

Figure 4.5 shows the pipeline with more of its mechanics exposed. While only two nodes are depicted, the same pipeline can be replicated across a very large number of nodes. The next several paragraphs describe each of the stages of a MapReduce program more precisely.

Input files: This is where the data for a MapReduce task is initially stored. While this does not need to be the case, the input files typically reside in HDFS. The format of these files is arbitrary; while line-based log files can be used, we could also use a binary format, multi-line input records, or something else entirely. It is typical for these input files to be very large — tens of gigabytes or more.

InputFormat: How these input files are split up and read is defined by the InputFormat. An InputFormat is a class that provides the following functionality:

  • Selects the files or other objects that should be used for input
  • Defines the InputSplits that break a file into tasks
  • Provides a factory for RecordReader objects that read the file

Several InputFormats are provided with Hadoop. An abstract type is called FileInputFormat; all InputFormats that operate on files inherit functionality and properties from this class. When starting a Hadoop job, FileInputFormat is provided with a path containing files to read. The FileInputFormat will read all files in this directory. It then divides these files into one or more InputSplits each. You can choose which InputFormat to apply to your input files for a job by calling the setInputFormat() method of the JobConf object that defines the job. A table of standard InputFormats is given below.

InputFormat               Description                                        Key                               Value
TextInputFormat           Default format; reads lines of text files          The byte offset of the line       The line contents
KeyValueInputFormat       Parses lines into (key, value) pairs               Everything up to the first tab    The remainder of the line
SequenceFileInputFormat   A Hadoop-specific high-performance binary format   user-defined                      user-defined

Table 4.1: InputFormats provided by MapReduce

The default InputFormat is the TextInputFormat. This treats each line of each input file as a separate record, and performs no parsing. This is useful for unformatted data or line-based records like log files. A more interesting input format is the KeyValueInputFormat. This format also treats each line of input as a separate record. While the TextInputFormat treats the entire line as the value, the KeyValueInputFormat breaks the line itself into the key and value by searching for a tab character. This is particularly useful for reading the output of one MapReduce job as the input to another, as the default OutputFormat (described in more detail below) formats its results in this manner. Finally, the SequenceFileInputFormat reads special binary files that are specific to Hadoop. These files include many features designed to allow data to be rapidly read into Hadoop mappers. Sequence files are block-compressed and provide direct serialization and deserialization of several arbitrary data types (not just text). Sequence files can be generated as the output of other MapReduce tasks and are an efficient intermediate representation for data that is passing from one MapReduce job to another.

InputSplits: An InputSplit describes a unit of work that comprises a single map task in a MapReduce program. A MapReduce program applied to a data set, collectively referred to as a Job, is made up of several (possibly several hundred) tasks. Map tasks may involve reading a whole file; they often involve reading only part of a file. By default, the FileInputFormat and its descendants break a file up into 64 MB chunks (the same size as blocks in HDFS). You can control this value by setting the mapred.min.split.size parameter in hadoop-site.xml, or by overriding the parameter in the JobConf object used to submit a particular MapReduce job. By processing a file in chunks, we allow several map tasks to operate on a single file in parallel. If the file is very large, this can improve performance significantly through parallelism. Even more importantly, since the various blocks that make up the file may be spread across several different nodes in the cluster, it allows tasks to be scheduled on each of these different nodes; the individual blocks are thus all processed locally, instead of needing to be transferred from one node to another. Of course, while log files can be processed in this piece-wise fashion, some file formats are not amenable to chunked processing. By writing a custom InputFormat, you can control how the file is broken up (or is not broken up) into splits.
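
Both knobs can also be set programmatically on the JobConf in the driver. The sketch below switches the job to the SequenceFileInputFormat from Table 4.1 and raises the minimum split size to an arbitrary 128 MB; SequenceFileInputFormat lives in org.apache.hadoop.mapred.

// Read Hadoop sequence files instead of lines of text.
conf.setInputFormat(SequenceFileInputFormat.class);

// Ask for splits of at least 128 MB rather than the 64 MB default.
conf.setLong("mapred.min.split.size", 128L * 1024 * 1024);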

The InputFormat defines the list of tasks that make up the mapping phase; each task corresponds to a single input split. The tasks are then assigned to the nodes in the system based on where the input file chunks are physically resident. An individual node may have several dozen tasks assigned to it. The node will begin working on the tasks, attempting to perform as many in parallel as it can. The on-node parallelism is controlled by the mapred.tasktracker.map.tasks.maximum parameter.

RecordReader: The InputSplit has defined a slice of work, but does not describe how to access it. The RecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper. The RecordReader instance is defined by the InputFormat. The default InputFormat, TextInputFormat, provides a LineRecordReader, which treats each line of the input file as a new value. The key associated with each line is its byte offset in the file. The RecordReader is invoked repeatedly on the input until the entire InputSplit has been consumed. Each invocation of the RecordReader leads to another call to the map() method of the Mapper.

Mapper: The Mapper performs the interesting user-defined work of the first phase of the MapReduce program. Given a key and a value, the map() method emits (key, value) pair(s) which are forwarded to the Reducers. A new instance of Mapper is instantiated in a separate Java process for each map task (InputSplit) that makes up part of the total job input. The individual mappers are intentionally not provided with a mechanism to communicate with one another in any way. This allows the reliability of each map task to be governed solely by the reliability of the local machine. The map() method receives two parameters in addition to the key and the value:

  • The OutputCollector object has a method named collect() which will forward a (key, value) pair to the reduce phase of the job.
  • The Reporter object provides information about the current task; its getInputSplit() method will return an object describing the current InputSplit. It also allows the map task to provide additional information about its progress to the rest of the system. The setStatus() method allows you to emit a status message back to the user. The incrCounter() method allows you to increment shared performance counters, as sketched just after this list. You may define as many arbitrary counters as you wish. Each mapper can increment the counters, and the JobTracker will collect the increments made by the different processes and aggregate them for later retrieval when the job ends.

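For example, the word count mapper from Listing 4.2 could track how many tokens it processes by adding one call inside its loop; the counter group and name below are arbitrary:

while (itr.hasMoreTokens()) {
  word.set(itr.nextToken());
  output.collect(word, one);
  // Incremented counters are aggregated by the JobTracker across all tasks.
  reporter.incrCounter("WordCountStats", "TOTAL_TOKENS", 1);
}
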
Partition & Shuffle: After the first map tasks have completed, the nodes may still be performing several more map tasks each. But they also begin exchanging the intermediate outputs from the map tasks to where they are required by the reducers. This process of moving map outputs to the reducers is known asshuffling. A different subset of the intermediate key space is assigned to each reduce node; these subsets (known as “partitions”) are the inputs to the reduce tasks. Each map task may emit (key, value) pairs to any partition; all values for the same key are always reduced together regardless of which mapper is its origin. Therefore, the map nodes must all agree on where to send the different pieces of the intermediate data. ThePartitioner class determines which partition a given (key, value) pair will go to. The default partitioner computes a hash value for the key and assigns the partition based on this result.

Sort: Each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted by Hadoop before they are presented to the Reducer.

Reduce: A Reducer instance is created for each reduce task. This is an instance of user-provided code that performs the second important phase of job-specific work. For each key in the partition assigned to a Reducer, the Reducer’s reduce() method is called once. This receives a key as well as an iterator over all the values associated with the key. The values associated with a key are returned by the iterator in an undefined order. The Reducer also receives as parameters OutputCollector and Reporter objects; they are used in the same manner as in the map() method.

OutputFormat: The (key, value) pairs provided to this OutputCollector are then written to output files. The way they are written is governed by the OutputFormat. The OutputFormat functions much like the InputFormat class described earlier. The instances of OutputFormat provided by Hadoop write to files on the local disk or in HDFS; they all inherit from a common FileOutputFormat. Each Reducer writes a separate file in a common output directory. These files will typically be named part-nnnnn, where nnnnn is the partition id associated with the reduce task. The output directory is set by the FileOutputFormat.setOutputPath() method. You can control which particular OutputFormat is used by calling the setOutputFormat() method of the JobConf object that defines your MapReduce job. A table of provided OutputFormats is given below.

OutputFormat               Description
TextOutputFormat           Default; writes lines in “key \t value” form
SequenceFileOutputFormat   Writes binary files suitable for reading into subsequent MapReduce jobs
NullOutputFormat           Disregards its inputs

Table 4.2: OutputFormats provided by Hadoop

Hadoop provides some OutputFormat instances to write to files. The basic (default) instance is TextOutputFormat, which writes (key, value) pairs on individual lines of a text file. This can be easily re-read by a later MapReduce task using the KeyValueInputFormat class, and is also human-readable. A better intermediate format for use between MapReduce jobs is the SequenceFileOutputFormat, which rapidly serializes arbitrary data types to the file; the corresponding SequenceFileInputFormat will deserialize the file into the same types and present the data to the next Mapper in the same manner as it was emitted by the previous Reducer. The NullOutputFormat generates no output files and disregards any (key, value) pairs passed to it by the OutputCollector. This is useful if you are explicitly writing your own output files in the reduce() method, and do not want additional empty output files generated by the Hadoop framework.
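
Switching the word count job over to sequence-file output, for example, is a one-line change in the driver (SequenceFileOutputFormat is in org.apache.hadoop.mapred):

// Write reducer output as a binary sequence file so that a follow-on job
// can read it back efficiently with SequenceFileInputFormat.
conf.setOutputFormat(SequenceFileOutputFormat.class);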

RecordWriter: Much like how the InputFormat actually reads individual records through the RecordReader implementation, the OutputFormat class is a factory for RecordWriter objects; these are used to write the individual records to the files as directed by the OutputFormat.

The output files written by the Reducers are then left in HDFS for your use, either by another MapReduce job, by a separate program, or for human inspection.

ADDITIONAL MAPREDUCE FUNCTIONALITY

Figure 4.6: Combiner step inserted into the MapReduce data flow

Combiner: The pipeline shown earlier omits a processing step which can be used for optimizing bandwidth usage by your MapReduce job. Called the Combiner, this pass runs after the Mapper and before the Reducer. Usage of the Combiner is optional. If this pass is suitable for your job, instances of the Combiner class are run on every node that has run map tasks. The Combiner will receive as input all data emitted by the Mapper instances on a given node. The output from the Combiner is then sent to the Reducers, instead of the output from the Mappers. The Combiner is a “mini-reduce” process which operates only on data generated by one machine.

Word count is a prime example of where a Combiner is useful. The Word Count program in listings 1–3 emits a (word, 1) pair for every instance of every word it sees. So if the same document contains the word “cat” 3 times, the pair ("cat", 1) is emitted three times; all of these are then sent to the Reducer. By using a Combiner, these can be condensed into a single ("cat", 3) pair to be sent to the Reducer. Now each node only sends a single value to the reducer for each word, drastically reducing the total bandwidth required for the shuffle process and speeding up the job. The best part of all is that we do not need to write any additional code to take advantage of this! If a reduce function is both commutative and associative, then it can be used as a Combiner as well. You can enable combining in the word count program by adding the following line to the driver:

conf.setCombinerClass(Reduce.class);

The Combiner should be an instance of the Reducer interface. If your Reducer itself cannot be used directly as a Combiner because of commutativity or associativity, you might still be able to write a third class to use as a Combiner for your job.

FAULT TOLERANCE

One of the primary reasons to use Hadoop to run your jobs is due to its high degree of fault tolerance. Even when running jobs on a large cluster where individual nodes or network components may experience high rates of failure, Hadoop can guide jobs toward a successful completion.

The primary way that Hadoop achieves fault tolerance is through restarting tasks. Individual task nodes (TaskTrackers) are in constant communication with the head node of the system, called the JobTracker. If a TaskTracker fails to communicate with the JobTracker for a period of time (by default, 1 minute), the JobTracker will assume that the TaskTracker in question has crashed. The JobTracker knows which map and reduce tasks were assigned to each TaskTracker.

If the job is still in the mapping phase, then other TaskTrackers will be asked to re-execute all map tasks previously run by the failed TaskTracker. If the job is in the reducing phase, then other TaskTrackers will re-execute all reduce tasks that were in progress on the failed TaskTracker.

Reduce tasks, once completed, have already written their output back to HDFS. Thus, if a TaskTracker has already completed two out of three reduce tasks assigned to it, only the third task must be executed elsewhere. Map tasks are slightly more complicated: even if a node has completed ten map tasks, the reducers may not have all copied their inputs from the output of those map tasks. If a node has crashed, then its mapper outputs are inaccessible. So any already-completed map tasks must be re-executed to make their results available to the rest of the reducing machines. All of this is handled automatically by the Hadoop platform.

This fault tolerance underscores the need for program execution to be side-effect free. If Mappers and Reducers had individual identities and communicated with one another or the outside world, then restarting a task would require the other nodes to communicate with the new instances of the map and reduce tasks, and the re-executed tasks would need to reestablish their intermediate state. This process is notoriously complicated and error-prone in the general case. MapReduce simplifies this problem drastically by eliminating task identities or the ability for task partitions to communicate with one another. An individual task sees only its own direct inputs and knows only its own outputs, to make this failure and restart process clean and dependable.

Speculative execution: One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program. For example, if one node has a slow disk controller, then it may be reading its input at only 10% of the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the others.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution JobConf options to false, respectively.
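
The same settings can be applied per job in the driver, for example:

// Disable speculative execution for both the map and reduce phases of this job.
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);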

Checkpoint

You now know about all of the basic operations of the Hadoop MapReduce platform. Try the following exercise, to see if you understand the MapReduce programming concepts.

Exercise: Given the code for WordCount in listings 2 and 3, modify this code to produce an inverted index of its inputs. An inverted index returns a list of documents that contain each word in those documents. Thus, if the word “cat” appears in documents A and B, but not C, then the line:

cat    A, B

should appear in the output. If the word “baseball” appears in documents B and C, then the line:

baseball    B, C

should appear in the output as well.

If you get stuck, read the section on troubleshooting below. The working solution is provided at the end of this module.

Hint: The default InputFormat will provide the Mapper with (key, value) pairs where the key is the byte offset into the file, and the value is a line of text. To get the filename of the current input, use the following code:

FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();

More Tips

CHAINING JOBS

Not every problem can be solved with a MapReduce program, and fewer still can be solved with a single MapReduce job. Many problems can be solved with MapReduce by writing several MapReduce steps which run in series to accomplish a goal:

Map1 -> Reduce1 -> Map2 -> Reduce2 -> Map3…

You can easily chain jobs together in this fashion by writing multiple driver methods, one for each job. Call the first driver method, which uses JobClient.runJob() to run the job and wait for it to complete. When that job has completed, then call the next driver method, which creates a new JobConf object referring to different instances of Mapper and Reducer, etc. The first job in the chain should write its output to a path which is then used as the input path for the second job. This process can be repeated for as many jobs are necessary to arrive at a complete solution to the problem.

Many problems which at first seem impossible in MapReduce can be accomplished by dividing one job into two or more.

Hadoop provides another mechanism for managing batches of jobs with dependencies between jobs. Rather than submit a JobConf to the JobClient‘s runJob() or submitJob() methods, org.apache.hadoop.mapred.jobcontrol.Job objects can be created to represent each job; a Job takes a JobConf object as its constructor argument. Jobs can depend on one another through the use of the addDependingJob() method. The code:

  x.addDependingJob(y)

says that Job x cannot start until y has successfully completed. Dependency information cannot be added to a job after it has already been started. Given a set of jobs, these can be passed to an instance of the JobControl class. JobControl can receive individual jobs via the addJob() method, or a collection of jobs via addJobs(). The JobControl object will spawn a thread in the client to launch the jobs. Individual jobs will be launched when their dependencies have all successfully completed and when the MapReduce system as a whole has resources to execute the jobs. The JobControl interface allows you to query it to retrieve the state of individual jobs, as well as the list of jobs waiting, ready, running, and finished. The job submission process does not begin until the run() method of the JobControl object is called.
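
Putting those pieces together, a driver for two chained jobs might look roughly like the sketch below, where confA and confB are ordinary JobConf objects (hypothetical names) and confB reads the path that confA writes; Job and JobControl here are the org.apache.hadoop.mapred.jobcontrol classes, and the polling interval is arbitrary:

// Wrap each JobConf in a jobcontrol Job and declare the dependency.
Job jobA = new Job(confA);
Job jobB = new Job(confB);
jobB.addDependingJob(jobA);   // jobB will not start until jobA succeeds

// Hand both jobs to a JobControl and run it in its own thread.
JobControl control = new JobControl("chained-jobs");
control.addJob(jobA);
control.addJob(jobB);

Thread controller = new Thread(control);
controller.start();
while (!control.allFinished()) {
  Thread.sleep(5000);         // poll until both jobs have finished
}
control.stop();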

TROUBLESHOOTING: DEBUGGING MAPREDUCE

When writing MapReduce programs, you will occasionally encounter bugs in your programs, infinite loops, etc. This section describes the features of MapReduce that will help you diagnose and solve these conditions.

Log Files: Hadoop keeps logs of important events during program execution. By default, these are stored in the logs/ subdirectory of the hadoop-version/ directory where you run Hadoop from. Log files are named hadoop-username-service-hostname.log. The most recent data is in the .log file; older logs have their date appended to them. The username in the log filename refers to the username under which Hadoop was started — this is not necessarily the same username you are using to run programs. The service name refers to which of the several Hadoop programs are writing the log; these can be jobtracker, namenode, datanode, secondarynamenode, or tasktracker. All of these are important for debugging a whole Hadoop installation. But for individual programs, the tasktracker logs will be the most relevant. Any exceptions thrown by your program will be recorded in the tasktracker logs.

The log directory will also have a subdirectory called userlogs. Here there is another subdirectory for every task run. Each task records its stdout and stderr to two files in this directory. Note that on a multi-node Hadoop cluster, these logs are not centrally aggregated — you should check each TaskNode’s logs/userlogs/ directory for their output.

Debugging in the distributed setting is complicated and requires logging into several machines to access log data. If possible, programs should be unit tested by running Hadoop locally. The default configuration deployed by Hadoop runs in “single instance” mode, where the entire MapReduce program is run in the same instance of Java that called JobClient.runJob(). Using a debugger like Eclipse, you can then set breakpoints inside the map() or reduce() methods to discover your bugs.

LISTING AND KILLING JOBS:

It is possible to submit jobs to a Hadoop cluster which malfunction and send themselves into infinite loops or other problematic states. In this case, you will want to manually kill the job you have started.

The following command, run in the Hadoop installation directory on a Hadoop cluster, will list all the current jobs:

$ bin/hadoop job -list

This will produce output that looks something like:

1 jobs currently running
JobId   State   StartTime       UserName
job_200808111901_0001   1       1218506470390   aaron

You can use this job id to kill the job; the command is:

$ bin/hadoop job -kill jobid

Substitute the “job_2008...” from the -list command for jobid.

Additional Language Support

Hadoop itself is written in Java; it thus accepts Java code natively for Mappers and Reducers. Hadoop also comes with two adapter layers which allow code written in other languages to be used in MapReduce programs.

PIPES

Pipes is a library which allows C++ source code to be used for Mapper and Reducer code. Applications which require high numerical performance may see better throughput if written in C++ and used through Pipes. This library is supported on 32-bit Linux installations.

The include files and static libraries are present in the c++/Linux-i386-32/ directory under your Hadoop installation. Your application should include include/hadoop/Pipes.hh and TemplateFactory.hh and link against lib/libhadooppipes.a; with gcc, include the arguments -L${HADOOP_HOME}/c++/Linux-i386-32/lib -lhadooppipes to do the latter.

Both key and value inputs to pipes programs are provided as STL strings (std::string). A program must still define an instance of Mapper and Reducer; these names have not changed. (They, like all other classes defined in Pipes, are in the HadoopPipes namespace.) Unlike the classes of the same names in Hadoop itself, the map() and reduce() functions take in a single argument which is a reference to an object of type MapContext and ReduceContext respectively. The most important methods contained in each of these context objects are:

const std::string& getInputKey();
const std::string& getInputValue();
void emit(const std::string& key, const std::string& value);

The ReduceContext class also contains an additional method to advance the value iterator:

bool nextValue();

Defining a Pipes Program: A program to use with Pipes is defined by writing classes extending Mapper and Reducer. Hadoop must then be informed which classes to use to run the job.

An instance of your C++ program will be started by the Pipes framework in main() on each machine. This should do any (hopefully brief) configuration required for your task. It should then define a Factory to create Mapper and Reducer instances as necessary, and then run the job by calling the runTask() method. The simplest way to define a factory is with the following code:

#include"TemplateFactory.hh"
using namespace HadoopPipes;

void main() {
  // classes are indicated to the factory via templates
  // TODO: Substitute your own class names in below.
  TemplateFactory2<MyMapperClass, MyReducerClass> factory();

  // do any configuration you need to do here

  // start the task
  bool result = runTask(factory);
}

Running a Pipes Program: After a Pipes program has been written and compiled, it can be launched as a job with the following command: (Do this in your Hadoop home directory)

$ bin/hadoop pipes -input inputPath -output outputPath -program path/to/pipes/program/executable

This will deploy your Pipes program on all nodes and run the MapReduce job through it. By running bin/hadoop pipes with no options, you can see additional usage information which describes how to set additional configuration values as necessary.

The Pipes API contains additional functionality to allow you to read settings from the JobConf, override the Partitioner class, and use RecordReaders in a more direct fashion for higher performance. See the header files in c++/Linux-i386-32/include/hadoop for more information.

HADOOP STREAMING

Whereas Pipes is an API that provides close coupling between C++ application code and Hadoop, Streaming is a generic API that allows programs written in virtually any language to be used as Hadoop Mapper and Reducer implementations.

The official Hadoop documentation contains a thorough introduction to Streaming, and briefer notes on the wiki. A brief overview is presented here.

Hadoop Streaming allows you to use arbitrary programs for the Mapper and Reducer phases of a MapReduce job. Both Mappers and Reducers receive their input on stdin and emit output (key, value) pairs on stdout.

Input and output are always represented textually in Streaming. The input (key, value) pairs are written to stdin for a Mapper or Reducer, with a ‘tab’ character separating the key from the value. The Streaming programs should split the input on the first tab character on the line to recover the key and the value. Streaming programs write their output to stdout in the same format: key \t value \n.

The inputs to the reducer are sorted so that while each line contains only a single (key, value) pair, all the values for the same key are adjacent to one another.

Provided it can handle its input in the text format described above, any Linux program or tool can be used as the mapper or reducer in Streaming. You can also write your own scripts in bash, python, perl, or another language of your choice, provided that the necessary interpreter is present on all nodes in your cluster.
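
To make the text protocol concrete, here is a minimal Streaming word-count mapper written as an ordinary Java program (any language that reads stdin and writes stdout would work just as well); the class name is purely illustrative:

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StreamingWordCountMapper {

  public static void main(String[] args) throws Exception {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String line;
    while ((line = in.readLine()) != null) {
      // Emit one "key \t value" pair per word; the Streaming framework
      // forwards these to the shuffle, which groups them by key.
      for (String word : line.trim().split("\\s+")) {
        if (!word.isEmpty()) {
          System.out.println(word + "\t" + 1);
        }
      }
    }
  }
}

A corresponding reducer would read the sorted, tab-separated pairs from stdin and sum the counts for each adjacent run of identical keys.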

Running a Streaming Job: To run a job with Hadoop Streaming, use the following command:

$ bin/hadoop jar contrib/streaming/hadoop-version-streaming.jar

The command as shown, with no arguments, will print some usage information. An example of how to run real commands is given below:

$ bin/hadoop jar contrib/streaming-hadoop-0.18.0-streaming.jar -mapper \
    myMapProgram -reducer myReduceProgram -input /some/dfs/path \
    -output /some/other/dfs/path

This assumes that myMapProgram and myReduceProgram are present on all nodes in the system ahead of time. If this is not the case, but they are present on the node launching the job, then they can be “shipped” to the other nodes with the -file option:

$ bin/hadoop jar contrib/streaming-hadoop-0.18.0-streaming.jar -mapper \
    myMapProgram -reducer myReduceProgram -file \
    myMapProgram -file myReduceProgram -input some/dfs/path \
    -output some/other/dfs/path

Any other support files necessary to run your program can be shipped in this manner as well.

Conclusions

This module described the MapReduce execution platform at the heart of the Hadoop system. By using MapReduce, a high degree of parallelism can be achieved by applications. The MapReduce framework provides a high degree of fault tolerance for applications running on it by limiting the communication which can occur between nodes, and requiring applications to be written in a “dataflow-centric” manner.

Solution to Inverted Index Code

The following source code implements a solution to the inverted indexer problem posed at the checkpoint. The source code is structurally very similar to the source for Word Count; only a few lines really need to be modified.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LineIndexer {

  public static class LineIndexMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    private final static Text word = new Text();
    private final static Text location = new Text();

    public void map(LongWritable key, Text val,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
      String fileName = fileSplit.getPath().getName();
      location.set(fileName);

      String line = val.toString();
      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, location);
      }
    }
  }



  public static class LineIndexReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      boolean first = true;
      StringBuilder toReturn = new StringBuilder();
      while (values.hasNext()){
        if (!first)
          toReturn.append(", ");
        first=false;
        toReturn.append(values.next().toString());
      }

      output.collect(key, new Text(toReturn.toString()));
    }
  }


  /**
   * The actual main() method for our program; this is the
   * "driver" for the MapReduce job.
   */
  public static void main(String[] args) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(LineIndexer.class);

    conf.setJobName("LineIndexer");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    conf.setMapperClass(LineIndexMapper.class);
    conf.setReducerClass(LineIndexReducer.class);

    client.setConf(conf);

    try {
      JobClient.runJob(conf);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}