Archive

Archive for the ‘hadoop cluster’ Category

Steps to Configure a Single-Node YARN Cluster

March 30, 2016 Leave a comment

The following type of installation is often referred to as “pseudo-distributed” because it mimics some of the functionality of a distributed Hadoop cluster. A single machine is, of course, not practical for any production use, nor is it parallel. A small-scale Hadoop installation can provide a simple method for learning Hadoop basics, however.

The recommended minimal installation hardware is a dual-core processor with 2 GB of RAM and 2 GB of available hard drive space. The system will need a recent Linux distribution with Java installed (e.g., Red Hat Enterprise Linux or rebuilds, Fedora, Suse Linux Enterprise, OpenSuse, Ubuntu). Red Hat Enterprise Linux 6.3 is used for this installation example. A bash shell environment is also assumed. The first step is to download Apache Hadoop.

Step 1: Download Apache Hadoop

Download the latest distribution from the Hadoop website (http://hadoop.apache.org/). For example, as root do the following:

# cd /root
# wget http://mirrors.ibiblio.org/apache/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz

Next create and extract the package in /opt/yarn:

# mkdir -p /opt/yarn

# cd /opt/yarn
# tar xvzf /root/hadoop-2.2.0.tar.gz

Step 2: Set JAVA_HOME

For Hadoop 2, the recommended version of Java can be found at http://wiki.apache.org/hadoop/HadoopJavaVersions. In general, a Java Development Kit 1.6 (or greater) should work. For this install, we will use OpenJDK 1.6.0_24, which is part of Red Hat Enterprise Linux 6.3. Make sure you have a working Java JDK installed; in this case, it is the java-1.6.0-openjdk RPM. To include JAVA_HOME for all bash users (other shells must be set up in a similar fashion), make an entry in /etc/profile.d as follows:

# echo "export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/" > /etc/profile.d/java.sh

To make sure JAVA_HOME is defined for this session, source the new script:

# source /etc/profile.d/java.sh
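As a quick, optional sanity check, confirm that the variable is set and that it points at a working JDK:

# echo $JAVA_HOME
# $JAVA_HOME/bin/java -version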

Step 3: Create Users and Groups

It is best to run the various daemons with separate accounts. Three accounts (yarn, hdfs, mapred) in the group hadoop can be created as follows:

# groupadd hadoop

# useradd -g hadoop yarn

# useradd -g hadoop hdfs

# useradd -g hadoop mapred

Step 4: Make Data and Log Directories

Hadoop needs a number of data and log directories with appropriate ownership and permissions. Enter the following commands to create these directories:

# mkdir -p /var/data/hadoop/hdfs/nn

# mkdir -p /var/data/hadoop/hdfs/snn

# mkdir -p /var/data/hadoop/hdfs/dn

# chown hdfs:hadoop /var/data/hadoop/hdfs -R

# mkdir -p /var/log/hadoop/yarn

# chown yarn:hadoop /var/log/hadoop/yarn -R

Next, move to the YARN installation root and create the log directory and set the owner and group as follows:

# cd /opt/yarn/hadoop-2.2.0

# mkdir logs
# chmod g+w logs

# chown yarn:hadoop . -R

Step 5: Configure core-site.xml

From the base of the Hadoop installation path (e.g., /opt/yarn/hadoop-2.2.0), edit the etc/hadoop/core-site.xml file. The original installed file will have no entries other than the <configuration></configuration> tags. Two properties need to be set. The first is the fs.default.name property, which sets the host and port for the NameNode (the metadata server for HDFS). The second is hadoop.http.staticuser.user, which sets the default user name to hdfs. Copy the following lines into the Hadoop etc/hadoop/core-site.xml file and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
  <property>
    <name>hadoop.http.staticuser.user</name>
    <value>hdfs</value>
  </property>
</configuration>

Step 6: Configure hdfs-site.xml

From the base of the Hadoop installation path, edit the etc/hadoop/hdfs-site.xml file. In the single-node pseudo-distributed mode, we don't need or want HDFS to replicate file blocks. By default, HDFS keeps three replicas of each block for redundancy. There is no need for replication on a single machine, so the value of dfs.replication will be set to 1.

In hdfs-site.xml, we specify the NameNode, Secondary NameNode, and DataNode data directories that we created in Step 4. These are the directories used by the various components of HDFS to store data. Copy the following lines into Hadoop etc/hadoop/hdfs-site.xml and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/var/data/hadoop/hdfs/nn</value>
  </property>
  <property>
    <name>fs.checkpoint.dir</name>
    <value>file:/var/data/hadoop/hdfs/snn</value>
  </property>
  <property>
    <name>fs.checkpoint.edits.dir</name>
    <value>file:/var/data/hadoop/hdfs/snn</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/var/data/hadoop/hdfs/dn</value>
  </property>
</configuration>

 

Step 7: Configure mapred-site.xml

From the base of the Hadoop installation, edit the etc/hadoop/mapred-site.xml file. A new configuration option for Hadoop 2 is the capability to specify a framework name for MapReduce via the mapreduce.framework.name property. In this install, we will use the value of "yarn" to tell MapReduce that it will run as a YARN application. First, copy the template file to mapred-site.xml:

# cp mapred-site.xml.template mapred-site.xml

Next, copy the following lines into the Hadoop etc/hadoop/mapred-site.xml file and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

 

Step 8: Configure yarn-site.xml

From the base of the Hadoop installation, edit the etc/hadoop/yarn-site.xml file. The yarn.nodemanager.aux-services property tells the NodeManagers that there is an auxiliary service called mapreduce_shuffle that they need to implement. After naming that service, we give it a class name as the means to implement it. This particular configuration tells MapReduce how to do its shuffle. Because NodeManagers won't shuffle data for a non-MapReduce job by default, we need to configure such a service for MapReduce. Copy the following lines to the Hadoop etc/hadoop/yarn-site.xml file and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
</configuration>

Step 9: Modify Java Heap Sizes

The Hadoop installation uses several environment variables that determine the heap sizes for each Hadoop process. These are defined in the etc/hadoop/*-env.sh files used by Hadoop. The default for most of the processes is a 1 GB heap size; because we’re running on a workstation that will probably have limited resources compared to a standard server, however, we need to adjust the heap size settings. The values that follow are adequate for a small workstation or server.

Edit the etc/hadoop/hadoop-env.sh file to reflect the following (don’t forget to remove the “#” at the beginning of the line):

HADOOP_HEAPSIZE="500"

HADOOP_NAMENODE_INIT_HEAPSIZE="500"

Next, edit mapred-env.sh to reflect the following:

HADOOP_JOB_HISTORYSERVER_HEAPSIZE=250

Finally, edit yarn-env.sh to reflect the following:

JAVA_HEAP_MAX=-Xmx500m

The following line will need to be added to yarn-env.sh:

YARN_HEAPSIZE=500

Step 10: Format HDFS

For the HDFS NameNode to start, it needs to initialize the directory where it will hold its data. The NameNode service tracks all the metadata for the file system. The format process will use the value assigned to dfs.namenode.name.dir in etc/hadoop/hdfs-site.xml earlier (i.e., /var/data/hadoop/hdfs/nn). Formatting destroys everything in the directory and sets up a new file system. Format the NameNode directory as the HDFS superuser, which is typically the "hdfs" user account.

From the base of the Hadoop distribution, change directories to the "bin" directory and execute the following commands:

# su - hdfs

$ cd /opt/yarn/hadoop-2.2.0/bin

$ ./hdfs namenode -format

If the command worked, you should see the following near the end of a long list of messages:

INFO common.Storage: Storage directory /var/data/hadoop/hdfs/nn has been successfully formatted.

 

Step 11: Start the HDFS Services

Once formatting is successful, the HDFS services must be started. There is one service for the NameNode (metadata server), a single DataNode (where the actual data is stored), and the SecondaryNameNode (checkpoint data for the NameNode). The Hadoop distribution includes scripts that set up these commands as well as name other values such as PID directories, log directories, and other standard process configurations. From the bin directory in Step 10, execute the following as user hdfs:

$ cd ../sbin

$ ./hadoop-daemon.sh start namenode

The command should show the following:

starting namenode, logging to /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-namenode-limulus.out

The secondarynamenode and datanode services can be started in the same way:

$ ./hadoop-daemon.sh start secondarynamenode
starting secondarynamenode, logging to /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-secondarynamenode-limulus.out

$ ./hadoop-daemon.sh start datanode
starting datanode, logging to /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-datanode-limulus.out

If the daemons started successfully, you should see responses that point to the log files. (Note that the actual log file is appended with ".log," not ".out.") As a sanity check, issue a jps command to confirm that all the services are running. The actual PID (Java Process ID) values will be different from those shown in this listing:

$ jps

15140 SecondaryNameNode

15015 NameNode

15335 Jps

15214 DataNode

If the process did not start, it may be helpful to inspect the log files. For instance, examine the log file for the NameNode. (Note that the path is taken from the preceding command.)

vi /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-namenode-limulus.log

All Hadoop services can be stopped using the hadoop-daemon.sh script. For example, to stop the datanode service, enter the following (as user hdfs in the /opt/yarn/hadoop-2.2.0/sbin directory):

$ ./hadoop-daemon.sh stop datanode

The same can be done for the NameNode and SecondaryNameNode.
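For example, still as user hdfs in the /opt/yarn/hadoop-2.2.0/sbin directory:

$ ./hadoop-daemon.sh stop namenode
$ ./hadoop-daemon.sh stop secondarynamenode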

Step 12: Start YARN Services

As with HDFS services, the YARN services need to be started. One ResourceManager and one NodeManager must be started as user yarn (exiting from user hdfs first):

$ exit

logout

# su - yarn

$ cd /opt/yarn/hadoop-2.2.0/sbin

$ ./yarn-daemon.sh start resourcemanager

starting resourcemanager, logging to /opt/yarn/hadoop-2.2.0/logs/yarn-yarn-resourcemanager-limulus.out

$ ./yarn-daemon.sh start nodemanager
starting nodemanager, logging to /opt/yarn/hadoop-2.2.0/logs/yarn-yarn-nodemanager-limulus.out

As when the HDFS daemons were started in Step 11, the status of the running daemons is sent to their respective log files. To check whether the services are running, issue a jps command. The following shows all the services necessary to run YARN on a single server:

$ jps

15933 Jps

15567 ResourceManager

15785 NodeManager

If there are missing services, check the log file for the specific service. Similar to the case with HDFS services, the services can be stopped by issuing a stop argument to the daemon script:

$ ./yarn-daemon.sh stop nodemanager

Step 13: Verify the Running Services Using the Web Interface

Both HDFS and the YARN ResourceManager have a web interface. These interfaces are a convenient way to browse many of the aspects of your Hadoop installation. To monitor HDFS, enter the following (or use your favorite web browser):

$ firefox http://localhost:50070

Connecting to port 50070 will bring up a web interface similar to Figure 1.1. The web interface for the ResourceManager can be viewed by entering the following:

$ firefox http://localhost:8088

A webpage similar to that shown in Figure 1.2 will be displayed.
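If you are working on a headless machine without a browser, a minimal check from the shell also works; a 200 status code (after following any redirect) indicates the daemon is answering:

$ curl -s -L -o /dev/null -w "%{http_code}\n" http://localhost:50070
$ curl -s -L -o /dev/null -w "%{http_code}\n" http://localhost:8088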


Figure 1.1 Webpage for HDFS file system

 


Figure 1.2 Webpage for YARN ResourceManager

 


Acquiring Big Data Using Apache Flume

December 29, 2015 Leave a comment
No technology is more synonymous with Big Data than Apache Hadoop. Hadoop’s distributed filesystem and compute framework make possible cost-effective, linearly scalable processing of petabytes of data. Unfortunately, there are few tutorials devoted to how to get big data into Hadoop in the first place.
Some data destined for Hadoop clusters surely comes from sporadic bulk loading processes, such as database and mainframe offloads and batched data dumps from legacy systems. But what has made data really big in recent years is that most new data is contained in high-throughput streams. Application logs, GPS tracking, social media updates, and digital sensors all constitute fast-moving streams begging for storage in the Hadoop Distributed File System (HDFS). As you might expect, several technologies have been developed to address the need for collection and transport of these high-throughput streams. Facebook’s Scribe and Apache/LinkedIn’s Kafka both offer solutions to the problem, but Apache Flume is rapidly becoming a de facto standard for directing data streams into Hadoop.

This article describes the basics of Apache Flume and illustrates how to quickly set up Flume agents for collecting fast-moving data streams and pushing the data into Hadoop’s filesystem. By the time we’re finished, you should be able to configure and launch a Flume agent and understand how multi-hop and fan-out flows are easily constructed from multiple agents.

Anatomy of a Flume Agent

Flume deploys as one or more agents, each contained within its own instance of the Java Virtual Machine (JVM). Agents consist of three pluggable components: sources, sinks, and channels. An agent must have at least one of each in order to run. Sources collect incoming data as events. Sinks write events out, and channels provide a queue to connect the source and sink. (Figure 1.)

Figure 1: Flume Agents consist of sources, channels, and sinks.

Sources

Put simply, Flume sources listen for and consume events. Events can range from newline-terminated strings in stdout to HTTP POSTs and RPC calls — it all depends on what sources the agent is configured to use. Flume agents may have more than one source, but must have at least one. Sources require a name and a type; the type then dictates additional configuration parameters.

On consuming an event, Flume sources write the event to a channel. Importantly, sources write to their channels as transactions. By dealing in events and transactions, Flume agents maintain end-to-end flow reliability. Events are not dropped inside a Flume agent unless the channel is explicitly allowed to discard them due to a full queue.

Channels

Channels are the mechanism by which Flume agents transfer events from their sources to their sinks. Events written to the channel by a source are not removed from the channel until a sink removes that event in a transaction. This allows Flume sinks to retry writes in the event of a failure in the external repository (such as HDFS or an outgoing network connection). For example, if the network between a Flume agent and a Hadoop cluster goes down, the channel will keep all events queued until the sink can correctly write to the cluster and close its transactions with the channel.

Channels are typically of two types: in-memory queues and durable disk-backed queues. In-memory channels provide high throughput but no recovery if an agent fails. File or database-backed channels, on the other hand, are durable. They support full recovery and event replay in the case of agent failure.
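For reference, a durable file-backed channel is declared in the same property-file style used throughout this article; the agent name, channel name, and directory paths below are hypothetical:

# durable channel: queued events survive an agent restart
agent.channels = file-channel
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /var/flume/checkpoint
agent.channels.file-channel.dataDirs = /var/flume/data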

Sinks

Sinks give Flume agents pluggable output capability: if you need to write to a new type of storage, just write a Java class that implements the necessary interfaces. Like sources, sinks correspond to a type of output: writes to HDFS or HBase, remote procedure calls to other agents, or any number of other external repositories. Sinks remove events from the channel in transactions and write them to output. Transactions close only when the event is successfully written, ensuring that all events are committed to their final destination.

Setting up a Simple Agent for HDFS

A simple one-source, one-sink Flume agent can be configured with just a single configuration file. In this example, I’ll create a text file named sample_agent.conf — it looks a lot like a Java properties file. At the top of the file, I configure the agent’s name and the names of its source, sink, and channel.

hdfs-agent.sources = netcat-collect
hdfs-agent.sinks = hdfs-write
hdfs-agent.channels = memoryChannel

This defines an agent named hdfs-agent and the names of its source, sink, and channel; keep the agent name in mind, because we'll need it to start the agent. Multiple sources, sinks, and channels can be defined on these lines as a whitespace-delimited list of names. In this case, the source is named netcat-collect, the sink is named hdfs-write, and the channel is named memoryChannel. The names are indicative of what I'm setting up: events collected via netcat will be written to HDFS, and I will use a memory-only queue for transactions.

Next, I configure the source. I use a netcat source, as it provides a simple means of interactively testing the agent. A netcat source requires a type as well as an address and port to which it should bind. The netcat source will listen on localhost on port 11111; messages to netcat will be consumed by the source as events.

 

hdfs-agent.sources.netcat-collect.type = netcat
hdfs-agent.sources.netcat-collect.bind = 127.0.0.1
hdfs-agent.sources.netcat-collect.port = 11111

With the source defined, I'll configure the sink to write to HDFS. HDFS sinks support a number of options, but by default, the HDFS sink writes Hadoop SequenceFiles. In this example, I'll specify that the sink write raw text files to HDFS so they can be easily inspected; I'll also set a roll interval, which forces Flume to commit writes to HDFS every 30 seconds. File rolls can be configured based on time, size, or a combination of the two. File rolls are particularly important in environments where HDFS does not support appending to files.

hdfs-agent.sinks.hdfs-write.type = hdfs
hdfs-agent.sinks.hdfs-write.hdfs.path = hdfs://namenode_address:8020/path/to/flume_test
hdfs-agent.sinks.hdfs-write.hdfs.rollInterval = 30
hdfs-agent.sinks.hdfs-write.hdfs.writeFormat = Text
hdfs-agent.sinks.hdfs-write.hdfs.fileType = DataStream

Finally, I'll configure a memory-backed channel to transfer events from source to sink and connect them together. Keep in mind that if I exceed the channel capacity, Flume will drop events. If I need durability, a file- or JDBC-backed channel should be used instead.

 

hdfs-agent.channels.memoryChannel.type = memory
hdfs-agent.channels.memoryChannel.capacity=10000
hdfs-agent.sources.netcat-collect.channels=memoryChannel
hdfs-agent.sinks.hdfs-write.channel=memoryChannel

With the configuration complete, I start the Flume agent from a terminal:

flume-ng agent -f /path/to/sample_agent.conf -n hdfs-agent

In the Flume agent’s logs, I look for indication that the source, sink, and channel have successfully started. For example:

INFO nodemanager.DefaultLogicalNodeManager: Starting Channel memoryChannel
INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
INFO nodemanager.DefaultLogicalNodeManager: Starting Sink hdfs-write
INFO nodemanager.DefaultLogicalNodeManager: Starting Source netcat-collect
INFO source.NetcatSource: Source starting

In a separate terminal, connect to the agent via netcat and enter a series of messages.

> nc localhost 11111
> testing
> 1
> 2
> 3
In the agent logs, an HDFS file will be created and committed every 30 seconds. If I print the contents of the files to standard out using HDFS cat, I’ll find the messages from netcat are stored.
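For example, given the hdfs.path configured in the sink above (the generated file names will vary from run to run), the netcat messages should appear in the output of:

$ hdfs dfs -cat /path/to/flume_test/*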

More Advanced Deployments

Regardless of the source, direct writes to HDFS are too simple to be suitable for many deployments: application servers may reside in the cloud while clusters are on-premise, many streams of data may need to be consolidated, or events may need to be filtered during transmission. Fortunately, Flume easily enables reliable multi-hop event transmission. Fan-in and fan-out patterns are readily supported via multiple sources and channel options. Additionally, Flume provides the notion of interceptors, which allow the decoration and filtering of events in flight.

Multi-Hop Topologies

Flume provides multi-hop deployments via Apache Avro-serialized RPC calls. For a given hop, the sending agent implements an Avro sink directed to a host and port where the receiving agent is listening. The receiver implements an Avro source bound to the designated host-port combination. Reliability is ensured by Flume’s transaction model. The sink on the sending agent does not close its transaction until receipt is acknowledged by the receiver. Similarly, the receiver does not acknowledge receipt until the incoming event has been committed to its channel.

#sender configuration
avro-agent.sinks= avro-sink
avro-agent.sinks.avro-sink.type=avro
avro-agent.sinks.avro-sink.host=remote.host.com
avro-agent.sinks.avro-sink.port=11111
#receiver configuration on remote.host.com
hdfs-agent.sources=avro-source
hdfs-agent.sources.avro-source.type=avro
hdfs-agent.sources.avro-source.bind=0.0.0.0
hdfs-agent.sources.avro-source.port=11111
hdfs-agent.sources.avro-source.channels=memoryChannel

Figure 2: Multihop event flows are constructed using RPCs between Avro sources and sinks.

Fan-In and Fan-Out

Fan-in is a common case for Flume agents. Agents may be run on many data collectors (such as application servers) in a large deployment, while only one or two writers to a remote Hadoop cluster are required to handle the total event throughput. In this case, the Flume topology is simple to configure. Each agent at a data collector implements the appropriate source and an Avro sink. All Avro sinks point to the host and port of the Flume agent charged with writing to the Hadoop cluster. The agent at the Hadoop cluster simply configures an Avro source on the designated host and port. Incoming events are consolidated automatically and are written to the configured sink.

Fan-out topologies are enabled via Flume’s source selectors. Selectors can be replicating — sending all events to multiple channels — or multiplexing. Multiplexed sources can be partitioned by mappings defined on events via interceptors. For example, a replicating selector may be appropriate when events need to be sent to HDFS and to flat log files or a database. Multiplexed selectors are useful when different mappings should be directed to different writers; for example, data destined for partitioned Hive tables may be best handled via multiplexing.

hdfs-agent.channels = mchannel1 mchannel2
hdfs-agent.sources.netcat-collect.selector.type = replicating
hdfs-agent.sources.netcat-collect.channels = mchannel1 mchannel2
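A multiplexing selector, by contrast, routes each event to a channel based on the value of one of its headers. The header name and mappings below are hypothetical; in practice the header would typically be set by an interceptor such as those described in the next section:

# route events by the value of a hypothetical "table" header
hdfs-agent.sources.netcat-collect.selector.type = multiplexing
hdfs-agent.sources.netcat-collect.selector.header = table
hdfs-agent.sources.netcat-collect.selector.mapping.users = mchannel1
hdfs-agent.sources.netcat-collect.selector.mapping.orders = mchannel2
hdfs-agent.sources.netcat-collect.selector.default = mchannel1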

Interceptors

Flume provides a robust system of interceptors for in-flight modification of events. Some interceptors serve to decorate data with metadata useful in multiplexing the data or processing it after it has been written to the sink. Common decorations include timestamps, hostnames, and static headers. It’s a great way to keep track of when your data arrived and from where it came.

More interestingly, interceptors can be used to selectively filter or decorate events. The Regex Filtering Interceptor allows events to be dropped if they match the provided regular expression. Similarly, the Regex Extractor Interceptor decorates event headers according to a regular expression. This is useful if incoming events require multiplexing, but static definitions are too inflexible.

hdfs-agent.sources.netcat-collect.interceptors = filt_int
hdfs-agent.sources.netcat-collect.interceptors.filt_int.type=regex_filter
hdfs-agent.sources.netcat-collect.interceptors.filt_int.regex=^echo.*
hdfs-agent.sources.netcat-collect.interceptors.filt_int.excludeEvents=true

Conclusion

There are lots of ways to acquire Big Data with which to fill up a Hadoop cluster, but many of those data sources arrive as fast-moving streams of data. Fortunately, the Hadoop ecosystem contains a component specifically designed for transporting and writing these streams: Apache Flume. Flume provides a robust, self-contained application which ensures reliable transportation of streaming data. Flume agents are easy to configure, requiring only a property file and an agent name. Moreover, Flume’s simple source-channel-sink design allows us to build complicated flows using only a set of Flume agents. So, while we don’t often address the process of acquiring Big Data for our Hadoop clusters, doing so is as easy and fun as taking a log ride.

How-to: Analyze Twitter Data with Apache Hadoop (http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/)

October 11, 2015 Leave a comment

Social media has gained immense popularity with marketing teams, and Twitter is an effective tool for a company to get people excited about its products. Twitter makes it easy to engage users and communicate directly with them, and in turn, users can provide word-of-mouth marketing for companies by discussing the products. Given limited resources, and knowing that they may not be able to talk to everyone they want to target directly, marketing departments can be more efficient by being selective about whom they reach out to.

In this post, we'll learn how we can use Apache Flume, Apache HDFS, Apache Oozie, and Apache Hive to design an end-to-end data pipeline that will enable us to analyze Twitter data. This will be the first post in a series. The posts to follow will describe, in more depth, how each component is involved and how the custom code operates. All the code and instructions necessary to reproduce this pipeline are available on the Cloudera Github.

Who is Influential?

To understand whom we should target, let’s take a step back and try to understand the mechanics of Twitter. A user – let’s call him Joe – follows a set of people, and has a set of followers. When Joe sends an update out, that update is seen by all of his followers. Joe can also retweet other users’ updates. A retweet is a repost of an update, much like you might forward an email. If Joe sees a tweet from Sue, and retweets it, all of Joe’s followers see Sue’s tweet, even if they don’t follow Sue. Through retweets, messages can get passed much further than just the followers of the person who sent the original tweet. Knowing that, we can try to engage users whose updates tend to generate lots of retweets. Since Twitter tracks retweet counts for all tweets, we can find the users we’re looking for by analyzing Twitter data.

Now we know the question we want to ask: Which Twitter users get the most retweets? Who is influential within our industry?

How Do We Answer These Questions?

SQL queries can be used to answer this question: We want to look at which users are responsible for the most retweets, in descending order of most retweeted. However, querying Twitter data in a traditional RDBMS is inconvenient, since the Twitter Streaming API outputs tweets in a JSON format which can be arbitrarily complex. In the Hadoop ecosystem, the Hive project provides a query interface which can be used to query data that resides in HDFS. The query language looks very similar to SQL, but allows us to easily model complex types, so we can easily query the type of data we have. Seems like a good place to start. So how do we get Twitter data into Hive? First, we need to get Twitter data into HDFS, and then we’ll be able to tell Hive where the data resides and how to read it.

At a high level, several components of CDH (Cloudera's Distribution Including Apache Hadoop) can be pieced together to build the data pipeline we need to answer the questions we have. The rest of this post will describe how these components interact and the purposes they each serve.

Gathering Data with Apache Flume

The Twitter Streaming API will give us a constant stream of tweets coming from the service. One option would be to use a simple utility like curl to access the API and then periodically load the files. However, this would require us to write code to control where the data goes in HDFS, and if we have a secure cluster, we will have to integrate with security mechanisms. It will be much simpler to use components within CDH to automatically move the files from the API to HDFS, without our manual intervention.

Apache Flume is a data ingestion system that is configured by defining endpoints in a data flow called sources and sinks. In Flume, each individual piece of data (tweets, in our case) is called an event; sources produce events, and send the events through a channel, which connects the source to the sink. The sink then writes the events out to a predefined location. Flume supports some standard data sources, such as syslog or netcat. For this use case, we’ll need to design a custom source that accesses the Twitter Streaming API, and sends the tweets through a channel to a sink that writes to HDFS files. Additionally, we can use the custom source to filter the tweets on a set of search keywords to help identify relevant tweets, rather than a pure sample of the entire Twitter firehose. The custom Flume source code can be found here.
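To make this concrete, such an agent is configured in the same property-file style as any other Flume agent. The sketch below is illustrative only: the fully qualified class name of the custom source, the OAuth property names, and the HDFS path are placeholders to be replaced with the values documented alongside the custom source code:

# illustrative agent sketch; property names follow the custom source's documentation
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = <OAuth consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <OAuth consumer secret>
TwitterAgent.sources.Twitter.accessToken = <OAuth access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <OAuth access token secret>
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://namenode:8020/user/flume/tweets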

Partition Management with Oozie

Once we have the Twitter data loaded into HDFS, we can stage it for querying by creating an external table in Hive. Using an external table will allow us to query the table without moving the data from the location where it ends up in HDFS. To ensure scalability, as we add more and more data, we’ll need to also partition the table. A partitioned table allows us to prune the files that we read when querying, which results in better performance when dealing with large data sets. However, the Twitter API will continue to stream tweets and Flume will perpetually create new files. We can automate the periodic process of adding partitions to our table as the new data comes in.

Apache Oozie is a workflow coordination system that can be used to solve this problem. Oozie is an extremely flexible system for designing job workflows, which can be scheduled to run based on a set of criteria. We can configure the workflow to run an ALTER TABLE command that adds a partition containing the last hour’s worth of data into Hive, and we can instruct the workflow to occur every hour. This will ensure that we’re always looking at up-to-date data.
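Concretely, the coordinated workflow runs a statement of roughly the following shape each hour. The table name, partition column, and location are hypothetical, and the ${...} variables are supplied by Oozie when each workflow instance is materialized:

ALTER TABLE tweets ADD IF NOT EXISTS
  PARTITION (datehour = ${DATEHOUR})
  LOCATION '/user/flume/tweets/${YEAR}/${MONTH}/${DAY}/${HOUR}';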

The configuration files for the Oozie workflow are located here.

Querying Complex Data with Hive

Before we can query the data, we need to ensure that the Hive table can properly interpret the JSON data. By default, Hive expects that input files use a delimited row format, but our Twitter data is in a JSON format, which will not work with the defaults. This is actually one of Hive’s biggest strengths. Hive allows us to flexibly define, and redefine, how the data is represented on disk. The schema is only really enforced when we read the data, and we can use the Hive SerDe interface to specify how to interpret what we’ve loaded.

SerDe stands for Serializer and Deserializer, which are interfaces that tell Hive how it should translate the data into something that Hive can process. In particular, the Deserializer interface is used when we read data off of disk, and converts the data into objects that Hive knows how to manipulate. We can write a custom SerDe that reads the JSON data in and translates the objects for Hive. Once that's put into place, we can start querying. The JSON SerDe code can be found here. The SerDe takes a tweet in its raw JSON form and translates the JSON entities into queryable columns.
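As an illustration, a raw tweet arrives as a nested JSON document with objects such as user and retweeted_status. Once the SerDe and table definition are in place, those nested fields become directly addressable columns; the table and column names below are illustrative and depend on the table definition in the companion repository:

SELECT user.screen_name,
       retweeted_status.user.screen_name AS original_author,
       retweeted_status.retweet_count,
       text
FROM tweets
LIMIT 5;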

We’ve now managed to put together an end-to-end system, which gathers data from the Twitter Streaming API, sends the tweets to files on HDFS through Flume, and uses Oozie to periodically load the files into Hive, where we can query the raw JSON data, through the use of a Hive SerDe.

Some Results

In my own testing, I let Flume collect data for about three days, filtering on a set of keywords:

hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

The collected data was about half a GB of JSON data, and here is an example of what a tweet looks like. The data has some structure, but certain fields may or may not exist. The retweeted_status field, for example, will only be present if the tweet was a retweet. Additionally, some of the fields may be arbitrarily complex. The hashtags field is an array of all the hashtags present in the tweets, but most RDBMS’s do not support arrays as a column type. This semi-structured quality of the data makes the data very difficult to query in a traditional RDBMS. Hive can handle this data much more gracefully.

The query we need finds usernames and the number of retweets they have generated across all the tweets we have data for.
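A sketch of such a query, assuming a tweets table that exposes the nested retweeted_status struct described above (the exact query depends on your table definition):

SELECT t.retweeted_screen_name,
       SUM(retweets) AS total_retweets,
       COUNT(*) AS tweet_count
FROM (
  SELECT retweeted_status.user.screen_name AS retweeted_screen_name,
         retweeted_status.text,
         MAX(retweeted_status.retweet_count) AS retweets
  FROM tweets
  GROUP BY retweeted_status.user.screen_name, retweeted_status.text
) t
GROUP BY t.retweeted_screen_name
ORDER BY total_retweets DESC
LIMIT 10;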

For the few days of data, the query surfaced the users whose tweets were most retweeted within the industry.

From these results, we can see whose tweets are getting heard by the widest audience, and also determine whether these people are communicating on a regular basis or not. We can use this information to more carefully target our messaging in order to get them talking about our products, which, in turn, will get other people talking about our products.

Conclusion

In this post we’ve seen how we can take some of the components of CDH and combine them to create an end-to-end data management system. This same architecture could be used for a variety of applications designed to look at Twitter data, such as identifying spam accounts, or identifying clusters of keywords. Taking the system even further, the general architecture can be used across numerous applications. By plugging in different Flume sources and Hive SerDes, this application can be customized for many other applications, like analyzing web logs, to give an example. Grab the code, and give it a shot yourself.

CDH 5.3: Apache Sentry Integration with HDFS

October 11, 2015 Leave a comment

Starting in CDH 5.3, Apache Sentry integration with HDFS saves admins a lot of work by centralizing access control permissions across components that utilize HDFS.

It's been more than a year and a half since a couple of my colleagues here at Cloudera shipped the first version of Sentry (now Apache Sentry (incubating)). This project filled a huge security gap in the Apache Hadoop ecosystem by bringing truly secure and dependable fine-grained authorization to Hadoop, along with out-of-the-box integration for Apache Hive. Since then the project has grown significantly, adding support for Impala and Search and the wonderful Hue app, to name a few significant additions.

In order to provide a truly secure and centralized authorization mechanism, Sentry deployments have been historically set up so that all Hive’s data and metadata are accessible only by HiveServer2 and every other user is cut out. This has been a pain point for Sqoop users as Sqoop does not use the HiveServer2 interface. Hence users with a Sentry-secured Hive deployment were forced to split the import task into two steps: simple HDFS import followed by manually loading the data into Hive.

With the inclusion of HDFS ACLs and the integration of Sentry into the Hive metastore in CDH 5.1, users were able to improve this situation and get the direct Hive import working again. However, this approach required manual administrator intervention to configure HDFS ACLs according to the Sentry configuration and needed a manual refresh to keep both systems in sync.

One of the large features included in the recently released CDH 5.3 is Sentry integration with HDFS, which enables customers to easily share data between Hive, Impala, and all the other Hadoop components that interact with HDFS (MapReduce, Spark, Pig, Sqoop, and so on), while ensuring that user access permissions only need to be set once and are uniformly enforced.

The rest of this post focuses on the example of using Sqoop together with this Sentry feature. Sqoop data can now be imported into Hive without any additional administrator intervention. By exposing Sentry policies—what tables from which a user can select and to what tables they can insert—directly in HDFS, Sqoop will re-use the same policies that have been configured via GRANT/REVOKE statements or the Hue Sentry App and will import data into Hive without any trouble.

Configuration

In order for Sqoop to seamlessly import into a Sentry-secured Hive instance, the Hadoop administrator needs to follow a few configuration steps to enable all the necessary features. First, your cluster needs to be using the Sentry service as the backend for storing authorization metadata rather than relying on the older policy files.

If you are already using the Sentry service and GRANT/REVOKE statements, you can jump directly to step 3.

    1. Make sure that you have the Sentry service running on your cluster; it should appear in the service list.

    2. Confirm that Hive is configured to use this service as the backend for Sentry metadata.

    3. Finally, enable HDFS integration with Sentry.

Example Sqoop Import

Let's assume that we have a user jarcec who needs to import data into a Hive database named default. User jarcec is part of a group that is also called jarcec (in real life the group name doesn't have to match the username, and that is fine).

With an unsecured Hive installation, the Hadoop administrator would have to jump in and grant writing privileges to user jarcec for the directory /user/hive/warehouse or one of its subdirectories. With Sentry and HDFS integration, the Hadoop administrator no longer needs to jump in. Instead, Sqoop will reuse the same authorization policies that have been configured through Hive SQL or via the Sentry Hue application. Let's assume that user bc is jarcec's manager and already has privileges to grant privileges in the default database.

    1. bc starts by invoking beeline and connecting to HiveServer2:

 

 

 

[bc@sqoopsentry-1 ~]$ beeline

 

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> !connect jdbc:hive2://sqoopsentry-1.vpc.cloudera.com:10000/default;principal=hive/sqoopsentry-1.vpc.cloudera.com@ENT.CLOUDERA.COM

    2. In case user jarcec is not part of any role yet, we need to create a role for him:

 

 

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> CREATE ROLE jarcec_role;

 

No rows affected (0.769 seconds)

    3. This new role, jarcec_role, needs to be granted to jarcec's group, jarcec:

 

 

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> GRANT ROLE jarcec_role to GROUP jarcec;

 

No rows affected (0.651 seconds)

    4. Finally, bc can grant access to the database default (or any other) to the role jarcec_role:

 

 

1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> GRANT ALL ON DATABASE default TO ROLE jarcec_role;

 

No rows affected (0.16 seconds)

By executing the steps above, user jarcec has been given the privilege to perform any action (insert or select) on all objects inside the database default. That includes the ability to create new tables, insert data, or simply query existing tables. With those privileges, user jarcec can run the following Sqoop command as he is used to:

 

 

 

[jarcec@sqoopsentry-1 ~]$ sqoop import --connect jdbc:mysql://mysql.ent.cloudera.com/sqoop --username sqoop --password sqoop --table text --hive-import

14/12/14 15:37:38 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.3.0

 …

14/12/14 15:38:58 INFO mapreduce.ImportJobBase: Transferred 249.7567 MB in 75.8448 seconds (3.293 MB/sec) 

14/12/14 15:38:58 INFO mapreduce.ImportJobBase: Retrieved 1000000 records.

14/12/14 15:38:58 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `text` AS t LIMIT 1

14/12/14 15:38:58 INFO hive.HiveImport: Loading uploaded data into Hive

14/12/14 15:39:09 INFO hive.HiveImport: 14/12/14 15:39:09 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.

14/12/14 15:39:09 INFO hive.HiveImport:

14/12/14 15:39:09 INFO hive.HiveImport: Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.26/jars/hive-common-0.13.1-cdh5.3.0.jar!/hive-log4j.properties 

14/12/14 15:39:12 INFO hive.HiveImport: OK

14/12/14 15:39:12 INFO hive.HiveImport: Time taken: 1.079 seconds

14/12/14 15:39:12 INFO hive.HiveImport: Loading data to table default.text

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00000

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00001

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00002

14/12/14 15:39:13 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00003

14/12/14 15:39:13 INFO hive.HiveImport: Table default.text stats: [numFiles=4, numRows=0, totalSize=261888896, rawDataSize=0]

14/12/14 15:39:13 INFO hive.HiveImport: OK

14/12/14 15:39:13 INFO hive.HiveImport: Time taken: 0.719 seconds

14/12/14 15:39:13 INFO hive.HiveImport: Hive import complete.

14/12/14 15:39:13 INFO hive.HiveImport: Export directory is not empty, keeping it.

And jarcec can easily confirm in beeline that the data has indeed been imported into Hive:

 

 

0: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> show tables from default;

+------------+--+
|  tab_name  |
+------------+--+
| text       |
+------------+--+

1 row selected (0.177 seconds)

0: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> select count(*) from text;

+----------+--+
|   _c0    |
+----------+--+
| 1000000  |
+----------+--+

1 row selected (72.188 seconds)

If Hive is configured to inherit permissions, you might notice that Sqoop will print out several warnings similar to this one:

 

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00000

As there is no need to inherit HDFS permissions when Sentry is enabled in HDFS, you can safely ignore such messages.
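If you are curious what the integration does under the covers, you can inspect the ACLs that Sentry synthesizes on the warehouse directories. The path below assumes the default Hive warehouse location and the text table imported in this example:

$ hdfs dfs -getfacl /user/hive/warehouse/text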