kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [3/3] kudu git commit: KUDU-2563: [spark] Use the scanner keepAlive API
Date Fri, 05 Oct 2018 02:30:13 GMT
KUDU-2563: [spark] Use the scanner keepAlive API

Adds scheduled keepAlive calls to the scanner in the
KuduRDD RowIterator. The period in which the calls
are made is configurable via keepAlivePeriodMs and
has a default of 15 seconds (which is 1/4 the default
scanner ttl).

This implementation is similar to the Impala integration.
It checks if a call to the keepAlive API is needed as
it processes each row. Compared to a background
thread, this has the downside of being less consistently
scheduled and susceptible to scenarios in which a single
row takes longer to process than the ttl. However,
because the scanner is not thread safe, this is the most
straightforward solution and has been proven to work.

Change-Id: Ia7f26d6ab8deb24982055d247938a11e188c35db
Reviewed-on: http://gerrit.cloudera.org:8080/11571
Reviewed-by: Grant Henke <granthenke@apache.org>
Tested-by: Grant Henke <granthenke@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf1b1f42
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf1b1f42
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf1b1f42

Branch: refs/heads/master
Commit: cf1b1f42cbcc3ee67477ddc44cd0ff5070f1caac
Parents: fb79f8f
Author: Grant Henke <granthenke@apache.org>
Authored: Sun Sep 30 22:31:52 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Fri Oct 5 02:24:04 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AsyncKuduScanner.java    |  7 ++
 .../org/apache/kudu/client/KuduScanner.java     |  9 +-
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 18 ++--
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    | 25 +++++-
 .../kudu/spark/kudu/KuduReadOptions.scala       |  4 +
 .../apache/kudu/spark/kudu/KuduRDDTest.scala    | 87 ++++++++++++++++++++
 6 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 804978e..71b1146 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -605,6 +605,13 @@ public final class AsyncKuduScanner {
   }
 
   /**
+   * @return true if the scanner has been closed.
+   */
+  public boolean isClosed() {
+    return closed;
+  }
+
+  /**
    * Closes this scanner (don't forget to call this when you're done with it!).
    * <p>
    * Closing a scanner already closed has no effect.  The deferred returned

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 209fada..f945d8f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -79,6 +79,13 @@ public class KuduScanner {
   }
 
   /**
+   * @return true if the scanner has been closed.
+   */
+  public boolean isClosed() {
+    return asyncScanner.isClosed();
+  }
+
+  /**
    * Closes this scanner (don't forget to call this when you're done with it!).
    * <p>
    * Closing a scanner already closed has no effect.
@@ -135,7 +142,7 @@ public class KuduScanner {
    * Returns the RemoteTablet currently being scanned, if any.
    */
   @InterfaceAudience.LimitedPrivate("Test")
-  RemoteTablet currentTablet() {
+  public RemoteTablet currentTablet() {
     return asyncScanner.currentTablet();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 29635a3..890ecda 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -62,6 +62,7 @@ class DefaultSource
   val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
   val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
   val BATCH_SIZE = "kudu.batchSize"
+  val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
 
   /**
    * Construct a BaseRelation using the provided context and parameters.
@@ -77,13 +78,13 @@ class DefaultSource
   }
 
   /**
-    * Construct a BaseRelation using the provided context, parameters and schema.
-    *
-    * @param sqlContext SparkSQL context
-    * @param parameters parameters given to us from SparkSQL
-    * @param schema     the schema used to select columns for the relation
-    * @return           a BaseRelation Object
-    */
+   * Construct a BaseRelation using the provided context, parameters and schema.
+   *
+   * @param sqlContext SparkSQL context
+   * @param parameters parameters given to us from SparkSQL
+   * @param schema     the schema used to select columns for the relation
+   * @return           a BaseRelation Object
+   */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
@@ -141,11 +142,14 @@ class DefaultSource
       parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
     val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
     val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
+    val keepAlivePeriodMs =
+      parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
 
     KuduReadOptions(
       batchSize,
       scanLocality,
       faultTolerantScanner,
+      keepAlivePeriodMs,
       scanRequestTimeoutMs,
       socketReadTimeoutMs)
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 77dabcc..2deea16 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
-
 import org.apache.kudu.client._
 import org.apache.kudu.Type
 import org.apache.kudu.client
@@ -45,6 +44,9 @@ class KuduRDD private[kudu] (
     @transient val sc: SparkContext)
     extends RDD[Row](sc, Nil) {
 
+  // Defined here because the options are transient.
+  private val keepAlivePeriodMs = options.keepAlivePeriodMs
+
   override protected def getPartitions: Array[Partition] = {
     val builder = kuduContext.syncClient
       .newScanTokenBuilder(table)
@@ -91,7 +93,7 @@ class KuduRDD private[kudu] (
     val partition: KuduPartition = part.asInstanceOf[KuduPartition]
     val scanner =
       KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowIterator(scanner, kuduContext)
+    new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -112,11 +114,27 @@ private class KuduPartition(
  * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
  * @param scanner the wrapped scanner
  * @param kuduContext the kudu context
+ * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
  */
-private class RowIterator(private val scanner: KuduScanner, private val kuduContext: KuduContext)
+private class RowIterator(
+    val scanner: KuduScanner,
+    val kuduContext: KuduContext,
+    val keepAlivePeriodMs: Long)
     extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = RowResultIterator.empty
+  private var lastKeepAliveTimeMs = System.currentTimeMillis()
+
+  /**
+   * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs has passed.
+   */
+  private def KeepKuduScannerAlive(): Unit = {
+    val now = System.currentTimeMillis
+    if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) {
+      scanner.keepAlive()
+      lastKeepAliveTimeMs = now
+    }
+  }
 
   override def hasNext: Boolean = {
     while (!currentIterator.hasNext && scanner.hasMoreRows) {
@@ -128,6 +146,7 @@ private class RowIterator(private val scanner: KuduScanner, private val kuduCont
       // timestamp on each executor.
       kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
     }
+    KeepKuduScannerAlive()
     currentIterator.hasNext
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index 7c9b888..a1983b5 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -31,6 +31,8 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  *                     take place at the closest replica
  * @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
  *                             otherwise, use non fault tolerant one
+ * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet
+ *                          server to ensure that scanners do not time out
  * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
  * @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket
  */
@@ -40,6 +42,7 @@ case class KuduReadOptions(
     batchSize: Int = defaultBatchSize,
     scanLocality: ReplicaSelection = defaultScanLocality,
     faultTolerantScanner: Boolean = defaultFaultTolerantScanner,
+    keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs,
     scanRequestTimeoutMs: Option[Long] = None,
     socketReadTimeoutMs: Option[Long] = None)
 
@@ -47,4 +50,5 @@ object KuduReadOptions {
   val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
   val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
   val defaultFaultTolerantScanner: Boolean = false
+  val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
index f0fb4a0..49bc15e 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -17,6 +17,15 @@
 
 package org.apache.kudu.spark.kudu
 
+import scala.collection.JavaConverters._
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.junit.Test
 
 class KuduRDDTest extends KuduTestSuite {
@@ -27,4 +36,82 @@ class KuduRDDTest extends KuduTestSuite {
     val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
     assert(rdd.collect.length == 100)
   }
+
+  @Test
+  @TabletServerConfig(
+    // Hard coded values because Scala doesn't handle array constants in annotations.
+    flags = Array(
+      "--scanner_ttl_ms=5000",
+      "--scanner_gc_check_interval_us=500000" // 10% of the TTL.
+    ))
+  def testKeepAlive() {
+    val rowCount = 500
+    val shortScannerTtlMs = 5000
+
+    // Create a simple table with a single partition.
+    val tableName = "testKeepAlive"
+    val tableSchema = {
+      val columns = List(
+        new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+        new ColumnSchemaBuilder("val", Type.INT32).build()).asJava
+      new Schema(columns)
+    }
+    val tableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+    val table = kuduClient.createTable(tableName, tableSchema, tableOptions)
+
+    val session = kuduClient.newSession()
+    Range(0, rowCount).map { i =>
+      val insert = table.newInsert
+      val row = insert.getRow
+      row.addInt(0, i)
+      row.addInt(1, i)
+      session.apply(insert)
+    }
+    session.flush()
+
+    def processRDD(rdd: RDD[Row]): Unit = {
+      // Ensure reading takes longer than the scanner ttl.
+      var i = 0
+      rdd.foreach { row =>
+        // Sleep for half the ttl for the first few rows. This ensures
+        // we are on the same tablet and will go past the ttl without
+        // a new scan request. It also ensures a single row doesn't go
+        // longer than the ttl.
+        if (i < 5) {
+          Thread.sleep(shortScannerTtlMs / 2) // Sleep for half the ttl.
+          i = i + 1
+        }
+      }
+    }
+
+    // Test that a keepAlivePeriodMs less than the scanner ttl is successful.
+    val goodRdd = kuduContext.kuduRDD(
+      ss.sparkContext,
+      tableName,
+      List("key"),
+      KuduReadOptions(
+        batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+        keepAlivePeriodMs = shortScannerTtlMs / 4)
+    )
+    processRDD(goodRdd)
+
+    // Test that a keepAlivePeriodMs greater than the scanner ttl fails.
+    val badRdd = kuduContext.kuduRDD(
+      ss.sparkContext,
+      tableName,
+      List("key"),
+      KuduReadOptions(
+        batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+        keepAlivePeriodMs = shortScannerTtlMs * 2)
+    )
+    try {
+      processRDD(badRdd)
+      fail("Should throw a scanner not found exception")
+    } catch {
+      case ex: SparkException =>
+        assert(ex.getMessage.matches("(?s).*Scanner .* not found.*"))
+    }
+  }
 }


Mime
View raw message