Steps to Configure a Single-Node YARN Cluster

March 30, 2016

The following type of installation is often referred to as “pseudo-distributed” because it mimics some of the functionality of a distributed Hadoop cluster. A single machine is, of course, not practical for any production use, nor is it parallel. A small-scale Hadoop installation can provide a simple method for learning Hadoop basics, however.

The recommended minimal installation hardware is a dual-core processor with 2 GB of RAM and 2 GB of available hard drive space. The system will need a recent Linux distribution with Java installed (e.g., Red Hat Enterprise Linux or rebuilds, Fedora, SUSE Linux Enterprise, openSUSE, Ubuntu). Red Hat Enterprise Linux 6.3 is used for this installation example. A bash shell environment is also assumed. The first step is to download Apache Hadoop.

Step 1: Download Apache Hadoop

Download the latest distribution from the Hadoop website (http://hadoop.apache.org/). For example, as root do the following:

# cd /root
# wget http://mirrors.ibiblio.org/apache/hadoop/common/hadoop-2.2.0/hadoop-2.2.0.tar.gz

Next, create the /opt/yarn directory and extract the package into it:

# mkdir -p /opt/yarn
# cd /opt/yarn
# tar xvzf /root/hadoop-2.2.0.tar.gz

Step 2: Set JAVA_HOME

For Hadoop 2, the recommended version of Java can be found at http://wiki.apache.org/hadoop/HadoopJavaVersions. In general, a Java Development Kit 1.6 (or greater) should work. For this install, we will use OpenJDK 1.6.0_24, which is part of Red Hat Enterprise Linux 6.3. Make sure you have a working JDK installed; in this case, it is the java-1.6.0-openjdk RPM. To set JAVA_HOME for all bash users (other shells must be configured in a similar fashion), make an entry in /etc/profile.d as follows:

# echo "export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/" > /etc/profile.d/java.sh

To make sure JAVA_HOME is defined for this session, source the new script:

# source /etc/profile.d/java.sh
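Before moving on, it can help to confirm that JAVA_HOME really points at a JDK. The following is a minimal sketch, not part of Hadoop: check_java_home is a hypothetical helper name, and the only thing it verifies is that an executable bin/java exists under the given directory.

```shell
# check_java_home DIR
# Succeeds only if DIR contains an executable bin/java.
# Hypothetical helper for illustration; Hadoop does not ship it.
check_java_home() {
  local dir="${1:-}"
  if [ -z "$dir" ]; then
    echo "JAVA_HOME is not set" >&2
    return 1
  fi
  if [ ! -x "$dir/bin/java" ]; then
    echo "no executable found at $dir/bin/java" >&2
    return 1
  fi
  echo "JAVA_HOME looks usable: $dir"
}

# Example: check_java_home "$JAVA_HOME"
```

If the check fails, the path written to /etc/profile.d/java.sh probably does not match the directory the JDK package actually installed.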

Step 3: Create Users and Groups

It is best to run the various daemons with separate accounts. Three accounts (yarn, hdfs, mapred) in the group hadoop can be created as follows:

# groupadd hadoop

# useradd -g hadoop yarn

# useradd -g hadoop hdfs

# useradd -g hadoop mapred

Step 4: Make Data and Log Directories

Hadoop needs various data and log directories with various permissions. Enter the following lines to create these directories:

# mkdir -p /var/data/hadoop/hdfs/nn

# mkdir -p /var/data/hadoop/hdfs/snn

# mkdir -p /var/data/hadoop/hdfs/dn

# chown hdfs:hadoop /var/data/hadoop/hdfs -R

# mkdir -p /var/log/hadoop/yarn

# chown yarn:hadoop /var/log/hadoop/yarn -R

Next, move to the YARN installation root and create the log directory and set the owner and group as follows:

# cd /opt/yarn/hadoop-2.2.0

# mkdir logs
# chmod g+w logs

# chown yarn:hadoop . -R

Step 5: Configure core-site.xml

From the base of the Hadoop installation path (e.g., /opt/yarn/hadoop-2.2.0), edit the etc/hadoop/core-site.xml file. The original installed file will have no entries other than the <configuration> </configuration> tags. Two properties need to be set. The first is the fs.default.name property, which sets the host name and request port for the NameNode (the metadata server for HDFS). Hadoop 2 still accepts this name but treats it as a deprecated alias for fs.defaultFS. The second is hadoop.http.staticuser.user, which sets the default user name to hdfs. Copy the following lines to the Hadoop etc/hadoop/core-site.xml file and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
  <property>
    <name>hadoop.http.staticuser.user</name>
    <value>hdfs</value>
  </property>
</configuration>
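A typo in one of these XML files is easy to miss, so a quick read-back of a property can be useful. The sketch below is only an illustration: get_hadoop_property is a hypothetical helper, and it assumes each <name> element is immediately followed by its <value> and that the file contains no XML comments. A real XML parser would be more robust.

```shell
# get_hadoop_property FILE NAME
# Prints the <value> that follows <name>NAME</name> in a Hadoop *-site.xml file.
# Hypothetical helper; assumes name and value are adjacent and uncommented.
get_hadoop_property() {
  local file="$1" name="$2" pattern
  # Escape dots so they match literally in the sed expression.
  pattern="$(printf '%s' "$name" | sed 's/\./\\./g')"
  # Join the file into one line so name and value may sit on separate lines.
  tr -d '\n' < "$file" |
    sed -n "s|.*<name>${pattern}</name>[[:space:]]*<value>\([^<]*\)</value>.*|\1|p"
}

# Example: get_hadoop_property etc/hadoop/core-site.xml fs.default.name
```

Once the Hadoop commands are available, `hdfs getconf -confKey fs.default.name` should report the value Hadoop itself resolves, which is the more authoritative check.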

Step 6: Configure hdfs-site.xml

From the base of the Hadoop installation path, edit the etc/hadoop/hdfs-site.xml file. In the single-node pseudo-distributed mode, we don’t need or want the HDFS to replicate file blocks. By default, HDFS keeps three copies of each file in the file system for redundancy. There is no need for replication on a single machine; thus the value of dfs.replication will be set to 1.

In hdfs-site.xml, we specify the NameNode, Secondary NameNode, and DataNode data directories that we created in Step 4. These are the directories used by the various components of HDFS to store data. Copy the following lines into Hadoop etc/hadoop/hdfs-site.xml and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/var/data/hadoop/hdfs/nn</value>
  </property>
  <property>
    <name>fs.checkpoint.dir</name>
    <value>file:/var/data/hadoop/hdfs/snn</value>
  </property>
  <property>
    <name>fs.checkpoint.edits.dir</name>
    <value>file:/var/data/hadoop/hdfs/snn</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/var/data/hadoop/hdfs/dn</value>
  </property>
</configuration>

Step 7: Configure mapred-site.xml

From the base of the Hadoop installation, edit the etc/hadoop/mapred-site.xml file. A new configuration option for Hadoop 2 is the capability to specify a framework name for MapReduce by setting the mapreduce.framework.name property. In this install, we will use the value of “yarn” to tell MapReduce that it will run as a YARN application. First, copy the template file to mapred-site.xml:

# cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml

Next, copy the following lines into the Hadoop etc/hadoop/mapred-site.xml file and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

Step 8: Configure yarn-site.xml

From the base of the Hadoop installation, edit the etc/hadoop/yarn-site.xml file. The yarn.nodemanager.aux-services property tells NodeManagers that there will be an auxiliary service called mapreduce_shuffle that they need to implement. After we tell the NodeManagers to implement that service, we give it a class name as the means to implement that service. The class property embeds the same service name, so it is yarn.nodemanager.aux-services.mapreduce_shuffle.class. This particular configuration tells MapReduce how to do its shuffle. Because NodeManagers won’t shuffle data for a non-MapReduce job by default, we need to configure such a service for MapReduce. Copy the following lines to the Hadoop etc/hadoop/yarn-site.xml file and remove the original empty <configuration> </configuration> tags.

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
</configuration>

Step 9: Modify Java Heap Sizes

The Hadoop installation uses several environment variables that determine the heap sizes for each Hadoop process. These are defined in the etc/hadoop/*-env.sh files used by Hadoop. The default for most of the processes is a 1 GB heap size; because we’re running on a workstation that will probably have limited resources compared to a standard server, however, we need to adjust the heap size settings. The values that follow are adequate for a small workstation or server.

Edit the etc/hadoop/hadoop-env.sh file to reflect the following (don’t forget to remove the “#” at the beginning of the line):

HADOOP_HEAPSIZE="500"
HADOOP_NAMENODE_INIT_HEAPSIZE="500"

Next, edit mapred-env.sh to reflect the following:

HADOOP_JOB_HISTORYSERVER_HEAPSIZE=250

Finally, edit yarn-env.sh to reflect the following:

JAVA_HEAP_MAX=-Xmx500m

The following line will need to be added to yarn-env.sh:

YARN_HEAPSIZE=500

Step 10: Format HDFS

For the HDFS NameNode to start, it needs to initialize the directory where it will hold its data. The NameNode service tracks all the metadata for the file system. The format process will use the value assigned to dfs.namenode.name.dir in etc/hadoop/hdfs-site.xml earlier (i.e., /var/data/hadoop/hdfs/nn). Formatting destroys everything in the directory and sets up a new file system. Format the NameNode directory as the HDFS superuser, which is typically the “hdfs” user account.

From the base of the Hadoop distribution, change directories to the “bin” directory and execute the following commands:

# su - hdfs
$ cd /opt/yarn/hadoop-2.2.0/bin
$ ./hdfs namenode -format

If the command worked, you should see the following near the end of a long list of messages:

INFO common.Storage: Storage directory /var/data/hadoop/hdfs/nn has been successfully formatted.

Step 11: Start the HDFS Services

Once formatting is successful, the HDFS services must be started. There is one service for the NameNode (metadata server), a single DataNode (where the actual data is stored), and the SecondaryNameNode (checkpoint data for the NameNode). The Hadoop distribution includes scripts that set up these commands as well as name other values such as PID directories, log directories, and other standard process configurations. From the bin directory in Step 10, execute the following as user hdfs:

$ cd ../sbin
$ ./hadoop-daemon.sh start namenode

The command should show the following:

starting namenode, logging to /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-namenode-limulus.out

The secondarynamenode and datanode services can be started in the same way:

$ ./hadoop-daemon.sh start secondarynamenode
starting secondarynamenode, logging to /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-secondarynamenode-limulus.out

$ ./hadoop-daemon.sh start datanode
starting datanode, logging to /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-datanode-limulus.out

If the daemon started successfully, you should see responses that point to the log file. (Note that the actual log file ends in “.log,” not “.out.”) As a sanity check, issue a jps command to confirm that all the services are running. The actual PID (Java process ID) values will be different from those shown in this listing:

$ jps

15140 SecondaryNameNode

15015 NameNode

15335 Jps

15214 DataNode
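Reading the jps listing by eye works for three daemons, but the check can also be scripted. The following is a rough sketch: missing_daemons is a hypothetical helper that assumes jps prints one "PID ClassName" pair per line, as in the listing above.

```shell
# missing_daemons NAME...
# Reads jps-style output ("PID ClassName" per line) on stdin and prints every
# expected daemon name that is absent. Returns 0 only when nothing is missing.
# Hypothetical helper for illustration.
missing_daemons() {
  local running name status=0
  # Keep only the second column, the Java main class name.
  running="$(awk '{print $2}')"
  for name in "$@"; do
    # -x matches whole lines, so NameNode does not match SecondaryNameNode.
    if ! printf '%s\n' "$running" | grep -qxF -- "$name"; then
      echo "$name"
      status=1
    fi
  done
  return "$status"
}

# Example: jps | missing_daemons NameNode SecondaryNameNode DataNode
```

An empty result means all three HDFS daemons were found; any name printed is a daemon whose log file deserves a look.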

If the process did not start, it may be helpful to inspect the log files. For instance, examine the log file for the NameNode. (Note that the path is taken from the preceding command.)

$ vi /opt/yarn/hadoop-2.2.0/logs/hadoop-hdfs-namenode-limulus.log

All Hadoop services can be stopped using the hadoop-daemon.sh script. For example, to stop the datanode service, enter the following (as user hdfs in the /opt/yarn/hadoop-2.2.0/sbin directory):

$ ./hadoop-daemon.sh stop datanode

The same can be done for the NameNode and SecondaryNameNode.

Step 12: Start YARN Services

As with HDFS services, the YARN services need to be started. One ResourceManager and one NodeManager must be started as user yarn (exiting from user hdfs first):

$ exit
logout
# su - yarn
$ cd /opt/yarn/hadoop-2.2.0/sbin
$ ./yarn-daemon.sh start resourcemanager
starting resourcemanager, logging to /opt/yarn/hadoop-2.2.0/logs/yarn-yarn-resourcemanager-limulus.out
$ ./yarn-daemon.sh start nodemanager
starting nodemanager, logging to /opt/yarn/hadoop-2.2.0/logs/yarn-yarn-nodemanager-limulus.out

As when the HDFS daemons were started in Step 11, the status of the running daemons is sent to their respective log files. To check whether the services are running, issue a jps command. The following shows all the services necessary to run YARN on a single server:

$ jps

15933 Jps

15567 ResourceManager

15785 NodeManager

If there are missing services, check the log file for the specific service. Similar to the case with HDFS services, the services can be stopped by issuing a stop argument to the daemon script:

$ ./yarn-daemon.sh stop nodemanager

Step 13: Verify the Running Services Using the Web Interface

Both HDFS and the YARN ResourceManager have a web interface. These interfaces are a convenient way to browse many of the aspects of your Hadoop installation. To monitor HDFS, enter the following (or use your favorite web browser):

$ firefox http://localhost:50070

Connecting to port 50070 will bring up a web interface similar to Figure 1.1. The web interface for the ResourceManager can be viewed by entering the following:

$ firefox http://localhost:8088

A webpage similar to that shown in Figure 1.2 will be displayed.

Figure 1.1 Webpage for HDFS file system

Figure 1.2 Webpage for YARN ResourceManager

How-to: Quickly Configure Kerberos for Your Apache Hadoop Cluster (http://blog.cloudera.com/blog/2015/03/how-to-quickly-configure-kerberos-for-your-apache-hadoop-cluster/)

March 11, 2016

Use the scripts and screenshots below to configure a Kerberized cluster in minutes.

Kerberos is the foundation of securing your Apache Hadoop cluster. With Kerberos enabled, user authentication is required. Once users are authenticated, you can use projects like Apache Sentry (incubating) for role-based access control via GRANT/REVOKE statements.

Taming the three-headed dog that guards the gates of Hades is challenging, so Cloudera has put significant effort into making this process easier in Hadoop-based enterprise data hubs. In this post, you’ll learn how to stand up a one-node cluster with Kerberos enforcing user authentication, using the Cloudera QuickStart VM as a demo environment.

If you want to read the product documentation, it’s available here. You should consider this reference material; I’d suggest reading it later to understand more details about what the scripts do.

Requirements

You need the following downloads to follow along.

Initial Configuration

Before you start the QuickStart VM, increase the memory allocation to 8GB RAM and increase the number of CPUs to two. You can get by with a little less RAM, but we will have everything including the Kerberos server running on one node.

Start up the VM and activate Cloudera Manager as shown here:

Give this script some time to run; it has to restart the cluster.

KDC Install and Setup Script

The script goKerberos_beforeCM.sh does all the setup work for the Kerberos server and the appropriate configuration parameters. The comments are designed to explain what is going on inline. (Do not copy and paste this script! It contains unprintable characters that are pretending to be spaces. Rather, download it.)

Cloudera Manager Kerberos Wizard

After running the script, you now have a working Kerberos server and can secure the Hadoop cluster. The wizard will do most of the heavy lifting; you just have to fill in a few values.

To start, log into Cloudera Manager by going to http://quickstart.cloudera:7180 in your browser. The userid is cloudera and the password is cloudera. (It is almost needless to say, but never use “cloudera” as a password in a real-world setting.)

There are lots of productivity tools here for managing the cluster but ignore them for now and head straight for the Administration > Kerberos wizard as shown in the next screenshot.

Click on the “Enable Kerberos” button.

The four checklist items were all completed by the script you’ve already run. Check off each item and select “Continue.”

The Kerberos Wizard needs to know the details of what the script configured. Fill in the entries as follows:

  • KDC Server Host: quickstart.cloudera
  • Kerberos Security Realm: CLOUDERA
  • Kerberos Encryption Types: aes256-cts-hmac-sha1-96

Click “Continue.”

Do you want Cloudera Manager to manage the krb5.conf files in your cluster? Remember, the whole point of this blog post is to make Kerberos easier. So, please check “Yes” and then select “Continue.”

The Kerberos Wizard is going to create Kerberos principals for the different services in the cluster. To do that it needs a Kerberos Administrator ID. The ID created is: cloudera-scm/admin@CLOUDERA.

The screen shot shows how to enter this information. Recall the password is: cloudera.

The next screen provides good news. It lets you know that the wizard was able to successfully authenticate.

OK, you’re ready to let the Kerberos Wizard do its work. Since this is a VM, you can safely select “I’m ready to restart the cluster now” and then click “Continue.” You now have time to go get a coffee or other beverage of your choice.

How long does that take? Just let it work.

Congrats, you are now running a Hadoop cluster secured with Kerberos.

Kerberos is Enabled. Now What?

The old method of su - hdfs will no longer provide administrator access to the HDFS filesystem. Here is how you become the hdfs user with Kerberos:

Now validate you can do hdfs user things:

Next, invalidate the Kerberos token so as not to break anything:

The min.user.id parameter needs to be fixed per the message below:

This is the error message you get without fixing min.user.id:

Save the changes shown above and restart the YARN service. Now validate that the cloudera user can use the cluster:

If you forget to kinit before trying to use the cluster you’ll get the errors below. The simple fix is to use kinit with the principal you wish to use.

Spark Streaming with HBase

January 25, 2016

What is Spark Streaming?

First of all, what is streaming? A data stream is an unbounded sequence of data arriving continuously. Streaming divides continuously flowing input data into discrete units for processing. Stream processing is the low-latency processing and analysis of streaming data. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Spark Streaming is for use cases that require a significant amount of data to be processed as soon as it arrives. Example real-time use cases are:

  • Website monitoring, network monitoring
  • Fraud detection
  • Web clicks
  • Advertising
  • Internet of Things: sensors

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data streams can be processed with Spark’s core APIs, DataFrames and SQL, or machine learning APIs, and can be persisted to a filesystem, HDFS, databases, or any data source offering a Hadoop OutputFormat.

How Spark Streaming Works

Streaming data is continuous and needs to be batched to process. Spark Streaming divides the data stream into batches of X seconds. The resulting stream of batches is called a DStream, which internally is a sequence of RDDs. Your Spark application processes the RDDs using Spark APIs, and the processed results of the RDD operations are returned in batches.

Architecture of the example Streaming Application

The Spark Streaming example code does the following:

  • Reads streaming data.
  • Processes the streaming data.
  • Writes the processed data to an HBase Table.

Other Spark example code does the following:

  • Reads HBase Table data written by the streaming code
  • Calculates daily summary statistics
  • Writes summary statistics to the HBase table Column Family stats

Example data set

The Oil Pump Sensor data comes in as comma separated value (csv) files dropped in a directory. Spark Streaming will monitor the directory and process any files created in that directory. (As stated before, Spark Streaming supports different streaming data sources; for simplicity, this example will use files.) Below is an example of the csv file with some sample data:

We use a Scala case class to define the Sensor schema corresponding to the sensor data csv files, and a parseSensor function to parse the comma separated values into the sensor case class.

// schema for sensor data
case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double, flo: Double, 
          sedPPM: Double, psi: Double, chlPPM: Double)

object Sensor {
   // function to parse line of csv data into Sensor class
   def parseSensor(str: String): Sensor = {
       val p = str.split(",")
        Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble,
            p(7).toDouble, p(8).toDouble)
  }
…
}

HBase Table schema

The HBase Table Schema for the streaming data is as follows:

  • Composite row key of the pump name, date, and time stamp
  • Column Family data with columns corresponding to the input data fields
  • Column Family alerts with columns corresponding to any filters for alarming values

Note that the data and alert column families could be set to expire values after a certain amount of time.

The Schema for the daily statistics summary rollups is as follows:

  • Composite row key of the pump name and date
  • Column Family stats
  • Columns for min, max, avg.

The function below converts a Sensor object into an HBase Put object, which is used to insert a row into HBase.

val cfDataBytes = Bytes.toBytes("data")

object Sensor {
. . .
  //  Convert a row of sensor object data to an HBase put object
  def convertToPut(sensor: Sensor): (ImmutableBytesWritable, Put) = {
      val dateTime = sensor.date + " " + sensor.time
      // create a composite row key: sensorid_date time
      val rowkey = sensor.resid + "_" + dateTime
      val put = new Put(Bytes.toBytes(rowkey))
      // add to column family data, column  data values to put object 
      put.add(cfDataBytes, Bytes.toBytes("hz"), Bytes.toBytes(sensor.hz))
      put.add(cfDataBytes, Bytes.toBytes("disp"), Bytes.toBytes(sensor.disp))
      put.add(cfDataBytes, Bytes.toBytes("flo"), Bytes.toBytes(sensor.flo))
      put.add(cfDataBytes, Bytes.toBytes("sedPPM"), Bytes.toBytes(sensor.sedPPM))
      put.add(cfDataBytes, Bytes.toBytes("psi"), Bytes.toBytes(sensor.psi))
      put.add(cfDataBytes, Bytes.toBytes("chlPPM"), Bytes.toBytes(sensor.chlPPM))
      return (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
  }
}
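To see what the composite row key looks like without running Spark, the same string logic can be tried in the shell. This is a sketch only: sensor_rowkey and daily_rowkey are hypothetical helper names, and the sample CSV line is made up for illustration rather than taken from the data set. The field order follows the Sensor case class.

```shell
# Field order (from the Sensor case class):
# resid,date,time,hz,disp,flo,sedPPM,psi,chlPPM

# sensor_rowkey CSV_LINE -> "resid_date time", the key used for streaming rows
sensor_rowkey() {
  printf '%s\n' "$1" | awk -F, '{ printf "%s_%s %s\n", $1, $2, $3 }'
}

# daily_rowkey ROWKEY -> "resid_date", the key with the time dropped
daily_rowkey() {
  printf '%s\n' "${1%% *}"
}

# Made-up sample line, for illustration only.
line='PUMP1,3/10/14,1:01,10.27,1.73,881,1.56,85,1.94'
key="$(sensor_rowkey "$line")"
echo "$key"                 # PUMP1_3/10/14 1:01
daily_rowkey "$key"         # PUMP1_3/10/14
```

Because the key starts with the pump name and date, all readings for one pump on one day sort next to each other, which is what makes the daily rollup key a simple prefix of the streaming key.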

Configuration for Writing to an HBase Table

You can use the TableOutputFormat class with Spark to write to an HBase table, similar to how you would write to an HBase table from MapReduce. Below we set up the configuration for writing to HBase using the TableOutputFormat class.

    val tableName = "sensor"

    // set up Hadoop HBase configuration using TableOutputFormat
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    // JobConf comes from the older org.apache.hadoop.mapred API
    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

The Spark Streaming Example Code

These are the basic steps for Spark Streaming code:

  1. Initialize a Spark StreamingContext object.
  2. Apply transformations and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped using streamingContext.awaitTermination().

We will go through each of these steps with the example application code.

Initializing the StreamingContext

First we create a StreamingContext, the main entry point for streaming functionality, with a 2 second batch interval.

val sparkConf = new SparkConf().setAppName("HBaseStream")

//  create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

Next, we use the StreamingContext textFileStream(directory) method to create an input stream that monitors a Hadoop-compatible file system for new files and processes any files created in that directory.

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

The linesDStream represents the stream of data; each record is a line of text. Internally, a DStream is a sequence of RDDs, one RDD per batch interval.

Apply transformations and output operations to DStreams

Next we parse the lines of data into Sensor objects, with the map operation on the linesDStream.

// parse each line of data in linesDStream  into sensor objects

val sensorDStream = linesDStream.map(Sensor.parseSensor) 

The map operation applies the Sensor.parseSensor function on the RDDs in the linesDStream, resulting in RDDs of Sensor objects.

Next we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We filter the sensor objects for low psi to create alerts, then we write the sensor and alert data to HBase by converting them to Put objects and using the PairRDDFunctions saveAsHadoopDataset method, which outputs the RDD to any Hadoop-supported storage system using a Hadoop Configuration object for that storage system (see Hadoop Configuration for HBase above).

// foreachRDD applies a function to each RDD in the DStream
sensorDStream.foreachRDD { rdd =>
      // filter sensor data for low psi
      val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

      // convert sensor data to put object and write to HBase Table CF data
      rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

      // convert alert to put object and write to HBase Table CF alerts
      alertRDD.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

The Sensor objects in each RDD are converted to Put objects and then written to HBase.
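The low-psi filter is simple enough to try on a CSV file before involving Spark at all. The following sketch assumes psi is the eighth comma-separated field, as in the Sensor case class; low_psi_alerts is a hypothetical helper name and the sample lines are made up.

```shell
# low_psi_alerts [THRESHOLD]
# Reads sensor CSV lines on stdin and prints those whose psi (8th field)
# is below THRESHOLD. The default of 5.0 mirrors the Scala filter.
low_psi_alerts() {
  local threshold="${1:-5.0}"
  # Adding 0 forces a numeric comparison in awk.
  awk -F, -v t="$threshold" '($8 + 0) < (t + 0)'
}

# Made-up sample lines, for illustration only; just the second one is printed.
printf '%s\n' \
  'PUMP1,3/10/14,1:01,10.27,1.73,881,1.56,85,1.94' \
  'PUMP1,3/10/14,1:02,9.67,1.73,882,0.52,4.5,1.95' |
  low_psi_alerts
```

Running this against a data file gives a rough idea of how many rows the alerts column family should receive.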

Start receiving data

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

Spark Reading from and Writing to HBase

Now we want to read the HBase sensor table data, calculate daily summary statistics, and write these statistics to the stats column family.

The code below reads the psi column data from the HBase sensor table, calculates statistics on this data using StatCounter, and then writes the statistics to the stats column family of the sensor table.

     // configure HBase for reading 
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
    // scan data column family psi column
    conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") 

// Load an RDD of (row key, row Result) tuples from the table
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // transform (row key, row Result) tuples into an RDD of Results
    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
    val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))

    // group by rowkey , get statistics for column value
    val keyStatsRDD = keyValueRDD.
             groupByKey().
             mapValues(list => StatCounter(list))

    // convert rowkey, stats to put and write to hbase table stats column family
    keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
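The grouping step can be mimicked on a small text file to check what the rollup should produce. This is a sketch under stated assumptions: daily_stats is a hypothetical helper, its input is tab-separated "rowkey, value" pairs, and it reports only count, min, max, and mean, whereas StatCounter also tracks values such as the standard deviation.

```shell
# daily_stats
# Reads "rowkey<TAB>value" lines on stdin, where rowkey is "resid_date time".
# Groups by the part of the key before the first space and prints
# "key count min max mean" for each group, sorted by key.
daily_stats() {
  awk -F'\t' '
    {
      split($1, parts, " ")       # parts[1] is the key with the time removed
      k = parts[1]; v = $2 + 0
      if (!(k in n)) { min[k] = v; max[k] = v }
      if (v < min[k]) min[k] = v
      if (v > max[k]) max[k] = v
      n[k]++; sum[k] += v
    }
    END {
      for (k in n) printf "%s %d %g %g %g\n", k, n[k], min[k], max[k], sum[k] / n[k]
    }
  ' | sort
}

# Example: daily_stats < psi_by_rowkey.tsv   (file name is hypothetical)
```

Unlike groupByKey, this version never holds all values for a key at once; it keeps running aggregates, which is also how StatCounter merges values internally.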

The diagram below shows that the output from newAPIHadoopRDD is an RDD of row key, result pairs. The PairRDDFunctions saveAsHadoopDataset saves the Put objects to HBase.

Software

Running the Application

You can run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.

Here are the steps summarized:

  1. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr.
  2. Build the application using Maven.
  3. Copy the jar file and data file to your sandbox home directory /user/user01 using scp.
  4. Run the streaming app:
     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` \
       --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
  5. Copy the streaming data file to the stream directory:
     cp sensordata.csv /user/user01/stream/
  6. Read data and calculate stats for one column:
     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` \
       --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
  7. Calculate stats for the whole row:
     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` \
       --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar

An In-Depth Look at the HBase Architecture

January 25, 2016

HBase Architectural Components

Physically, HBase is composed of three types of servers in a master-slave type of architecture. Region servers serve data for reads and writes. When accessing data, clients communicate with HBase RegionServers directly. Region assignment and DDL (create, delete tables) operations are handled by the HBase Master process. ZooKeeper, a separate coordination service that HBase depends on, maintains live cluster state.

The Hadoop DataNode stores the data that the Region Server is managing. All HBase data is stored in HDFS files. Region Servers are collocated with the HDFS DataNodes, which enables data locality (putting the data close to where it is needed) for the data served by the RegionServers. HBase data is local when it is written, but when a region is moved, it is not local until compaction.

The NameNode maintains metadata information for all the physical data blocks that comprise the files.

Regions

HBase Tables are divided horizontally by row key range into “Regions.” A region contains all rows in the table between the region’s start key and end key. Regions are assigned to the nodes in the cluster, called “Region Servers,” and these serve data for reads and writes. A region server can serve about 1,000 regions.

HBase HMaster

Region assignment, DDL (create, delete tables) operations are handled by the HBase Master.

A master is responsible for:

  • Coordinating the region servers
      - Assigning regions on startup, and re-assigning regions for recovery or load balancing
      - Monitoring all RegionServer instances in the cluster (listens for notifications from ZooKeeper)
  • Admin functions
      - Interface for creating, deleting, and updating tables

ZooKeeper: The Coordinator

HBase uses ZooKeeper as a distributed coordination service to maintain server state in the cluster. Zookeeper maintains which servers are alive and available, and provides server failure notification. Zookeeper uses consensus to guarantee common shared state. Note that there should be three or five machines for consensus.

How the Components Work Together

ZooKeeper is used to coordinate shared state information for members of distributed systems. Region servers and the active HMaster connect with a session to ZooKeeper. ZooKeeper maintains ephemeral nodes for active sessions via heartbeats.

Each Region Server creates an ephemeral node. The HMaster monitors these nodes to discover available region servers, and it also monitors these nodes for server failures. HMasters vie to create an ephemeral node. Zookeeper determines the first one and uses it to make sure that only one master is active. The active HMaster sends heartbeats to Zookeeper, and the inactive HMaster listens for notifications of the active HMaster failure.

If a region server or the active HMaster fails to send a heartbeat, the session is expired and the corresponding ephemeral node is deleted. Listeners for updates will be notified of the deleted nodes. The active HMaster listens for region servers, and will recover region servers on failure. The Inactive HMaster listens for active HMaster failure, and if an active HMaster fails, the inactive HMaster becomes active.

HBase First Read or Write

There is a special HBase Catalog table called the META table, which holds the location of the regions in the cluster. ZooKeeper stores the location of the META table.

This is what happens the first time a client reads or writes to HBase:

  1. The client gets the Region server that hosts the META table from ZooKeeper.
  2. The client will query the .META. server to get the region server corresponding to the row key it wants to access. The client caches this information along with the META table location.
  3. It will get the Row from the corresponding Region Server.

For future reads, the client uses the cache to retrieve the META location and previously read row keys. Over time, it does not need to query the META table, unless there is a miss because a region has moved; then it will re-query and update the cache.
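The lookup-then-cache flow above might look roughly like the following sketch. The two lookup functions are hypothetical stand-ins for ZooKeeper and the META table, the server names are made up, and caching per row key is a simplification chosen to mirror the wording above.

```python
class ToyClient:
    def __init__(self, zookeeper_lookup, meta_lookup):
        self._zookeeper_lookup = zookeeper_lookup  # () -> server hosting META
        self._meta_lookup = meta_lookup            # (meta_server, row_key) -> region server
        self.meta_location = None
        self.region_cache = {}                     # row key -> region server

    def locate(self, row_key):
        if row_key in self.region_cache:           # cache hit: no lookups at all
            return self.region_cache[row_key]
        if self.meta_location is None:             # step 1: ask ZooKeeper once
            self.meta_location = self._zookeeper_lookup()
        server = self._meta_lookup(self.meta_location, row_key)  # step 2
        self.region_cache[row_key] = server
        return server                              # step 3 would read from this server

    def invalidate(self, row_key):
        """Called on a miss, e.g. because the region has moved."""
        self.region_cache.pop(row_key, None)


calls = {"zk": 0, "meta": 0}

def zookeeper_lookup():
    calls["zk"] += 1
    return "rs-meta"

def meta_lookup(meta_server, row_key):
    calls["meta"] += 1
    return "rs1" if row_key < "m" else "rs2"

client = ToyClient(zookeeper_lookup, meta_lookup)
servers = [client.locate("apple"), client.locate("zebra"), client.locate("apple")]
```

After the three calls, ZooKeeper has been consulted once and META twice; the repeated read of "apple" is served from the cache.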

HBase Meta Table

  • The META table is an HBase table that keeps a list of all regions in the system.
  • The .META. table is like a B-tree.
  • The .META. table structure is as follows:
    • Key: region start key, region id
    • Values: RegionServer
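Because the keys are sorted by region start key, finding the region for a row is a search for the last start key that is not greater than the row key. Here is a minimal sketch using a sorted list and binary search in place of a B-tree; the sample rows and server names are invented for illustration.

```python
import bisect

# Hypothetical META rows: (region start key, region id) -> region server,
# kept sorted by start key.
meta_rows = [
    (("", 1), "rs1"),
    (("g", 2), "rs2"),
    (("p", 3), "rs1"),
]
start_keys = [key[0] for key, _ in meta_rows]

def find_region(row_key):
    """Return (region id, region server) for the region containing row_key."""
    i = bisect.bisect_right(start_keys, row_key) - 1
    (_, region_id), server = meta_rows[i]
    return region_id, server
```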

Region Server Components

A Region Server runs on an HDFS data node and has the following components:

  • WAL: Write Ahead Log is a file on the distributed file system. The WAL is used to store new data that hasn’t yet been persisted to permanent storage; it is used for recovery in the case of failure.
  • BlockCache: the read cache. It stores frequently read data in memory. Least Recently Used data is evicted when the cache is full.
  • MemStore: the write cache. It stores new data which has not yet been written to disk. It is sorted before writing to disk. There is one MemStore per column family per region.
  • HFiles: store the rows as sorted KeyValues on disk.
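To make the "Least Recently Used data is evicted when full" behavior of the read cache concrete, here is a small LRU sketch. `ToyBlockCache` and its capacity are assumptions for illustration; the real BlockCache is considerably more elaborate.

```python
from collections import OrderedDict

class ToyBlockCache:
    """Hypothetical LRU cache: the least recently used block is evicted first."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.blocks = OrderedDict()   # oldest entry first

    def get(self, block_id):
        if block_id not in self.blocks:
            return None
        self.blocks.move_to_end(block_id)      # mark as most recently used
        return self.blocks[block_id]

    def put(self, block_id, data):
        self.blocks[block_id] = data
        self.blocks.move_to_end(block_id)
        if len(self.blocks) > self.capacity:
            self.blocks.popitem(last=False)    # evict the least recently used


cache = ToyBlockCache(capacity=2)
cache.put("block-1", "data-1")
cache.put("block-2", "data-2")
cache.get("block-1")            # block-1 is now the most recently used
cache.put("block-3", "data-3")  # cache is full, so block-2 is evicted
```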

HBase Write Steps (1)

When the client issues a Put request, the first step is to write the data to the write-ahead log, the WAL:

  • Edits are appended to the end of the WAL file that is stored on disk.
  • The WAL is used to recover not-yet-persisted data in case a server crashes.

HBase Write Steps (2)

Once the data is written to the WAL, it is placed in the MemStore. Then, the put request acknowledgement returns to the client.
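The order of the two write steps is the important part: the WAL first, then the MemStore, then the acknowledgement. A minimal sketch, assuming a hypothetical `ToyRegion` class in which a list stands in for the WAL file and a dict stands in for the MemStore:

```python
class ToyRegion:
    def __init__(self):
        self.wal = []        # append-only list standing in for the WAL file
        self.memstore = {}   # (row, column) -> value
        self.next_seq = 1

    def put(self, row, column, value):
        seq = self.next_seq
        self.next_seq += 1
        self.wal.append((seq, row, column, value))  # 1. append the edit to the WAL
        self.memstore[(row, column)] = value        # 2. then place it in the MemStore
        return seq                                  # 3. only now acknowledge the client


region = ToyRegion()
ack = region.put("row1", "cf:a", "v1")
```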

HBase MemStore

The MemStore stores updates in memory as sorted KeyValues, the same as it would be stored in an HFile. There is one MemStore per column family. The updates are sorted per column family.

HBase Region Flush

When the MemStore accumulates enough data, the entire sorted set is written to a new HFile in HDFS. HBase uses multiple HFiles per column family, which contain the actual cells, or KeyValue instances. These files are created over time as KeyValue edits sorted in the MemStores are flushed as files to disk.

Note that this is one reason why there is a limit to the number of column families in HBase. There is one MemStore per CF; when one is full, they all flush. It also saves the last written sequence number so the system knows what was persisted so far.

The highest sequence number is stored as a meta field in each HFile, to reflect where persisting has ended and where to continue. On region startup, the sequence number is read, and the highest is used as the sequence number for new edits.
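A flush can be pictured as turning the in-memory map into an immutable sorted file that also records the last sequence number it contains. In this sketch a dict stands in for an HFile, and the field names `cells` and `max_seq` are made up; continuing new edits at "highest plus one" is also an assumption of the sketch.

```python
def flush(memstore, last_seq):
    """Write the MemStore out as a sorted, immutable 'HFile' (a dict here)."""
    hfile = {
        "cells": sorted(memstore.items()),  # sorted KeyValues
        "max_seq": last_seq,                # meta field: highest persisted edit
    }
    memstore.clear()
    return hfile


memstore = {("row2", "cf:a"): "v2", ("row1", "cf:a"): "v1"}
hfile = flush(memstore, last_seq=2)

# On region startup: read the sequence number of every HFile and continue
# after the highest one (the second file is invented sample data).
hfiles = [hfile, {"cells": [], "max_seq": 7}]
next_seq = max(f["max_seq"] for f in hfiles) + 1
```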

HBase HFile

Data is stored in an HFile which contains sorted key/values. When the MemStore accumulates enough data, the entire sorted KeyValue set is written to a new HFile in HDFS. This is a sequential write. It is very fast, as it avoids moving the disk drive head.

HBase HFile Structure

An HFile contains a multi-layered index which allows HBase to seek to the data without having to read the whole file. The multi-level index is like a B+ tree:

  • Key value pairs are stored in increasing order
  • Indexes point by row key to the key value data in 64KB “blocks”
  • Each block has its own leaf-index
  • The last key of each block is put in the intermediate index
  • The root index points to the intermediate index

The trailer points to the meta blocks, and is written at the end of persisting the data to the file. The trailer also has information like bloom filters and time range info. Bloom filters help to skip files that do not contain a certain row key. The time range info is useful for skipping the file if it is not in the time range the read is looking for.

HFile Index

The index, which we just discussed, is loaded when the HFile is opened and kept in memory. This allows lookups to be performed with a single disk seek.
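Here is a simplified, single-level picture of why an in-memory index makes a lookup cheap: the index holds the last key of each block, so a binary search picks the one block that could contain the key. The block size of two cells and the sample data are assumptions for illustration only.

```python
import bisect

def build_blocks(sorted_cells, block_size):
    """Cut sorted (key, value) cells into blocks and index each block's last key."""
    blocks = [sorted_cells[i:i + block_size]
              for i in range(0, len(sorted_cells), block_size)]
    index = [block[-1][0] for block in blocks]
    return blocks, index

def get(blocks, index, key):
    i = bisect.bisect_left(index, key)  # first block whose last key >= key
    if i == len(blocks):
        return None                     # key is beyond the last block
    for k, v in blocks[i]:              # only this one block is read
        if k == key:
            return v
    return None


cells = [("a", 1), ("c", 2), ("e", 3), ("g", 4), ("i", 5)]
blocks, index = build_blocks(cells, block_size=2)
```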

HBase Read Merge

We have seen that the KeyValue cells corresponding to one row can be in multiple places: row cells already persisted are in HFiles, recently updated cells are in the MemStore, and recently read cells are in the Block cache. So when you read a row, how does the system get the corresponding cells to return? A Read merges Key Values from the block cache, MemStore, and HFiles in the following steps:

  1. First, the scanner looks for the Row cells in the Block cache – the read cache. Recently Read Key Values are cached here, and Least Recently Used are evicted when memory is needed.
  2. Next, the scanner looks in the MemStore, the write cache in memory containing the most recent writes.
  3. If the scanner does not find all of the row cells in the MemStore and Block Cache, then HBase will use the Block Cache indexes and bloom filters to load HFiles into memory, which may contain the target row cells.
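The merge itself can be sketched as "collect candidate cells from every source and keep the newest version of each column". This toy version always consults every source and ignores indexes and bloom filters; the `(timestamp, value)` layout and the sample data are assumptions.

```python
def read_row(row, block_cache, memstore, hfiles):
    """Merge one row's cells from all sources; the newest timestamp wins."""
    newest = {}  # column -> (timestamp, value)
    for source in [block_cache, memstore] + hfiles:
        for (r, column), (ts, value) in source.items():
            if r == row and (column not in newest or ts > newest[column][0]):
                newest[column] = (ts, value)
    return {column: value for column, (ts, value) in newest.items()}


block_cache = {("r1", "cf:a"): (1, "old-a")}                 # recently read
memstore = {("r1", "cf:a"): (3, "new-a")}                    # most recent write
hfiles = [{("r1", "cf:b"): (2, "b"), ("r2", "cf:a"): (2, "x")}]
row = read_row("r1", block_cache, memstore, hfiles)
```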

HBase Read Merge (2)

As discussed earlier, there may be many HFiles per MemStore, which means for a read, multiple files may have to be examined, which can affect the performance. This is called read amplification.

HBase Minor Compaction

HBase will automatically pick some smaller HFiles and rewrite them into fewer, bigger HFiles. This process is called minor compaction. It reduces the number of storage files by performing a merge sort.
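Since every HFile is already sorted, the rewrite is a merge of sorted runs. A minimal sketch, assuming each file is a sorted list of `(key, timestamp, value)` tuples and that "pick the smallest files" is a good enough selection rule (the real selection policy is more involved):

```python
import heapq

def minor_compact(hfiles, pick=2):
    """Merge the `pick` smallest files into one; leave the rest untouched."""
    by_size = sorted(hfiles, key=len)
    small, rest = by_size[:pick], by_size[pick:]
    merged = list(heapq.merge(*small))  # merge sort over already-sorted files
    return rest + [merged]


f1 = [("a", 1, "x"), ("d", 1, "y")]
f2 = [("b", 2, "z")]
f3 = [("a", 0, "p"), ("c", 0, "q"), ("e", 0, "r")]
compacted = minor_compact([f1, f2, f3])
```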

HBase Major Compaction

Major compaction merges and rewrites all the HFiles in a region to one HFile per column family, and in the process, drops deleted or expired cells. This improves read performance; however, since major compaction rewrites all of the files, lots of disk I/O and network traffic might occur during the process. This is called write amplification.

Major compactions can be scheduled to run automatically. Due to write amplification, major compactions are usually scheduled for weekends or evenings. Note that MapR-DB has made improvements and does not need to do compactions. A major compaction also makes any data files that were remote, due to server failure or load balancing, local to the region server.
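In contrast to a minor compaction, a major compaction reads everything and can therefore drop deleted cells for good. The sketch below keeps only the newest version of each key and uses a hypothetical `TOMBSTONE` marker for deletes; version limits and TTL expiry are left out.

```python
TOMBSTONE = "<deleted>"  # hypothetical delete marker

def major_compact(hfiles):
    """Rewrite all files into one, dropping old versions and deleted cells."""
    newest = {}  # key -> (timestamp, value)
    for hfile in hfiles:
        for key, ts, value in hfile:
            if key not in newest or ts > newest[key][0]:
                newest[key] = (ts, value)
    return [(key, newest[key][0], newest[key][1])
            for key in sorted(newest)
            if newest[key][1] != TOMBSTONE]


older = [("a", 1, "v1"), ("b", 1, "v1")]
newer = [("a", 2, "v2"), ("b", 2, TOMBSTONE)]
single_file = major_compact([older, newer])
```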

Region = Contiguous Keys

Let’s do a quick review of regions:

  • A table can be divided horizontally into one or more regions. A region contains a contiguous, sorted range of rows between a start key and an end key
  • Each region is 1GB in size (default)
  • A region of a table is served to the client by a RegionServer
  • A region server can serve about 1,000 regions (which may belong to the same table or different tables)

Region Split

Initially there is one region per table. When a region grows too large, it splits into two child regions. Both child regions, representing one-half of the original region, are opened in parallel on the same Region server, and then the split is reported to the HMaster. For load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers.
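A split can be pictured as cutting a sorted key range at a middle row key. In this sketch a region is a plain dict with hypothetical `start`, `end`, and `rows` fields, and the split point is simply the middle row; how HBase actually chooses the split point is not covered here.

```python
def split_region(region):
    """Split a region's sorted (row_key, value) rows into two child regions."""
    mid = len(region["rows"]) // 2
    split_key = region["rows"][mid][0]
    left = {"start": region["start"], "end": split_key,
            "rows": region["rows"][:mid]}
    right = {"start": split_key, "end": region["end"],
             "rows": region["rows"][mid:]}
    return left, right


parent = {"start": "", "end": None,   # None stands for "no upper bound"
          "rows": [("a", 1), ("c", 2), ("m", 3), ("x", 4)]}
left, right = split_region(parent)
```

Both children together still cover exactly the parent's contiguous key range, which is what allows them to be moved to other servers independently afterwards.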

Read Load Balancing

Splitting happens initially on the same region server, but for load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers. This results in the new Region server serving data from a remote HDFS node until a major compaction moves the data files to the Region server’s local node. HBase data is local when it is written, but when a region is moved (for load balancing or recovery), it is not local until major compaction.

HDFS Data Replication

All writes and reads are to and from the primary node. HDFS replicates the WAL and HFile blocks. HFile block replication happens automatically. HBase relies on HDFS to provide data safety as it stores its files. When data is written in HDFS, one copy is written locally, and then it is replicated to a secondary node, and a third copy is written to a tertiary node.

HDFS Data Replication (2)

The WAL file and the HFiles are persisted on disk and replicated, so how does HBase recover the MemStore updates not persisted to HFiles? See the next section for the answer.

HBase Crash Recovery

When a RegionServer fails, the crashed regions are unavailable until the detection and recovery steps have happened. ZooKeeper determines node failure when it loses the region server’s heartbeats. The HMaster is then notified that the Region Server has failed.

When the HMaster detects that a region server has crashed, it reassigns the regions from the crashed server to active Region servers. To recover the crashed region server’s MemStore edits that were not flushed to disk, the HMaster splits the WAL belonging to the crashed region server into separate files and stores these files on the new region servers’ data nodes. Each Region Server then replays its split WAL to rebuild the MemStore for that region.
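Splitting the WAL amounts to grouping the crashed server's edits by the region they belong to, while preserving their order. A minimal sketch, assuming each edit is a dict with a hypothetical `region` field:

```python
from collections import defaultdict

def split_wal(wal):
    """Group a crashed server's WAL edits by region, keeping their order."""
    per_region = defaultdict(list)
    for edit in wal:
        per_region[edit["region"]].append(edit)
    return dict(per_region)


wal = [
    {"seq": 1, "region": "r1", "row": "a"},
    {"seq": 2, "region": "r2", "row": "x"},
    {"seq": 3, "region": "r1", "row": "b"},
]
split = split_wal(wal)
```

Each resulting list would then be handed to whichever server now hosts that region.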

Data Recovery

WAL files contain a list of edits, with one edit representing a single put or delete. Edits are written chronologically, so, for persistence, additions are appended to the end of the WAL file that is stored on disk.

What happens if there is a failure when the data is still in memory and not persisted to an HFile? The WAL is replayed. Replaying a WAL is done by reading the WAL and adding the contained edits to the current MemStore, where they are sorted. At the end, the MemStore is flushed to write the changes to an HFile.
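Replay can be sketched as re-applying, in order, every edit that is newer than what was already flushed. The tuple layout, the `flushed_seq` parameter, and the `TOMBSTONE` marker are assumptions made for this illustration.

```python
TOMBSTONE = "<deleted>"  # hypothetical delete marker

def replay(wal_edits, flushed_seq):
    """Rebuild the MemStore from WAL edits newer than the last flush."""
    memstore = {}
    for seq, op, key, value in wal_edits:  # edits are in chronological order
        if seq <= flushed_seq:
            continue                       # already persisted in an HFile
        memstore[key] = value if op == "put" else TOMBSTONE
    return sorted(memstore.items())        # sorted, ready to be flushed


wal_edits = [
    (1, "put", "a", "v1"),
    (2, "put", "b", "v1"),
    (3, "delete", "a", None),
    (4, "put", "c", "v1"),
]
rebuilt = replay(wal_edits, flushed_seq=1)
```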

Apache HBase Architecture Benefits

HBase provides the following benefits:

  • Strong consistency model
    • When a write returns, all readers will see the same value
  • Scales automatically
    • Regions split when data grows too large
    • Uses HDFS to spread and replicate data
  • Built-in recovery
    • Using Write Ahead Log (similar to journaling on file system)
  • Integrated with Hadoop
    • MapReduce on HBase is straightforward

Apache HBase Has Problems Too…

  • Business continuity reliability:
    • WAL replay slow
    • Slow complex crash recovery
    • Major Compaction I/O storms

MapR-DB with MapR-FS does not have these problems

The diagram below compares the application stacks for Apache HBase on top of HDFS on the left, Apache HBase on top of MapR’s read/write file system MapR-FS in the middle, and MapR-DB and MapR-FS in a Unified Storage Layer on the right.

MapR-DB exposes the same HBase API and the Data model for MapR-DB is the same as for Apache HBase. However the MapR-DB implementation integrates table storage into the MapR file system, eliminating all JVM layers and interacting directly with disks for both file and table storage.

MapR-DB offers many benefits over HBase, while maintaining the virtues of the HBase API and the idea of data being sorted according to primary key. MapR-DB provides operational benefits such as no compaction delays and automated region splits that do not impact the performance of the database. The tables in MapR-DB can also be isolated to certain machines in a cluster by utilizing the topology feature of MapR. The final differentiator is that MapR-DB is just plain fast, due primarily to the fact that it is tightly integrated into the MapR file system itself, rather than being layered on top of a distributed file system that is layered on top of a conventional file system.

Key differences between MapR-DB and Apache HBase

  • Tables part of the MapR Read/Write File system
    • Guaranteed data locality
  • Smarter load balancing
    • Uses container Replicas
  • Smarter fail over
    • Uses container replicas
  • Multiple small WALs
    • Faster recovery
  • Memstore Flushes Merged into Read/Write File System
    • No compaction!

You can take this free On Demand training to learn more about MapR-FS and MapR-DB 

In this blog post, you learned more about the HBase architecture and its main benefits over NoSQL data store solutions. If you have any questions about HBase, please ask them in the comments section below.


Comparing SQL Functions and Performance with Apache Spark and Apache Drill

January 25, 2016

SQL engines for Hadoop differ in their approach and functionality. My focus for this blog post is to compare and contrast the functions and performance of Apache Spark and Apache Drill and discuss their expected use cases.

Running queries and analysis on structured databases is a standard operation and has been in place for decades. This typically involves developing a structure and schema which gets implemented in a database.

The amount of data handled by companies increases every year, but the pace of that increase is now of a completely different magnitude. According to a recent investigation, there are presently 3.5ZB (zettabytes) of structured data and about 1.4 times as much semi-structured and unstructured data, roughly 5ZB. By 2020, however, it is predicted that there will be roughly 6 times more semi-structured and unstructured data than structured data: 30ZB versus 5ZB.1

Figuring out how to analyze this growing amount of data is a key objective for business and data analysts. This is because, along with the multiplicity of data types contained in unstructured data, the data’s actual meanings (namely the words contained in word documents, video frames, etc.) are likely to depend on the actual semantics (context) of the things being analyzed. In addition to the semantics on which the data itself relies, there might also be a need to understand the data path included in that data as a part of the semantics. (For example, the data paths: “C:\Users\kiuchi\My Pets\whale\johnny.jpg” and “http://kiuchi.local/My Pets/cat/michelle.jpg” do not only lead to the image which they are referencing; they can also lead us to believe that I have a whale and a cat among my pets. Essentially, it may be possible to interpret data paths as their nested columns.)
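As a rough illustration of reading a data path as nested columns, the two example paths can be split into segments and the trailing segments given names. The column names `collection`, `kind`, and `file` are my own labels for this sketch, not anything defined by Drill or Spark.

```python
import re

def path_columns(path):
    """Treat the last three path segments as nested 'columns'."""
    collection, kind, filename = re.split(r"[\\/]", path)[-3:]
    return {"collection": collection, "kind": kind, "file": filename}


whale = path_columns(r"C:\Users\kiuchi\My Pets\whale\johnny.jpg")
cat = path_columns("http://kiuchi.local/My Pets/cat/michelle.jpg")
```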

In addition, the increase of information content contained in “semi-structured data” in lighter data exchange formats such as XML and JSON, which are inspired by the development of information sharing over the internet, has made the situation even more complicated. While this semi-structured data, because of its flexible schema structure, does not fit into data stores that strictly define schema beforehand, such as relational databases, compared to data such as emails or office documents, it can nonetheless clearly be said to be structured data. The problem of how to store and analyze such semi-structured data is currently one of the major issues faced by analytics professionals.

Among my readers, I suspect that not many people know the expression “Polyglot Persistence.” This is a concept which was originally developed by Scott Leberknight, and which was brought into the limelight in 2011 when it was taken up by Martin Fowler, an advocate of XP (eXtreme Programming). It advocates for the cross-cutting accessing of data stored in a wide variety of formats on various data sources.2 Should it be achieved, data analysts will be able to grasp the location and format of the data that they are trying to analyze, thus leading to a dramatic reduction in data conversion costs.

In other words, we can say that for big data analytics, which has recently gained a lot of exposure as a buzzword, solving the problem of handling wide varieties of data remains an ongoing effort. Both Apache Drill and Apache Spark are execution engines for Hadoop that were born under these circumstances. By comparing the functions and performance of Apache Drill and Apache Spark, this article will explore their characteristics and expected use cases.

Functional Comparison

As data sources, Apache Drill and Apache Spark support semi-structured data such as CSV and JSON, as well as relational databases through JDBC. Apache Drill also supports NoSQL databases such as MongoDB and HBase.

Supported Data Sources

Apache Drill: CSV, JSON, Parquet, Hive, HBase, MongoDB, MapR-DB, Amazon S3
Apache Spark: CSV, JSON, Parquet, HiveQL (compatible), JDBC

One of the defining characteristics is the fact that they are able to analyze a mix of structured data sources and semi-structured data sources. This sets them apart from traditional SQL engines. In addition, they also have means to access local files, remote data sources that support standard database connection protocols, and data sources on the internet which require connection methods other than JDBC. Finally, it should also be mentioned that both have “plugin” features, meaning that additional data sources can be added.3

Clearly, these are the designs that presuppose a wide variety of data sources and a cross-cutting data analysis. They are good representatives of next-generation SQL engines operating in a big data environment with mixed data sources taking “Polyglot Persistence” into account.

Performance Comparison and Discussion

Let me now compare the query performance of the two technologies. Here, we will use the “MovieLens” data set, which consists of a ratings database from movie viewers collected by a research project at the University of Minnesota. Performance will be measured by running three types of SQL queries, including JOIN access.

The program used here is available on GitHub (m-kiuchi/MovieLensSQL).

The results below are those that were obtained on my PC (2 cores, 6GB memory).

Where did the differences in results come from? I believe that they originate from the different ideas behind Apache Spark and Apache Drill. Apache Drill was developed to scan and filter data sources, so it likely avoids any extra processing. As data is scanned, each piece is sent on to the next step of a processing pipeline, with the focus on obtaining the final results in the shortest time possible. Since Drill assumes that query processing completes in a short time, the query is simply re-run if a failure occurs during execution.

In Spark, on the other hand, data source scanning is only the preliminary step for parallel processing. Since reusability is the aim, partitioning and staging take place at each step, as in MapReduce. This allows for more flexible control of the data flows in the program. If a failure occurs during a long batch job, processing can be restarted from the intermediate data, which increases fault tolerance. This is the likely cause of the differences in performance. Of course, if sufficient machine resources are available, or if we consider the processing time of the entire workflow, the differences in the observed processing times might end up falling within the margin of error.
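The two ideas can be contrasted with a toy example. This is only an illustration of "stream each record straight through" versus "materialize every stage so it can be reused", not a description of how either engine is implemented; the function names and sample rows are invented.

```python
def scan(rows):
    for row in rows:
        yield row

def keep_good(rows):
    for row in rows:
        if row["rating"] >= 4:
            yield row

def pipelined(rows):
    """Each record flows through scan and filter without being stored."""
    return [row["title"] for row in keep_good(scan(rows))]

def staged(rows, checkpoints):
    """Each stage is materialized, so a later stage could restart from it."""
    checkpoints["scanned"] = list(rows)
    checkpoints["filtered"] = [r for r in checkpoints["scanned"]
                               if r["rating"] >= 4]
    return [r["title"] for r in checkpoints["filtered"]]


ratings = [
    {"title": "A", "rating": 5},
    {"title": "B", "rating": 3},
    {"title": "C", "rating": 4},
]
checkpoints = {}
fast = pipelined(ratings)
durable = staged(ratings, checkpoints)
```

Both produce the same answer; the staged version pays for extra copies of the intermediate data in exchange for being able to resume from them.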

The intent here is not to determine, based on the above results, whether Apache Drill or Apache Spark is better. What is important is that both engines can execute queries on, and transparently capture data from, a wide variety of data sources, with Spark being able to conduct more varied processing than SQL queries alone. What is clear is that what these two execution engines can achieve goes beyond the scope of traditional relational databases.

2 There is a similar concept called “Data Virtualization”