Home > #oracle_Emp, big data > Spark Streaming with HBase

Spark Streaming with HBase

What is Spark Streaming?

First of all, what is streaming? A data stream is an unbounded sequence of data arriving continuously. Streaming divides continuously flowing input data into discrete units for processing. Stream processing is low latency processing and analyzing of streaming data. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Spark Streaming is for use cases which require a significant amount of data to be quickly processed as soon as it arrives. Example real-time use cases are:

  • Website monitoring , Network monitoring
  • Fraud detection
  • Web clicks
  • Advertising
  • Internet of Things: sensors

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data Streams can be processed with Spark’s core APIS, DataFrames SQL, or machine learning APIs, and can be persisted to a filesystem, HDFS, databases, or any data source offering a Hadoop OutputFormat.

How Spark Streaming Works

Streaming data is continuous and needs to be batched to process. Spark Streaming divides the data stream into batches of X seconds called Dstreams, which internally is a sequence of RDDs. Your Spark Application processes the RDDs using Spark APIs, and the processed results of the RDD operations are returned in batches.

Architecture of the example Streaming Application

The Spark Streaming example code does the following:

  • Reads streaming data.
  • Processes the streaming data.
  • Writes the processed data to an HBase Table.

Other Spark example code does the following:

  • Reads HBase Table data written by the streaming code
  • Calculates daily summary statistics
  • Writes summary statistics to the HBase table Column Family stats

Example data set

The Oil Pump Sensor data comes in as comma separated value (csv) files dropped in a directory. Spark Streaming will monitor the directory and process any files created in that directory. (As stated before, Spark Streaming supports different streaming data sources; for simplicity, this example will use files.) Below is an example of the csv file with some sample data:

We use a Scala case class to define the Sensor schema corresponding to the sensor data csv files, and a parseSensor function to parse the comma separated values into the sensor case class.

// schema for sensor data
case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double, flo: Double, 
          sedPPM: Double, psi: Double, chlPPM: Double)

object Sensor {
   // function to parse line of csv data into Sensor class
   def parseSensor(str: String): Sensor = {
       val p = str.split(",")
        Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble,
            p(7).toDouble, p(8).toDouble)

HBase Table schema

The HBase Table Schema for the streaming data is as follows:

  • Composite row key of the pump name date and time stamp
  • Column Family data with columns corresponding to the input data fields Column Family alerts with columns corresponding to any filters for alarming values Note that the data and alert column families could be set to expire values after a certain amount of time.

The Schema for the daily statistics summary rollups is as follows:

  • Composite row key of the pump name and date
  • Column Family stats
  • Columns for min, max, avg.

The function below converts a Sensor object into an HBase Put object, which is used to insert a row into HBase.

val cfDataBytes = Bytes.toBytes("data")

object Sensor {
. . .
  //  Convert a row of sensor object data to an HBase put object
  def convertToPut(sensor: Sensor): (ImmutableBytesWritable, Put) = {
      val dateTime = sensor.date + " " + sensor.time
      // create a composite row key: sensorid_date time
      val rowkey = sensor.resid + "_" + dateTime
      val put = new Put(Bytes.toBytes(rowkey))
      // add to column family data, column  data values to put object 
      put.add(cfDataBytes, Bytes.toBytes("hz"), Bytes.toBytes(sensor.hz))
      put.add(cfDataBytes, Bytes.toBytes("disp"), Bytes.toBytes(sensor.disp))
      put.add(cfDataBytes, Bytes.toBytes("flo"), Bytes.toBytes(sensor.flo))
      put.add(cfDataBytes, Bytes.toBytes("sedPPM"), Bytes.toBytes(sensor.sedPPM))
      put.add(cfDataBytes, Bytes.toBytes("psi"), Bytes.toBytes(sensor.psi))
      put.add(cfDataBytes, Bytes.toBytes("chlPPM"), Bytes.toBytes(sensor.chlPPM))
      return (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)

Configuration for Writing to an HBase Table

You can use the TableOutputFormat class with Spark to write to an HBase table, similar to how you would write to an HBase table from MapReduce. Below we set up the configuration for writing to HBase using the TableOutputFormat class.

   val tableName = "sensor"
   // set up Hadoop HBase configuration using TableOutputFormat
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val jobConfig: jobConfig = new JobConf(conf, this.getClass)
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

The Spark Streaming Example Code

These are the basic steps for Spark Streaming code:

  1. Initialize a Spark StreamingContext object.
  2. Apply transformations and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped using streamingContext.awaitTermination().

We will go through each of these steps with the example application code.

Initializing the StreamingContext

First we create a StreamingContext, the main entry point for streaming functionality, with a 2 second batch interval. (In the code boxes, comments are in Green)

val sparkConf = new SparkConf().setAppName("HBaseStream")

//  create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

Next, we use the StreamingContext textFileStream(directory) method to create an input stream that monitors a Hadoop-compatible file system for new files and processes any files created in that directory.

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

The linesDStream represents the stream of data, each record is a line of text. Internally a DStream is a sequence of RDDs, one RDD per batch interval.

Apply transformations and output operations to DStreams

Next we parse the lines of data into Sensor objects, with the map operation on the linesDStream.

// parse each line of data in linesDStream  into sensor objects

val sensorDStream = linesDStream.map(Sensor.parseSensor) 

The map operation applies the Sensor.parseSensor function on the RDDs in the linesDStream, resulting in RDDs of Sensor objects.

Next we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We filter the sensor objects for low psi to create alerts, then we write the sensor and alert data to HBase by converting them to Put objects, and using the PairRDDFunctions saveAsHadoopDatasetmethod, which outputs the RDD to any Hadoop-supported storage system using a Hadoop Configuration object for that storage system (see Hadoop Configuration for HBase above).

// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
        // filter sensor data for low psi
     val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

      // convert sensor data to put object and write to HBase  Table CF data

     // convert alert to put object write to HBase  Table CF alerts

The sensorRDD objects are converted to put objects then written to HBase.

Start receiving data

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

    // Start the computation
    // Wait for the computation to terminate

Spark Reading from and Writing to HBase

Now we want to read the HBase sensor table data , calculate daily summary statistics and write these statistics to the stats column family.

The code below reads the HBase table sensor table psi column data, calculates statistics on this data using StatCounter, then writes the statistics to the sensor stats column family.

     // configure HBase for reading 
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
    // scan data column family psi column
    conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") 

// Load an RDD of (row key, row Result) tuples from the table
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],

    // transform (row key, row Result) tuples into an RDD of Results
    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
    val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))

    // group by rowkey , get statistics for column value
    val keyStatsRDD = keyValueRDD.
             mapValues(list => StatCounter(list))

    // convert rowkey, stats to put and write to hbase table stats column family
    keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

The diagram below shows that the output from newAPIHadoopRDD is an RDD of row key, result pairs. The PairRDDFunctions saveAsHadoopDataset saves the Put objects to HBase.


Running the Application

You can run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.

Here are the steps summarized:

  1. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr.
  2. Build the application using maven.
  3. Copy the jar file and data file to your sandbox home directory /user/user01 using scp.
  4. Run the streaming app:
     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` 
       --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
  5. Copy the streaming data file to the stream directory:
    cp sensordata.csv /user/user01/stream/
  6. Read data and calculate stats for one column
       /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` 
        --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
  7. Calculate stats for whole row
      /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` 
       --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar
Categories: #oracle_Emp, big data
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: