hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [20/43] hbase git commit: HBASE-14849 Add option to set block cache to false on SparkSQL executions (Zhan Zhang)
Date Sat, 26 Dec 2015 17:07:53 GMT
HBASE-14849 Add option to set block cache to false on SparkSQL executions (Zhan Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e75e26e3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e75e26e3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e75e26e3

Branch: refs/heads/hbase-12439
Commit: e75e26e3c6cfe7fd378081839d60fc711c1e095f
Parents: 8c921ea
Author: tedyu <yuzhihong@gmail.com>
Authored: Sat Dec 19 15:14:58 2015 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Sat Dec 19 15:14:58 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 56 ++++++--------
 .../hadoop/hbase/spark/datasources/Bound.scala  |  2 -
 .../spark/datasources/HBaseSparkConf.scala      | 32 ++++++++
 .../spark/datasources/HBaseTableScanRDD.scala   | 14 ++--
 .../datasources/SerializableConfiguration.scala |  1 -
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 81 ++++++++------------
 .../hadoop/hbase/spark/HBaseTestSource.scala    | 63 +++++++++++++++
 7 files changed, 155 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 664cf57..73cab3c 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -21,7 +21,9 @@ import java.util
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.spark.datasources.{HBaseTableScanRDD, HBaseRegion, SerializableConfiguration}
+import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
+import org.apache.hadoop.hbase.spark.datasources.HBaseTableScanRDD
+import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration
 import org.apache.hadoop.hbase.types._
 import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange}
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
@@ -49,8 +51,6 @@ class DefaultSource extends RelationProvider with Logging {
 
   val TABLE_KEY:String = "hbase.table"
   val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
-  val BATCHING_NUM_KEY:String = "hbase.batching.num"
-  val CACHING_NUM_KEY:String = "hbase.caching.num"
   val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
   val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
   val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter"
@@ -71,35 +71,16 @@ class DefaultSource extends RelationProvider with Logging {
       new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + tableName + "'")
 
     val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
-    val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000")
-    val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
     val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
     val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
     val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true")
 
-    val batchingNum:Int = try {
-      batchingNumStr.toInt
-    } catch {
-      case e:NumberFormatException => throw
-        new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY +" '"
-            + batchingNumStr + "'", e)
-    }
-
-    val cachingNum:Int = try {
-      cachingNumStr.toInt
-    } catch {
-      case e:NumberFormatException => throw
-        new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY +" '"
-            + cachingNumStr + "'", e)
-    }
-
     new HBaseRelation(tableName.get,
       generateSchemaMappingMap(schemaMappingString),
-      batchingNum.toInt,
-      cachingNum.toInt,
       hbaseConfigResources,
       useHBaseReources.equalsIgnoreCase("true"),
-      usePushDownColumnFilter.equalsIgnoreCase("true"))(sqlContext)
+      usePushDownColumnFilter.equalsIgnoreCase("true"),
+      parameters)(sqlContext)
   }
 
   /**
@@ -148,10 +129,6 @@ class DefaultSource extends RelationProvider with Logging {
  * @param tableName               HBase table that we plan to read from
  * @param schemaMappingDefinition SchemaMapping information to map HBase
  *                                Qualifiers to SparkSQL columns
- * @param batchingNum             The batching number to be applied to the
- *                                scan object
- * @param cachingNum              The caching number to be applied to the
- *                                scan object
  * @param configResources         Optional comma separated list of config resources
  *                                to get based on their URI
  * @param useHBaseContext         If true this will look to see if
@@ -162,14 +139,26 @@ class DefaultSource extends RelationProvider with Logging {
 case class HBaseRelation (val tableName:String,
                      val schemaMappingDefinition:
                      java.util.HashMap[String, SchemaQualifierDefinition],
-                     val batchingNum:Int,
-                     val cachingNum:Int,
                      val configResources:String,
                      val useHBaseContext:Boolean,
-                     val usePushDownColumnFilter:Boolean) (
+                     val usePushDownColumnFilter:Boolean,
+                     @transient parameters: Map[String, String] ) (
   @transient val sqlContext:SQLContext)
   extends BaseRelation with PrunedFilteredScan with Logging {
 
+  // The user supplied per table parameter will overwrite global ones in SparkConf
+  val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+    .getOrElse(
+      sqlContext.sparkContext.getConf.getBoolean(
+        HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
+  val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+    .getOrElse(
+      sqlContext.sparkContext.getConf.getInt(
+      HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
+  val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+    .getOrElse(sqlContext.sparkContext.getConf.getInt(
+    HBaseSparkConf.BATCH_NUM,  HBaseSparkConf.defaultBatchNum))
+
   //create or get latest HBaseContext
   @transient val hbaseContext:HBaseContext = if (useHBaseContext) {
     LatestHBaseContextCache.latest
@@ -321,8 +310,9 @@ case class HBaseRelation (val tableName:String,
 
     if (resultRDD == null) {
       val scan = new Scan()
-      scan.setBatch(batchingNum)
-      scan.setCaching(cachingNum)
+      scan.setCacheBlocks(blockCacheEnable)
+      scan.setBatch(batchNum)
+      scan.setCaching(cacheSize)
       requiredQualifierDefinitionList.foreach( d =>
         scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
index c869f31..0f6098d 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.hbase.spark.datasources
 
-import org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
-import org.apache.spark.Partition
 import org.apache.hadoop.hbase.spark.hbase._
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
new file mode 100644
index 0000000..67580b0
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+object HBaseSparkConf{
+  // This is the hbase configuration. User can either set them in SparkConf, which
+  // will take effect globally, or configure it per table, which will overwrite the value
+  // set in SparkConf. If not setted, the default value will take effect.
+  val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
+  // default block cache is set to true by default following hbase convention, but note that
+  // this potentially may slow down the system
+  val defaultBlockCacheEnable = true
+  val CACHE_SIZE = "spark.hbase.cacheSize"
+  val defaultCachingSize = 1000
+  val BATCH_NUM = "spark.hbase.batchNum"
+  val defaultBatchNum = 1000
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 958a986..eb9d39a 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -17,17 +17,11 @@
 
 package org.apache.hadoop.hbase.spark.datasources
 
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.filter.Filter
 import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation,
SparkSQLPushDownFilter}
 import org.apache.hadoop.hbase.spark.hbase._
 import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.{TaskContext, Logging, Partition}
+import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition}
 import org.apache.spark.rdd.RDD
 
 import scala.collection.mutable
@@ -37,6 +31,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
      @transient val filter: Option[SparkSQLPushDownFilter] = None,
      val columns: Seq[SchemaQualifierDefinition] = Seq.empty
      )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging  {
+  private def sparkConf = SparkEnv.get.conf
   var ranges = Seq.empty[Range]
   def addRange(r: ScanRange) = {
     val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
@@ -106,8 +101,9 @@ class HBaseTableScanRDD(relation: HBaseRelation,
         scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)
       }
     }
-    scan.setBatch(relation.batchingNum)
-    scan.setCaching(relation.cachingNum)
+    scan.setCacheBlocks(relation.blockCacheEnable)
+    scan.setBatch(relation.batchNum)
+    scan.setCaching(relation.cacheSize)
     filter.foreach(scan.setFilter(_))
     scan
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
index 2452a52..42a5c32 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.spark.datasources
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.util.Utils
 
 import scala.util.control.NonFatal
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 2cee3a8..30ddfc4 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -18,10 +18,11 @@
 package org.apache.hadoop.hbase.spark
 
 import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
+import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{TableNotFoundException, TableName, HBaseTestingUtility}
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 class DefaultSourceSuite extends FunSuite with
@@ -57,8 +58,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     logInfo(" - creating table " + t2TableName)
     TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
     logInfo(" - created table")
-
-    sc = new SparkContext("local", "test")
+    val sparkConf = new SparkConf
+    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
+    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
+    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sc  = new SparkContext("local", "test", sparkConf)
 
     val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
     try {
@@ -139,18 +143,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map("hbase.columns.mapping" ->
         "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-        "hbase.table" -> "t1",
-        "hbase.batching.num" -> "100",
-        "cachingNum" -> "100"))
+        "hbase.table" -> "t1"))
 
     df.registerTempTable("hbaseTable1")
 
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map("hbase.columns.mapping" ->
         "KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-        "hbase.table" -> "t2",
-        "hbase.batching.num" -> "100",
-        "cachingNum" -> "100"))
+        "hbase.table" -> "t2"))
 
     df.registerTempTable("hbaseTable2")
   }
@@ -635,49 +635,32 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     }
   }
 
-  test("Test bad hbase.batching.num type") {
-    intercept[IllegalArgumentException] {
-      df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING
c:i,",
-          "hbase.table" -> "t1", "hbase.batching.num" -> "foo"))
-
-      df.registerTempTable("hbaseIntWrongTypeTmp")
-
-      val result = sqlContext.sql("SELECT KEY_FIELD, " +
-        "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
-
-      assert(result.count() == 5)
-
-      val localResult = result.take(5)
-      localResult.length
-
-      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-      assert(executionRules.dynamicLogicExpression == null)
-
-
+  test("Test HBaseSparkConf matching") {
+    val df = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "100",
+        "batchNum" -> "100",
+        "blockCacheingEnable" -> "true", "rowNum" -> "10"))
+    assert(df.count() == 10)
+
+    val df1 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "1000",
+        "batchNum" -> "100", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
+    intercept[Exception] {
+      assert(df1.count() == 10)
     }
-  }
-
-  test("Test bad hbase.caching.num type") {
-    intercept[IllegalArgumentException] {
-      df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING
c:i,",
-          "hbase.table" -> "t1", "hbase.caching.num" -> "foo"))
-
-      df.registerTempTable("hbaseIntWrongTypeTmp")
-
-      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, " +
-        "I_FIELD FROM hbaseIntWrongTypeTmp")
-
-      val localResult = result.take(10)
-      assert(localResult.length == 5)
-
-      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-      assert(executionRules.dynamicLogicExpression == null)
 
+    val df2 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "100",
+        "batchNum" -> "1000", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
+    intercept[Exception] {
+      assert(df2.count() == 10)
+    }
 
+    val df3 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "100",
+        "batchNum" -> "100", "blockCacheingEnable" -> "false", "rowNum" -> "10"))
+    intercept[Exception] {
+      assert(df3.count() == 10)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e75e26e3/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
new file mode 100644
index 0000000..83465d9
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
+import org.apache.spark.SparkEnv
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+class HBaseTestSource extends RelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    DummyScan(
+      parameters("cacheSize").toInt,
+      parameters("batchNum").toInt,
+      parameters("blockCacheingEnable").toBoolean,
+      parameters("rowNum").toInt)(sqlContext)
+  }
+}
+
+case class DummyScan(
+     cacheSize: Int,
+     batchNum: Int,
+     blockCachingEnable: Boolean,
+     rowNum: Int)(@transient val sqlContext: SQLContext)
+  extends BaseRelation with TableScan {
+  private def sparkConf = SparkEnv.get.conf
+  override def schema: StructType =
+    StructType(StructField("i", IntegerType, nullable = false) :: Nil)
+
+  override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
+    .map(Row(_))
+    .map{ x =>
+      if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
+        HBaseSparkConf.defaultBatchNum) != batchNum ||
+        sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
+          HBaseSparkConf.defaultCachingSize) != cacheSize ||
+        sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
+          HBaseSparkConf.defaultBlockCacheEnable)
+          != blockCachingEnable) {
+        throw new Exception("HBase Spark configuration cannot be set properly")
+      }
+      x
+  }
+}


Mime
View raw message