kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [1/2] kudu git commit: [backup] Port keepAlive implementation to KuduBackupRDD
Date Tue, 30 Oct 2018 23:29:52 GMT
Repository: kudu
Updated Branches:
  refs/heads/master a2decad12 -> 606488495


[backup] Port keepAlive implementation to KuduBackupRDD

This copies the keepAlive implementation from
KuduRDD over to the KuduBackupRDD.

Change-Id: I10d7645781d2b18eb89034e1b4f55045b01ca0da
Reviewed-on: http://gerrit.cloudera.org:8080/11817
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7d972a8a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7d972a8a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7d972a8a

Branch: refs/heads/master
Commit: 7d972a8ae7313c8a6be414a3272f3b297b5c66db
Parents: a2decad
Author: Grant Henke <granthenke@apache.org>
Authored: Mon Oct 29 13:36:33 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Tue Oct 30 21:17:10 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/backup/KuduBackupOptions.scala  | 10 +++++++++-
 .../org/apache/kudu/backup/KuduBackupRDD.scala  | 21 ++++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7d972a8a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
index 02bfa28..c594b1a 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
@@ -33,7 +33,8 @@ case class KuduBackupOptions(
     format: String = KuduBackupOptions.DefaultFormat,
     scanBatchSize: Int = KuduBackupOptions.DefaultScanBatchSize,
     scanRequestTimeout: Long = KuduBackupOptions.DefaultScanRequestTimeout,
-    scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching)
+    scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching,
+    keepAlivePeriodMs: Long = KuduBackupOptions.defaultKeepAlivePeriodMs)
 
 object KuduBackupOptions {
   val DefaultFormat: String = "parquet"
@@ -42,6 +43,7 @@ object KuduBackupOptions {
     AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
   val DefaultScanPrefetching
     : Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?
+  val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
 
   // TODO: clean up usage output.
   // TODO: timeout configurations.
@@ -83,6 +85,12 @@ object KuduBackupOptions {
         .text("An experimental flag to enable pre-fetching data.")
         .optional()
 
+      opt[Long]("keepAlivePeriodMs")
+        .action((v, o) => o.copy(keepAlivePeriodMs = v))
+        .text("Sets the period at which to send keep-alive requests to the tablet server to ensure" +
+          " that scanners do not time out")
+        .optional()
+
       arg[String]("<table>...")
         .unbounded()
         .action((v, o) => o.copy(tables = o.tables :+ v))

http://git-wip-us.apache.org/repos/asf/kudu/blob/7d972a8a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index 740dfb9..f42b369 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -42,6 +42,9 @@ class KuduBackupRDD private[kudu] (
     @transient val sc: SparkContext)
     extends RDD[Row](sc, Nil) {
 
+  // Defined here because the options are transient.
+  private val keepAlivePeriodMs = options.keepAlivePeriodMs
+
   // TODO: Split large tablets into smaller scan tokens?
   override protected def getPartitions: Array[Partition] = {
     val client = kuduContext.syncClient
@@ -84,7 +87,7 @@ class KuduBackupRDD private[kudu] (
     // TODO: Get deletes and updates for incremental backups.
     val scanner =
       KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowIterator(scanner)
+    new RowIterator(scanner, keepAlivePeriodMs)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -103,9 +106,22 @@ private case class KuduBackupPartition(index: Int, scanToken: Array[Byte], locat
  * that takes the job partitions and task context and expects to return an Iterator[Row].
  * This implementation facilitates that.
  */
-private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] {
+private class RowIterator(private val scanner: KuduScanner, val keepAlivePeriodMs: Long)
+    extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = RowResultIterator.empty
+  private var lastKeepAliveTimeMs = System.currentTimeMillis()
+
+  /**
+   * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs has passed.
+   */
+  private def KeepKuduScannerAlive(): Unit = {
+    val now = System.currentTimeMillis
+    if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) {
+      scanner.keepAlive()
+      lastKeepAliveTimeMs = now
+    }
+  }
 
   override def hasNext: Boolean = {
     while (!currentIterator.hasNext && scanner.hasMoreRows) {
@@ -114,6 +130,7 @@ private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row
       }
       currentIterator = scanner.nextRows()
     }
+    KeepKuduScannerAlive()
     currentIterator.hasNext
   }
 


Mime
View raw message