hbase-commits mailing list archives

From mi...@apache.org
Subject hbase git commit: HBASE-14158 Add documentation for Initial Release for HBase-Spark Module integration
Date Mon, 12 Oct 2015 22:19:11 GMT
Repository: hbase
Updated Branches:
  refs/heads/master bfcec29dd -> e7f659cac


HBASE-14158 Add documentation for Initial Release for HBase-Spark Module integration

Signed-off-by: Misty Stanley-Jones <mstanleyjones@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e7f659ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e7f659ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e7f659ca

Branch: refs/heads/master
Commit: e7f659cac4498bdb94fd0e2b9e4339a06eaba9c1
Parents: bfcec29
Author: ted malaska <ted.malaska@cloudera.com>
Authored: Mon Oct 12 11:23:16 2015 -0400
Committer: Misty Stanley-Jones <mstanleyjones@cloudera.com>
Committed: Tue Oct 13 08:19:02 2015 +1000

----------------------------------------------------------------------
 src/main/asciidoc/_chapters/spark.adoc | 451 ++++++++++++++++++++++++++++
 1 file changed, 451 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e7f659ca/src/main/asciidoc/_chapters/spark.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc
new file mode 100644
index 0000000..9b5179b
--- /dev/null
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -0,0 +1,451 @@
+////
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+////
+
+[[spark]]
+= HBase and Spark
+:doctype: book
+:numbered:
+:toc: left
+:icons: font
+:experimental:
+
+link:http://spark.apache.org/[Apache Spark] is a software framework that is used
+to process data in memory in a distributed manner, and is replacing MapReduce in
+many use cases.
+
+Spark itself is out of scope for this document; refer to the Spark site for
+more information on the Spark project and subprojects. This document focuses
+on four main interaction points between Spark and HBase:
+
+Basic Spark::
+  The ability to have an HBase Connection at any point in your Spark DAG.
+Spark Streaming::
+  The ability to have an HBase Connection at any point in your Spark Streaming
+  application.
+Spark Bulk Load::
+  The ability to write directly to HBase HFiles for bulk insertion into HBase.
+SparkSQL/DataFrames::
+  The ability to write SparkSQL that draws on tables that are represented in HBase.
+
+The following sections will walk through examples of all these interaction points.
+
+== Basic Spark
+
+This section discusses Spark HBase integration at the lowest and simplest levels.
+All the other interaction points are built upon the concepts that will be described
+here.
+
+At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
+takes in HBase configurations and pushes them to the Spark executors. This allows
+us to have an HBase Connection per Spark Executor in a static location.
+
+For reference, Spark Executors can run on the same nodes as the Region Servers or
+on different nodes; co-location is not required. Think of every Spark
+Executor as a multi-threaded client application. This allows any Spark Tasks
+running on the executors to access the shared Connection object.
+
+.HBaseContext Usage Example
+====
+
+This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD
+in Scala:
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = HBaseConfiguration.create()
+
+...
+
+val hbaseContext = new HBaseContext(sc, config)
+
+rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
+ val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
+ it.foreach((putRecord) => {
+  val put = new Put(putRecord._1)
+  putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+  bufferedMutator.mutate(put)
+ })
+ bufferedMutator.flush()
+ bufferedMutator.close()
+})
+----
+
+Here is the same example implemented in Java:
+
+[source, java]
+----
+JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+try {
+  List<byte[]> list = new ArrayList<>();
+  list.add(Bytes.toBytes("1"));
+  ...
+  list.add(Bytes.toBytes("5"));
+
+  JavaRDD<byte[]> rdd = jsc.parallelize(list);
+  Configuration conf = HBaseConfiguration.create();
+
+  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+  hbaseContext.foreachPartition(rdd,
+      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+   public void call(Tuple2<Iterator<byte[]>, Connection> t)
+        throws Exception {
+    Table table = t._2().getTable(TableName.valueOf(tableName));
+    BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
+    while (t._1().hasNext()) {
+      byte[] b = t._1().next();
+      Result r = table.get(new Get(b));
+      if (r.getExists()) {
+       mutator.mutate(new Put(b));
+      }
+    }
+
+    mutator.flush();
+    mutator.close();
+    table.close();
+   }
+  });
+} finally {
+  jsc.stop();
+}
+----
+====
+
+All functionality between Spark and HBase will be supported both in Scala and in
+Java, with the exception of SparkSQL, which will support any language that is
+supported by Spark. The remainder of this documentation focuses on
+Scala examples.
+
+The examples above illustrate how to do a `foreachPartition` with a connection. A
+number of other Spark base functions are supported out of the box:
+
+// tag::spark_base_functions[]
+`bulkPut`:: For massively parallel sending of puts to HBase
+`bulkDelete`:: For massively parallel sending of deletes to HBase
+`bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD
+`mapPartition`:: To run a Spark map function with a Connection object, allowing full
+access to HBase
+`hBaseRDD`:: To simplify a distributed scan that creates an RDD
+// end::spark_base_functions[]
+
+For examples of all these functionalities, see the HBase-Spark Module.
+
+== Spark Streaming
+http://spark.apache.org/streaming/[Spark Streaming] is a micro-batching stream
+processing framework built on top of Spark. HBase and Spark Streaming make great
+companions, in that HBase can provide the following benefits alongside Spark
+Streaming:
+
+* A place to grab reference data or profile data on the fly
+* A place to store counts or aggregates in a way that supports Spark Streaming's
+promise of _only once processing_.
+
+The HBase-Spark module’s integration points with Spark Streaming are similar to
+its normal Spark integration points, in that the following commands are possible
+straight off a Spark Streaming DStream.
+
+include::spark.adoc[tags=spark_base_functions]
+
+.`bulkPut` Example with DStreams
+====
+
+Below is an example of bulkPut with DStreams. It is very close in feel to the RDD
+bulk put.
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = HBaseConfiguration.create()
+
+val hbaseContext = new HBaseContext(sc, config)
+val ssc = new StreamingContext(sc, Milliseconds(200))
+
+val rdd1 = ...
+val rdd2 = ...
+
+val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
+    Array[Byte], Array[Byte])])]]()
+
+queue += rdd1
+queue += rdd2
+
+val dStream = ssc.queueStream(queue)
+
+dStream.hbaseBulkPut(
+  hbaseContext,
+  TableName.valueOf(tableName),
+  (putRecord) => {
+   val put = new Put(putRecord._1)
+   putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+   put
+  })
+----
+
+There are three inputs to the `hbaseBulkPut` function:
+. The hbaseContext, which carries the configuration broadcast information that links us
+to the HBase Connections in the executors
+. The table name of the table we are putting data into
+. A function that will convert a record in the DStream into an HBase Put object
+====
+
+== Bulk Load
+
+Spark bulk load closely follows the MapReduce implementation of bulk
+load. In short, a partitioner partitions based on region splits and
+the row keys are sent to the reducers in order, so that HFiles can be written
+out. In Spark terms, the bulk load will be focused around a
+`repartitionAndSortWithinPartitions` followed by a `foreachPartition`.
+
+The only major difference with the Spark implementation compared to the
+MapReduce implementation is that the column qualifier is included in the shuffle
+ordering process. This was done because the MapReduce bulk load implementation
+would have memory issues when loading rows with a large number of columns, as a
+result of the sorting of those columns being done in the memory of the reducer JVM.
+Instead, that ordering is done in the Spark Shuffle, so there should no longer
+be a limit to the number of columns in a row for bulk loading.
+
+.Bulk Loading Example
+====
+
+The following example shows bulk loading with Spark.
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = HBaseConfiguration.create()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+   val rowKey = t._1
+   val family:Array[Byte] = t._2(0)._1
+   val qualifier = t._2(0)._2
+   val value = t._2(0)._3
+
+   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
+
+   Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+====
+
+The `hbaseBulkLoad` function takes three required parameters:
+
+. The table name of the table we intend to bulk load to
+
+. A function that will convert a record in the RDD to a tuple key-value pair, with
+the tuple key being a KeyFamilyQualifier object and the value being the cell value.
+The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier.
+The shuffle will partition on the RowKey but will sort by all three values.
+
+. The temporary path for the HFile to be written out to
+
+Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object
+to load the newly created HFiles into HBase.
+
+.Additional Parameters for Bulk Loading with Spark
+
+You can set the following attributes with additional parameter options on `hbaseBulkLoad`.
+
+* Max file size of the HFiles
+* A flag to exclude HFiles from compactions
+* Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
+
+.Using Additional Parameters
+====
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = HBaseConfiguration.create()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
+val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
+
+familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+   val rowKey = t._1
+   val family:Array[Byte] = t._2(0)._1
+   val qualifier = t._2(0)._2
+   val value = t._2(0)._3
+
+   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
+
+   Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath,
+  familyHBaseWriterOptions,
+  compactionExclude = false,
+  HConstants.DEFAULT_MAX_FILE_SIZE)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+====
+
+== SparkSQL/DataFrames
+
+http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports
+SQL that will compile down to a Spark DAG. In addition, SparkSQL is a heavy user
+of DataFrames. DataFrames are like RDDs with schema information.
+
+The HBase-Spark module includes support for Spark SQL and DataFrames, which allows
+you to write SparkSQL directly on HBase tables. In addition, the HBase-Spark module
+will push down query filtering logic to HBase.
+
+=== Predicate Push Down
+
+There are two examples of predicate push down in the HBase-Spark implementation.
+The first example shows the push down of filtering logic on the RowKey. HBase-Spark
+will reduce the filters on RowKeys down to a set of Get and/or Scan commands.
+
+NOTE: The Scans are distributed scans, rather than a single client scan operation.
+
+If the query looks something like the following, the logic will push down and get
+the rows through 3 Gets and 0 Scans. We can do gets because all the operations
+are `equal` operations.
+
+[source,sql]
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
+----
+
+Now let's look at an example where we will end up doing two scans on HBase.
+
+[source, sql]
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
+----
+
+In this example we will get 0 Gets and 2 Scans. One scan will load everything
+from the first row in the table until “get2” and the second scan will get
+everything from “get3” until the last row in the table.
+
+The next query is a good example of having a good deal of range checks. However,
+the ranges overlap, so the code will be smart enough to get the following data
+in a single scan that encompasses all the data asked for by the query.
+
+[source, sql]
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE
+  (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
+  (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
+----
+
+The second example of push down functionality offered by the HBase-Spark module
+is the ability to push down filter logic for column and cell fields. Just like
+the RowKey logic, all query logic will be consolidated into the minimum number
+of range checks and equal checks. A Filter object carrying the consolidated
+push down predicates is then sent along with the Scan.
+
+.SparkSQL Code Example
+====
+This example shows how we can interact with HBase with SQL.
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = HBaseConfiguration.create()
+
+new HBaseContext(sc, config)
+val sqlContext = new SQLContext(sc)
+
+val df = sqlContext.load("org.apache.hadoop.hbase.spark",
+  Map("hbase.columns.mapping" ->
+   "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
+   "hbase.table" -> "t1"))
+
+df.registerTempTable("hbaseTmp")
+
+val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
+  "WHERE " +
+  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
+  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
+----
+
+There are three major parts of this example that deserve explaining.
+
+The sqlContext.load function::
+  The `sqlContext.load` function takes two
+  parameters. The first parameter points Spark to the HBase
+  DefaultSource class that will act as the interface between SparkSQL and HBase.
+
+A map of key value pairs::
+  In this example we have two keys in our map, `hbase.columns.mapping` and
+  `hbase.table`. The `hbase.table` directs SparkSQL to use the given HBase table.
+  The `hbase.columns.mapping` key gives us the logic to translate HBase columns to
+  SparkSQL columns.
++
+The `hbase.columns.mapping` value is a string in the following format:
++
+[source, scala]
+----
+(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
+----
++
+In the example below we see the definition of three fields. Because KEY_FIELD has
+no ColumnFamily, it is the RowKey.
++
+----
+KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
+----
+
+The registerTempTable function::
+  This is a SparkSQL function that frees us from Scala, letting us access
+  our HBase table directly with SQL under the table name "hbaseTmp".
+
+The last major point to note in the example is the `sqlContext.sql` function, which
+allows the user to ask their questions in SQL, which will be pushed down to the
+DefaultSource code in the HBase-Spark module. The `sqlContext.sql` call returns a DataFrame
+with the Schema of KEY_FIELD and B_FIELD; `take(5)` then collects up to five rows from it.
+====
\ No newline at end of file

