kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [1/2] kudu git commit: spark: add support for fault tolerant scanner
Date Mon, 15 May 2017 22:16:58 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 4af33ba13 -> b28a59266


spark: add support for fault tolerant scanner

This adds support for using a fault-tolerant scanner in Spark jobs.
By default, a non-fault-tolerant scanner is used. To turn on the fault-
tolerant scanner, use the job config 'kudu.faultTolerantScan'.

Change-Id: I3f3192025ca5d74197600480fd3d040d70b4bbc2
Reviewed-on: http://gerrit.cloudera.org:8080/6782
Reviewed-by: Dan Burkert <danburkert@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e2705897
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e2705897
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e2705897

Branch: refs/heads/master
Commit: e2705897bc56bb4daab87e9169be56a3292211c2
Parents: 4af33ba
Author: hahao <hao.hao@cloudera.com>
Authored: Tue May 2 22:01:18 2017 -0700
Committer: Dan Burkert <danburkert@apache.org>
Committed: Mon May 15 21:03:44 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 20 ++++++++++++++++----
 .../apache/kudu/spark/kudu/KuduContext.scala    |  5 +++--
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  2 ++
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 18 ++++++++++++++++--
 4 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e2705897/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index f5da69b..68f1a18 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -20,6 +20,8 @@ package org.apache.kudu.spark.kudu
 import java.sql.Timestamp
 
 import scala.collection.JavaConverters._
+import scala.util.Try
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -45,6 +47,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
   val TABLE_KEY = "kudu.table"
   val KUDU_MASTER = "kudu.master"
   val OPERATION = "kudu.operation"
+  val FAULT_TOLERANT_SCANNER = "kudu.faultTolerantScan"
 
   /**
     * Construct a BaseRelation using the provided context and parameters.
@@ -61,8 +64,11 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
         s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
+    val faultTolerantScanner = Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
+      .getOrElse(false)
 
-    new KuduRelation(tableName, kuduMaster, operationType, None)(sqlContext)
+    new KuduRelation(tableName, kuduMaster, faultTolerantScanner, operationType,
+      None)(sqlContext)
   }
 
   /**
@@ -70,7 +76,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
     *
     * @param sqlContext
     * @param mode Only Append mode is supported. It will upsert or insert data
-    *             to an existing table, depending on the upsert parameter.
+    *             to an existing table, depending on the upsert parameter
     * @param parameters Necessary parameters for kudu.table and kudu.master
     * @param data Dataframe to save into kudu
     * @return returns populated base relation
@@ -93,8 +99,11 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
         s"using key '$TABLE_KEY'"))
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
+    val faultTolerantScanner = Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
+      .getOrElse(false)
 
-    new KuduRelation(tableName, kuduMaster, operationType, Some(schema))(sqlContext)
+    new KuduRelation(tableName, kuduMaster, faultTolerantScanner, operationType,
+      Some(schema))(sqlContext)
   }
 
   private def getOperationType(opParam: String): OperationType = {
@@ -114,6 +123,8 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
   *
   * @param tableName Kudu table that we plan to read from
   * @param masterAddrs Kudu master addresses
+  * @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
+  *                             otherwise, use non fault tolerant one
   * @param operationType The default operation type to perform when writing to the relation
   * @param userSchema A schema used to select columns for the relation
   * @param sqlContext SparkSQL context
@@ -121,6 +132,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
 @InterfaceStability.Unstable
 class KuduRelation(private val tableName: String,
                    private val masterAddrs: String,
+                   private val faultTolerantScanner: Boolean,
                    private val operationType: OperationType,
                    private val userSchema: Option[StructType])(
                    val sqlContext: SQLContext)
@@ -168,7 +180,7 @@ class KuduRelation(private val tableName: String,
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
= {
     val predicates = filters.flatMap(filterToPredicate)
     new KuduRDD(context, 1024 * 1024 * 20, requiredColumns, predicates,
-                table, sqlContext.sparkContext)
+                table, faultTolerantScanner, sqlContext.sparkContext)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2705897/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 8ca1e03..6998053 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -82,14 +82,15 @@ class KuduContext(val kuduMaster: String,
     * @param tableName          table to read from
     * @param columnProjection   list of columns to read. Not specifying this at all
     *                           (i.e. setting to null) or setting to the special
-    *                           string '*' means to project all columns.
+    *                           string '*' means to project all columns
     * @return a new RDD that maps over the given table for the selected columns
     */
   def kuduRDD(sc: SparkContext,
               tableName: String,
               columnProjection: Seq[String] = Nil): RDD[Row] = {
+    // TODO: provide an elegant way to pass various options (faultTolerantScan, etc) to KuduRDD
     new KuduRDD(this, 1024*1024*20, columnProjection.toArray, Array(),
-                syncClient.openTable(tableName), sc)
+                syncClient.openTable(tableName), false, sc)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2705897/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 8a5d859..d52077d 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -34,6 +34,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
                              @transient val projectedCols: Array[String],
                              @transient val predicates: Array[client.KuduPredicate],
                              @transient val table: KuduTable,
+                             @transient val isFaultTolerant: Boolean,
                              @transient val sc: SparkContext) extends RDD[Row](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
@@ -41,6 +42,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
                              .newScanTokenBuilder(table)
                              .batchSizeBytes(batchSize)
                              .setProjectedColumnNames(projectedCols.toSeq.asJava)
+                             .setFaultTolerant(isFaultTolerant)
                              .cacheBlocks(true)
 
     for (predicate <- predicates) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2705897/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 9790e81..5b0618c 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -66,7 +66,6 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter
wi
   var kuduOptions : Map[String, String] = _
 
   before {
-    val rowCount = 10
     rows = insertRows(rowCount)
 
     sqlContext = new SQLContext(sc)
@@ -200,7 +199,7 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter
wi
 
   }
 
-  test("table scan") {
+  test("table non fault tolerant scan") {
     val results = sqlContext.sql(s"SELECT * FROM $tableName").collectAsList()
     assert(results.size() == rowCount)
 
@@ -208,6 +207,21 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter
wi
     assert(results.get(1).isNullAt(2))
   }
 
+  test("table fault tolerant scan") {
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses,
+      "kudu.faultTolerantScan" -> "true")
+
+    val table = "faultTolerantScanTest"
+    sqlContext.read.options(kuduOptions).kudu.registerTempTable(table)
+    val results = sqlContext.sql(s"SELECT * FROM $table").collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(!results.get(0).isNullAt(2))
+    assert(results.get(1).isNullAt(2))
+  }
+
   test("table scan with projection") {
     assertEquals(10, sqlContext.sql(s"""SELECT key FROM $tableName""").count())
   }


Mime
View raw message