KUDU-2563: [spark] Use the scanner keepAlive API
Adds scheduled keepAlive calls to the scanner in the
KuduRDD RowIterator. The period at which the calls
are made is configurable via keepAlivePeriodMs and
has a default of 15 seconds (1/4 of the default
60-second scanner TTL).
This implementation is similar to the Impala integration:
it checks whether a call to the keepAlive API is needed as
it processes each row. Compared to a background thread,
this approach is scheduled less consistently and is
susceptible to scenarios in which a single row takes
longer to process than the TTL. However, because the
scanner is not thread safe, this is the most
straightforward solution, and it has been proven to work in Impala.
Change-Id: Ia7f26d6ab8deb24982055d247938a11e188c35db
Reviewed-on: http://gerrit.cloudera.org:8080/11571
Reviewed-by: Grant Henke <granthenke@apache.org>
Tested-by: Grant Henke <granthenke@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf1b1f42
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf1b1f42
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf1b1f42
Branch: refs/heads/master
Commit: cf1b1f42cbcc3ee67477ddc44cd0ff5070f1caac
Parents: fb79f8f
Author: Grant Henke <granthenke@apache.org>
Authored: Sun Sep 30 22:31:52 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Fri Oct 5 02:24:04 2018 +0000
----------------------------------------------------------------------
.../apache/kudu/client/AsyncKuduScanner.java | 7 ++
.../org/apache/kudu/client/KuduScanner.java | 9 +-
.../apache/kudu/spark/kudu/DefaultSource.scala | 18 ++--
.../org/apache/kudu/spark/kudu/KuduRDD.scala | 25 +++++-
.../kudu/spark/kudu/KuduReadOptions.scala | 4 +
.../apache/kudu/spark/kudu/KuduRDDTest.scala | 87 ++++++++++++++++++++
6 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 804978e..71b1146 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -605,6 +605,13 @@ public final class AsyncKuduScanner {
}
/**
+ * @return true if this scanner has been closed, false otherwise.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
* Closes this scanner (don't forget to call this when you're done with it!).
* <p>
* Closing a scanner already closed has no effect. The deferred returned
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 209fada..f945d8f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -79,6 +79,13 @@ public class KuduScanner {
}
/**
+ * @return true if the underlying asynchronous scanner has been closed, false otherwise.
+ */
+ public boolean isClosed() {
+ return asyncScanner.isClosed();
+ }
+
+ /**
* Closes this scanner (don't forget to call this when you're done with it!).
* <p>
* Closing a scanner already closed has no effect.
@@ -135,7 +142,7 @@ public class KuduScanner {
* Returns the RemoteTablet currently being scanned, if any.
*/
@InterfaceAudience.LimitedPrivate("Test")
- RemoteTablet currentTablet() {
+ public RemoteTablet currentTablet() {
return asyncScanner.currentTablet();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 29635a3..890ecda 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -62,6 +62,7 @@ class DefaultSource
val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
val BATCH_SIZE = "kudu.batchSize"
+ val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
/**
* Construct a BaseRelation using the provided context and parameters.
@@ -77,13 +78,13 @@ class DefaultSource
}
/**
- * Construct a BaseRelation using the provided context, parameters and schema.
- *
- * @param sqlContext SparkSQL context
- * @param parameters parameters given to us from SparkSQL
- * @param schema the schema used to select columns for the relation
- * @return a BaseRelation Object
- */
+ * Construct a BaseRelation using the provided context, parameters and schema.
+ *
+ * @param sqlContext SparkSQL context
+ * @param parameters parameters given to us from SparkSQL
+ * @param schema the schema used to select columns for the relation
+ * @return a BaseRelation Object
+ */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
@@ -141,11 +142,14 @@ class DefaultSource
parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
+ val keepAlivePeriodMs =
+ parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
KuduReadOptions(
batchSize,
scanLocality,
faultTolerantScanner,
+ keepAlivePeriodMs,
scanRequestTimeoutMs,
socketReadTimeoutMs)
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 77dabcc..2deea16 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
-
import org.apache.kudu.client._
import org.apache.kudu.Type
import org.apache.kudu.client
@@ -45,6 +44,9 @@ class KuduRDD private[kudu] (
@transient val sc: SparkContext)
extends RDD[Row](sc, Nil) {
+ // Defined here because the options are transient.
+ private val keepAlivePeriodMs = options.keepAlivePeriodMs
+
override protected def getPartitions: Array[Partition] = {
val builder = kuduContext.syncClient
.newScanTokenBuilder(table)
@@ -91,7 +93,7 @@ class KuduRDD private[kudu] (
val partition: KuduPartition = part.asInstanceOf[KuduPartition]
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
- new RowIterator(scanner, kuduContext)
+ new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -112,11 +114,27 @@ private class KuduPartition(
* A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
* @param scanner the wrapped scanner
* @param kuduContext the kudu context
+ * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
*/
-private class RowIterator(private val scanner: KuduScanner, private val kuduContext: KuduContext)
+private class RowIterator(
+ val scanner: KuduScanner,
+ val kuduContext: KuduContext,
+ val keepAlivePeriodMs: Long)
extends Iterator[Row] {
private var currentIterator: RowResultIterator = RowResultIterator.empty
+ private var lastKeepAliveTimeMs = System.currentTimeMillis()
+
+ /**
+ * Calls keepAlive on the scanner once at least keepAlivePeriodMs has elapsed since the last
+ * keepAlive (or since this iterator was created), unless the scanner is already closed.
+ * Elapsed time is compared by subtraction so a huge period (e.g. Long.MaxValue) cannot overflow.
+ */
+ private def KeepKuduScannerAlive(): Unit = {
+ val now = System.currentTimeMillis
+ if (now - lastKeepAliveTimeMs >= keepAlivePeriodMs && !scanner.isClosed) {
+ scanner.keepAlive()
+ lastKeepAliveTimeMs = now
+ }
+ }
override def hasNext: Boolean = {
while (!currentIterator.hasNext && scanner.hasMoreRows) {
@@ -128,6 +146,7 @@ private class RowIterator(private val scanner: KuduScanner, private val kuduCont
// timestamp on each executor.
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
}
+ KeepKuduScannerAlive()
currentIterator.hasNext
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index 7c9b888..a1983b5 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -31,6 +31,8 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
* take place at the closest replica
* @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
* otherwise, use non fault tolerant one
+ * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet
+ * server to ensure that scanners do not time out
* @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
* @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket
*/
@@ -40,6 +42,7 @@ case class KuduReadOptions(
batchSize: Int = defaultBatchSize,
scanLocality: ReplicaSelection = defaultScanLocality,
faultTolerantScanner: Boolean = defaultFaultTolerantScanner,
+ keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs,
scanRequestTimeoutMs: Option[Long] = None,
socketReadTimeoutMs: Option[Long] = None)
@@ -47,4 +50,5 @@ object KuduReadOptions {
val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
val defaultFaultTolerantScanner: Boolean = false
+ val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cf1b1f42/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
index f0fb4a0..49bc15e 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -17,6 +17,15 @@
package org.apache.kudu.spark.kudu
+import scala.collection.JavaConverters._
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
import org.junit.Test
class KuduRDDTest extends KuduTestSuite {
@@ -27,4 +36,82 @@ class KuduRDDTest extends KuduTestSuite {
val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
assert(rdd.collect.length == 100)
}
+
+ @Test
+ @TabletServerConfig(
+ // Hard coded values because Scala doesn't handle array constants in annotations.
+ flags = Array(
+ "--scanner_ttl_ms=5000",
+ "--scanner_gc_check_interval_us=500000" // 10% of the TTL.
+ ))
+ def testKeepAlive(): Unit = {
+ val rowCount = 500
+ val shortScannerTtlMs = 5000
+
+ // Create a simple table with a single partition.
+ val tableName = "testKeepAlive"
+ val tableSchema = {
+ val columns = List(
+ new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+ new ColumnSchemaBuilder("val", Type.INT32).build()).asJava
+ new Schema(columns)
+ }
+ val tableOptions = new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
+ .setNumReplicas(1)
+ val table = kuduClient.createTable(tableName, tableSchema, tableOptions)
+
+ val session = kuduClient.newSession()
+ Range(0, rowCount).foreach { i =>
+ val insert = table.newInsert
+ val row = insert.getRow
+ row.addInt(0, i)
+ row.addInt(1, i)
+ session.apply(insert)
+ }
+ session.flush()
+
+ def processRDD(rdd: RDD[Row]): Unit = {
+ // Ensure reading takes longer than the scanner ttl.
+ var i = 0 // Copied into the task; the single-partition table means one task sees every row.
+ rdd.foreach { _ =>
+ // Sleep for half the ttl for the first few rows. This ensures
+ // we are on the same tablet and will go past the ttl without
+ // a new scan request. It also ensures a single row doesn't go
+ // longer than the ttl.
+ if (i < 5) {
+ Thread.sleep(shortScannerTtlMs / 2) // Sleep for half the ttl.
+ i = i + 1
+ }
+ }
+ }
+
+ // Test that a keepAlivePeriodMs less than the scanner ttl is successful.
+ val goodRdd = kuduContext.kuduRDD(
+ ss.sparkContext,
+ tableName,
+ List("key"),
+ KuduReadOptions(
+ batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+ keepAlivePeriodMs = shortScannerTtlMs / 4)
+ )
+ processRDD(goodRdd)
+
+ // Test that a keepAlivePeriodMs greater than the scanner ttl fails.
+ val badRdd = kuduContext.kuduRDD(
+ ss.sparkContext,
+ tableName,
+ List("key"),
+ KuduReadOptions(
+ batchSize = 100, // Set a small batch size so the first scan doesn't read all the rows.
+ keepAlivePeriodMs = shortScannerTtlMs * 2)
+ )
+ try {
+ processRDD(badRdd)
+ fail("Should throw a scanner not found exception")
+ } catch {
+ case ex: SparkException =>
+ assert(ex.getMessage.matches("(?s).*Scanner .* not found.*"))
+ }
+ }
}
|