Spark Streaming with HBase

What is Spark Streaming?

First of all, what is streaming? A data stream is an unbounded sequence of data arriving continuously. Streaming divides continuously flowing input data into discrete units for processing. Stream processing is low latency processing and analyzing of streaming data. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Spark Streaming is for use cases which require a significant amount of data to be quickly processed as soon as it arrives. Example real-time use cases are:

  • Website monitoring , Network monitoring
  • Fraud detection
  • Web clicks
  • Advertising
  • Internet of Things: sensors

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data Streams can be processed with Spark’s core APIS, DataFrames SQL, or machine learning APIs, and can be persisted to a filesystem, HDFS, databases, or any data source offering a Hadoop OutputFormat.

How Spark Streaming Works

Streaming data is continuous and needs to be batched to process. Spark Streaming divides the data stream into batches of X seconds called Dstreams, which internally is a sequence of RDDs. Your Spark Application processes the RDDs using Spark APIs, and the processed results of the RDD operations are returned in batches.

Architecture of the example Streaming Application

The Spark Streaming example code does the following:

  • Reads streaming data.
  • Processes the streaming data.
  • Writes the processed data to an HBase Table.

Other Spark example code does the following:

  • Reads HBase Table data written by the streaming code
  • Calculates daily summary statistics
  • Writes summary statistics to the HBase table Column Family stats

Example data set

The Oil Pump Sensor data comes in as comma separated value (csv) files dropped in a directory. Spark Streaming will monitor the directory and process any files created in that directory. (As stated before, Spark Streaming supports different streaming data sources; for simplicity, this example will use files.) Below is an example of the csv file with some sample data:

We use a Scala case class to define the Sensor schema corresponding to the sensor data csv files, and a parseSensor function to parse the comma separated values into the sensor case class.

// schema for sensor data
case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double, flo: Double, 
          sedPPM: Double, psi: Double, chlPPM: Double)

object Sensor {
   // function to parse line of csv data into Sensor class
   def parseSensor(str: String): Sensor = {
       val p = str.split(",")
        Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble,
            p(7).toDouble, p(8).toDouble)
  }
…
}

HBase Table schema

The HBase Table Schema for the streaming data is as follows:

  • Composite row key of the pump name date and time stamp
  • Column Family data with columns corresponding to the input data fields Column Family alerts with columns corresponding to any filters for alarming values Note that the data and alert column families could be set to expire values after a certain amount of time.

The Schema for the daily statistics summary rollups is as follows:

  • Composite row key of the pump name and date
  • Column Family stats
  • Columns for min, max, avg.

The function below converts a Sensor object into an HBase Put object, which is used to insert a row into HBase.

val cfDataBytes = Bytes.toBytes("data")

object Sensor {
. . .
  //  Convert a row of sensor object data to an HBase put object
  def convertToPut(sensor: Sensor): (ImmutableBytesWritable, Put) = {
      val dateTime = sensor.date + " " + sensor.time
      // create a composite row key: sensorid_date time
      val rowkey = sensor.resid + "_" + dateTime
      val put = new Put(Bytes.toBytes(rowkey))
      // add to column family data, column  data values to put object 
      put.add(cfDataBytes, Bytes.toBytes("hz"), Bytes.toBytes(sensor.hz))
      put.add(cfDataBytes, Bytes.toBytes("disp"), Bytes.toBytes(sensor.disp))
      put.add(cfDataBytes, Bytes.toBytes("flo"), Bytes.toBytes(sensor.flo))
      put.add(cfDataBytes, Bytes.toBytes("sedPPM"), Bytes.toBytes(sensor.sedPPM))
      put.add(cfDataBytes, Bytes.toBytes("psi"), Bytes.toBytes(sensor.psi))
      put.add(cfDataBytes, Bytes.toBytes("chlPPM"), Bytes.toBytes(sensor.chlPPM))
      return (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
  }
}

Configuration for Writing to an HBase Table

You can use the TableOutputFormat class with Spark to write to an HBase table, similar to how you would write to an HBase table from MapReduce. Below we set up the configuration for writing to HBase using the TableOutputFormat class.

   val tableName = "sensor"
     
   // set up Hadoop HBase configuration using TableOutputFormat
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val jobConfig: jobConfig = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

The Spark Streaming Example Code

These are the basic steps for Spark Streaming code:

  1. Initialize a Spark StreamingContext object.
  2. Apply transformations and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped using streamingContext.awaitTermination().

We will go through each of these steps with the example application code.

Initializing the StreamingContext

First we create a StreamingContext, the main entry point for streaming functionality, with a 2 second batch interval. (In the code boxes, comments are in Green)

val sparkConf = new SparkConf().setAppName("HBaseStream")

//  create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

Next, we use the StreamingContext textFileStream(directory) method to create an input stream that monitors a Hadoop-compatible file system for new files and processes any files created in that directory.

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

The linesDStream represents the stream of data, each record is a line of text. Internally a DStream is a sequence of RDDs, one RDD per batch interval.

Apply transformations and output operations to DStreams

Next we parse the lines of data into Sensor objects, with the map operation on the linesDStream.

// parse each line of data in linesDStream  into sensor objects

val sensorDStream = linesDStream.map(Sensor.parseSensor) 

The map operation applies the Sensor.parseSensor function on the RDDs in the linesDStream, resulting in RDDs of Sensor objects.

Next we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We filter the sensor objects for low psi to create alerts, then we write the sensor and alert data to HBase by converting them to Put objects, and using the PairRDDFunctions saveAsHadoopDatasetmethod, which outputs the RDD to any Hadoop-supported storage system using a Hadoop Configuration object for that storage system (see Hadoop Configuration for HBase above).

// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
        // filter sensor data for low psi
     val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

      // convert sensor data to put object and write to HBase  Table CF data
      rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

     // convert alert to put object write to HBase  Table CF alerts
     rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

The sensorRDD objects are converted to put objects then written to HBase.

Start receiving data

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

Spark Reading from and Writing to HBase

Now we want to read the HBase sensor table data , calculate daily summary statistics and write these statistics to the stats column family.

The code below reads the HBase table sensor table psi column data, calculates statistics on this data using StatCounter, then writes the statistics to the sensor stats column family.

     // configure HBase for reading 
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
    // scan data column family psi column
    conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") 

// Load an RDD of (row key, row Result) tuples from the table
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // transform (row key, row Result) tuples into an RDD of Results
    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
    val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))

    // group by rowkey , get statistics for column value
    val keyStatsRDD = keyValueRDD.
             groupByKey().
             mapValues(list => StatCounter(list))

    // convert rowkey, stats to put and write to hbase table stats column family
    keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

The diagram below shows that the output from newAPIHadoopRDD is an RDD of row key, result pairs. The PairRDDFunctions saveAsHadoopDataset saves the Put objects to HBase.

Software

Running the Application

You can run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.

Here are the steps summarized:

  1. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr.
  2. Build the application using maven.
  3. Copy the jar file and data file to your sandbox home directory /user/user01 using scp.
  4. Run the streaming app:
     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` 
       --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
    
  5. Copy the streaming data file to the stream directory:
    cp sensordata.csv /user/user01/stream/
  6. Read data and calculate stats for one column
       /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` 
        --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
    
  7. Calculate stats for whole row
      /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath` 
       --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar
Advertisements

An In-Depth Look at the HBase Architecture

HBase Architectural Components

Physically, HBase is composed of three types of servers in a master slave type of architecture. Region servers serve data for reads and writes. When accessing data, clients communicate with HBase RegionServers directly. Region assignment, DDL (create, delete tables) operations are handled by the HBase Master process. Zookeeper, which is part of HDFS, maintains a live cluster state.

The Hadoop DataNode stores the data that the Region Server is managing. All HBase data is stored in HDFS files. Region Servers are collocated with the HDFS DataNodes, which enable data locality (putting the data close to where it is needed) for the data served by the RegionServers. HBase data is local when it is written, but when a region is moved, it is not local until compaction.

The NameNode maintains metadata information for all the physical data blocks that comprise the files.

Regions

HBase Tables are divided horizontally by row key range into “Regions.” A region contains all rows in the table between the region’s start key and end key. Regions are assigned to the nodes in the cluster, called “Region Servers,” and these serve data for reads and writes. A region server can serve about 1,000 regions.

HBase HMaster

Region assignment, DDL (create, delete tables) operations are handled by the HBase Master.

A master is responsible for:

  • Coordinating the region servers- Assigning regions on startup , re-assigning regions for recovery or load balancing- Monitoring all RegionServer instances in the cluster (listens for notifications from zookeeper)
  • Admin functions- Interface for creating, deleting, updating tables

ZooKeeper: The Coordinator

HBase uses ZooKeeper as a distributed coordination service to maintain server state in the cluster. Zookeeper maintains which servers are alive and available, and provides server failure notification. Zookeeper uses consensus to guarantee common shared state. Note that there should be three or five machines for consensus.

How the Components Work Together

Zookeeper is used to coordinate shared state information for members of distributed systems. Region servers and the active HMaster connect with a session to ZooKeeper. The ZooKeeper maintains ephemeral nodes for active sessions via heartbeats.

Each Region Server creates an ephemeral node. The HMaster monitors these nodes to discover available region servers, and it also monitors these nodes for server failures. HMasters vie to create an ephemeral node. Zookeeper determines the first one and uses it to make sure that only one master is active. The active HMaster sends heartbeats to Zookeeper, and the inactive HMaster listens for notifications of the active HMaster failure.

If a region server or the active HMaster fails to send a heartbeat, the session is expired and the corresponding ephemeral node is deleted. Listeners for updates will be notified of the deleted nodes. The active HMaster listens for region servers, and will recover region servers on failure. The Inactive HMaster listens for active HMaster failure, and if an active HMaster fails, the inactive HMaster becomes active.

HBase First Read or Write

There is a special HBase Catalog table called the META table, which holds the location of the regions in the cluster. ZooKeeper stores the location of the META table.

This is what happens the first time a client reads or writes to HBase:

  1. The client gets the Region server that hosts the META table from ZooKeeper.
  2. The client will query the .META. server to get the region server corresponding to the row key it wants to access. The client caches this information along with the META table location.
  3. It will get the Row from the corresponding Region Server.

For future reads, the client uses the cache to retrieve the META location and previously read row keys. Over time, it does not need to query the META table, unless there is a miss because a region has moved; then it will re-query and update the cache.

HBase Meta Table

  • This META table is an HBase table that keeps a list of all regions in the system.
  • The .META. table is like a b tree.
  • The .META. table structure is as follows:- Key: region start key,region id- Values: RegionServer

Region Server Components

A Region Server runs on an HDFS data node and has the following components:

  • WAL: Write Ahead Log is a file on the distributed file system. The WAL is used to store new data that hasn’t yet been persisted to permanent storage; it is used for recovery in the case of failure.
  • BlockCache: is the read cache. It stores frequently read data in memory. Least Recently Used data is evicted when full.
  • MemStore: is the write cache. It stores new data which has not yet been written to disk. It is sorted before writing to disk. There is one MemStore per column family per region.
  • Hfiles store the rows as sorted KeyValues on disk.

HBase Write Steps (1)

When the client issues a Put request, the first step is to write the data to the write-ahead log, the WAL:

– Edits are appended to the end of the WAL file that is stored on disk. – The WAL is used to recover not-yet-persisted data in case a server crashes.

HBase Write Steps (2)

Once the data is written to the WAL, it is placed in the MemStore. Then, the put request acknowledgement returns to the client.

HBase MemStore

The MemStore stores updates in memory as sorted KeyValues, the same as it would be stored in an HFile. There is one MemStore per column family. The updates are sorted per column family.

HBase Region Flush

When the MemStore accumulates enough data, the entire sorted set is written to a new HFile in HDFS. HBase uses multiple HFiles per column family, which contain the actual cells, or KeyValue instances. These files are created over time as KeyValue edits sorted in the MemStores are flushed as files to disk.

Note that this is one reason why there is a limit to the number of column families in HBase. There is one MemStore per CF; when one is full, they all flush. It also saves the last written sequence number so the system knows what was persisted so far.

The highest sequence number is stored as a meta field in each HFile, to reflect where persisting has ended and where to continue. On region startup, the sequence number is read, and the highest is used as the sequence number for new edits.

HBase HFile

Data is stored in an HFile which contains sorted key/values. When the MemStore accumulates enough data, the entire sorted KeyValue set is written to a new HFile in HDFS. This is a sequential write. It is very fast, as it avoids moving the disk drive head.

HBase HFile Structure

An HFile contains a multi-layered index which allows HBase to seek to the data without having to read the whole file. The multi-level index is like a b+tree:

  • Key value pairs are stored in increasing order
  • Indexes point by row key to the key value data in 64KB “blocks”
  • Each block has its own leaf-index
  • The last key of each block is put in the intermediate index
  • The root index points to the intermediate index

The trailer points to the meta blocks, and is written at the end of persisting the data to the file. The trailer also has information like bloom filters and time range info. Bloom filters help to skip files that do not contain a certain row key. The time range info is useful for skipping the file if it is not in the time range the read is looking for.

HFile Index

The index, which we just discussed, is loaded when the HFile is opened and kept in memory. This allows lookups to be performed with a single disk seek.

HBase Read Merge

We have seen that the KeyValue cells corresponding to one row can be in multiple places, row cells already persisted are in Hfiles, recently updated cells are in the MemStore, and recently read cells are in the Block cache. So when you read a row, how does the system get the corresponding cells to return? A Read merges Key Values from the block cache, MemStore, and HFiles in the following steps:

  1. First, the scanner looks for the Row cells in the Block cache – the read cache. Recently Read Key Values are cached here, and Least Recently Used are evicted when memory is needed.
  2. Next, the scanner looks in the MemStore, the write cache in memory containing the most recent writes.
  3. If the scanner does not find all of the row cells in the MemStore and Block Cache, then HBase will use the Block Cache indexes and bloom filters to load HFiles into memory, which may contain the target row cells.

HBase Read Merge

As discussed earlier, there may be many HFiles per MemStore, which means for a read, multiple files may have to be examined, which can affect the performance. This is called read amplification.

HBase Minor Compaction

HBase will automatically pick some smaller HFiles and rewrite them into fewer bigger Hfiles. This process is called minor compaction. Minor compaction reduces the number of storage files by rewriting smaller files into fewer but larger ones, performing a merge sort.

HBase Major Compaction

Major compaction merges and rewrites all the HFiles in a region to one HFile per column family, and in the process, drops deleted or expired cells. This improves read performance; however, since major compaction rewrites all of the files, lots of disk I/O and network traffic might occur during the process. This is called write amplification.

Major compactions can be scheduled to run automatically. Due to write amplification, major compactions are usually scheduled for weekends or evenings. Note that MapR-DB has made improvements and does not need to do compactions. A major compaction also makes any data files that were remote, due to server failure or load balancing, local to the region server.

Region = Contiguous Keys

Let’s do a quick review of regions:

  • A table can be divided horizontally into one or more regions. A region contains a contiguous, sorted range of rows between a start key and an end key
  • Each region is 1GB in size (default)
  • A region of a table is served to the client by a RegionServer
  • A region server can serve about 1,000 regions (which may belong to the same table or different tables)

Region Split

Initially there is one region per table. When a region grows too large, it splits into two child regions. Both child regions, representing one-half of the original region, are opened in parallel on the same Region server, and then the split is reported to the HMaster. For load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers.

Read Load Balancing

Splitting happens initially on the same region server, but for load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers. This results in the new Region server serving data from a remote HDFS node until a major compaction moves the data files to the Regions server’s local node. HBase data is local when it is written, but when a region is moved (for load balancing or recovery), it is not local until major compaction.

HDFS Data Replication

All writes and Reads are to/from the primary node. HDFS replicates the WAL and HFile blocks. HFile block replication happens automatically. HBase relies on HDFS to provide the data safety as it stores its files. When data is written in HDFS, one copy is written locally, and then it is replicated to a secondary node, and a third copy is written to a tertiary node.

HDFS Data Replication (2)

The WAL file and the Hfiles are persisted on disk and replicated, so how does HBase recover the MemStore updates not persisted to HFiles? See the next section for the answer.

HBase Crash Recovery

When a RegionServer fails, Crashed Regions are unavailable until detection and recovery steps have happened. Zookeeper will determine Node failure when it loses region server heart beats. The HMaster will then be notified that the Region Server has failed.

When the HMaster detects that a region server has crashed, the HMaster reassigns the regions from the crashed server to active Region servers. In order to recover the crashed region server’s memstore edits that were not flushed to disk. The HMaster splits the WAL belonging to the crashed region server into separate files and stores these file in the new region servers’ data nodes. Each Region Server then replays the WAL from the respective split WAL, to rebuild the memstore for that region.

Data Recovery

WAL files contain a list of edits, with one edit representing a single put or delete. Edits are written chronologically, so, for persistence, additions are appended to the end of the WAL file that is stored on disk.

What happens if there is a failure when the data is still in memory and not persisted to an HFile? The WAL is replayed. Replaying a WAL is done by reading the WAL, adding and sorting the contained edits to the current MemStore. At the end, the MemStore is flush to write changes to an HFile.

Apache HBase Architecture Benefits

HBase provides the following benefits:

  • Strong consistency model- When a write returns, all readers will see same value
  • Scales automatically- Regions split when data grows too large- Uses HDFS to spread and replicate data
  • Built-in recovery- Using Write Ahead Log (similar to journaling on file system)
  • Integrated with Hadoop- MapReduce on HBase is straightforward

Apache HBase Has Problems Too…

  • Business continuity reliability:- WAL replay slow- Slow complex crash recovery- Major Compaction I/O storms

MapR-DB with MapR-FS does not have these problems

The diagram below compares the application stacks for Apache HBase on top of HDFS on the left, Apache HBase on top of MapR’s read/write file system MapR-FS in the middle, and MapR-DB and MapR-FS in a Unified Storage Layer on the right.

MapR-DB exposes the same HBase API and the Data model for MapR-DB is the same as for Apache HBase. However the MapR-DB implementation integrates table storage into the MapR file system, eliminating all JVM layers and interacting directly with disks for both file and table storage.

MapR-DB offers many benefits over HBase, while maintaining the virtues of the HBase API and the idea of data being sorted according to primary key. MapR-DB provides operational benefits such as no compaction delays and automated region splits that do not impact the performance of the database. The tables in MapR-DB can also be isolated to certain machines in a cluster by utilizing the topology feature of MapR. The final differentiator is that MapR-DB is just plain fast, due primarily to the fact that it is tightly integrated into the MapR file system itself, rather than being layered on top of a distributed file system that is layered on top of a conventional file system.

Key differences between MapR-DB and Apache HBase

  • Tables part of the MapR Read/Write File system
    • Guaranteed data locality
  • Smarter load balancing
    • Uses container Replicas
  • Smarter fail over
    • Uses container replicas
  • Multiple small WALs
    • Faster recovery
  • Memstore Flushes Merged into Read/Write File System
    • No compaction !

You can take this free On Demand training to learn more about MapR-FS and MapR-DB 

In this blog post, you learned more about the HBase architecture and its main benefits over NoSQL data store solutions. If you have any questions about HBase, please ask them in the comments section below.

Comparing SQL Functions and Performance with Apache Spark and Apache Drill

SQL engines for Hadoop differ in their approach and functionality. My focus for this blog post is to compare and contrast the functions and performance of Apache Spark and Apache Drill and discuss their expected use cases.

Running queries and analysis on structured databases is a standard operation and has been in place for decades. This typically involves developing a structure and schema which gets implemented in a database.

Although the data handled by companies is increasing every year, the pace of this increase is of a completely different magnitude. According to a recent investigation, there are presently 3.5ZB (zetta bytes) of structured data, with 1.4 times more semi-structured and unstructured data, totaling 5ZB. By 2020, however, it is predicted that there will be roughly 6 times more semi-structured and unstructured data than structured data, totaling 30ZB to 5ZB, respectively.1

Figuring out how to analyze this growing amount of data is a key objective for business and data analysts. This is because, along with the multiplicity of data types contained in unstructured data, the data’s actual meanings (namely the words contained in word documents, video frames, etc.) are likely to depend on the actual semantics (context) of the things being analyzed. In addition to the semantics on which the data itself relies, there might also be a need to understand the data path included in that data as a part of the semantics. (For example, the data paths: “C:\Users\kiuchi\My Pets\whale\johnny.jpg” and “http://kiuchi.local/My Pets/cat/michelle.jpg” do not only lead to the image which they are referencing; they can also lead us to believe that I have a whale and a cat among my pets. Essentially, it may be possible to interpret data paths as their nested columns.)

In addition, the increase of information content contained in “semi-structured data” in lighter data exchange formats such as XML and JSON, which are inspired by the development of information sharing over the internet, has made the situation even more complicated. While this semi-structured data, because of its flexible schema structure, does not fit into data stores that strictly define schema beforehand, such as relational databases, compared to data such as emails or office documents, it can nonetheless clearly be said to be structured data. The problem of how to store and analyze such semi-structured data is currently one of the major issues faced by analytics professionals.

Among my readers, I suspect that not many people know the expression “Polyglot Persistence.” This is a concept which was originally developed by Scott Leberknight, and which was brought into the limelight in 2011 when it was taken up by Martin Fowler, an advocate of XP (eXtreme Programming). It advocates for the cross-cutting accessing of data stored in a wide variety of formats on various data sources.2 Should it be achieved, data analysts will be able to grasp the location and format of the data that they are trying to analyze, thus leading to a dramatic reduction in data conversion costs.

In other words, we can say that for big data analytics, which has recently gained a lot of exposure as a buzzword, solving the problem of handling wide varieties of data remains an ongoing effort. Both Apache Drill and Apache Spark are execution engines for Hadoop that were born under these circumstances. By comparing the functions and performance of Apache Drill and Apache Spark, this article will explore their characteristics and expected use cases.

Functional Comparison

As a data source, Apache Drill and Apache Spark support semi-structured data such as CSV and JSON, as well as relational databases through JDBC. Apache Drill also supports NoSQL databases such as MongoDB ,and HBase.

Supported Data Sources
Apache Drill CSV, JSON, Parquet
Hive, HBase,
MongoDB, MapR- DB, Amazon S3
Apache Spark CSV, JSON, Parquet
HiveQL (compatible),
JDBC

One of the defining characteristics is the fact that they are able to analyze a mix of structured data sources and semi-structured data sources. This sets them apart from traditional SQL engines. In addition, they also have means to access local files, remote data sources that support standard database connection protocols, and data sources on the internet which require connection methods other than JDBC. Finally, it should also be mentioned that both have “plugin” features, meaning that additional data sources can be added.3

Clearly, these are the designs that presuppose a wide variety of data sources and a cross-cutting data analysis. They are good representatives of next-generation SQL engines operating in a big data environment with mixed data sources taking “Polyglot Persistence” into account.

Performance Comparison and Discussion

Let me now compare the query performance of the two technologies. Here, we will use the “MovieLens” data set, which consists of a ratings database from movie viewers collected by a research project at the University of Minnesota. Performance will be measured by running three types of SQL queries, including JOIN access.

The program used here is available at GitHub(m-kiuchi/MovieLensSQL)

The results below are those that were obtained on my PC (2 cores, 6GB memory).

Where did the differences in results come from? I believe that they originated from the differences in the ideas behind Apache Spark and Apache Drill. Since Apache Drill has been developed in order to scan and filter data sources, it is likely that it does not have any extra processing. As it is scanned, a piece of data is sent to the next step of pipeline processing, with the focus being on obtaining the final results in the shortest time possible. Since it assumes that query processing completes in a short time, the process is re-run if a failure occurs during execution. On the other hand, the data source scanning in Spark is only positioned as the preliminary step for parallel processing. As such, since reusability is what is aimed for, partitioning and staging takes place in each step like MapReduce. This allows for more flexible control of the data flows in the program. In case of failure during long batch processing periods, processing can be restarted using the intermediate data, giving it increased fault tolerance. This is the likely cause of the differences in performance. Of course, if sufficient machine resources are available, or if we think of the entirety of the workflow processing time, the differences in these observed processing time might end up falling within the margin of error.

The intent here is not to make a determination of whether Apache Drill or Apache Spark is better based on the above results, whether Apache Drill or Apache Spark is better. What is important is the fact that both these engines are able to execute queries on and transparently capture data from a wide variety of data sources, with Spark being able to conduct more varied processing than SQL queries. What is clear and evident is the fact that what these two execution engines can achieve clearly goes beyond the scope of traditional relational databases.

2 There is a similar concept called “Data Virtualization”

How to check which PSU is installed…if any

Oracle PSUs (Patch Set Updates) are referenced by their 5-place version number.  Unfortunately they do not change version numbers in the Oracle binaries, product banners and such though (see MOS 861152.1), so here’s how to identify which PSU your ORACLE_HOME is at…

Database Server:

$ORACLE_HOME/OPatch/opatch lsinventory -bugs_fixed 
| grep -i -E 'DATABASE PSU|DATABASE PATCH SET UPDATE'
$ORACLE_HOME/OPatch/opatch lsinventory -bugs_fixed 
| egrep -i 'DATABASE PSU|DATABASE PATCH SET UPDATE'

(The first command above being for Linux)

…or using the following SQL:

select comments, version, bundle_series
from sys.registry$history
where bundle_series = 'PSU'
order by action_time;

COMMENTS                       VERSION            BUNDLE_SERIES
------------------------------ ------------------ -----------------
Patchset 11.2.0.2.0            11.2.0.3           PSU
PSU 11.2.0.3.5                 11.2.0.3           PSU

The above view is populated when catbundle.sql is executed.  If the query above ends with “ORA-00904: “BUNDLE_SERIES”: invalid identifier” then no bundle patch (PSU or CPU) has been applied.

Grid Infrastructure:

$ORACLE_HOME/OPatch/opatch lsinventory -bugs_fixed 
| grep -i 'GI PSU'

Cluster Ready Services:

$ORACLE_HOME/OPatch/opatch lsinventory -bugs_fixed 
| grep -i 'TRACKING BUG' | grep -i 'PSU'

Enterprise Manager Agent:

$ORACLE_HOME/OPatch/opatch lsinventory -bugs_fixed 
| grep -i 'ENTERPRISE MANAGER AGENT' | grep -i 'PSU'

Enterprise Manager OMS:

$ORACLE_HOME/OPatch/opatch lsinventory -bugs_fixed 
| grep -i 'ENTERPRISE MANAGER OMS' | grep -i 'PSU'

WebLogic Server:

. $WLS_HOME/server/bin/setWLSEnv.sh
java weblogic.version|grep PSU