spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-24351][SS] offsetLog/commitLog purge thresholdBatchId should be computed with current committed epoch but not currentBatchId in CP mode
Date Fri, 01 Jun 2018 17:47:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master 98909c398 -> 6039b1323


[SPARK-24351][SS] offsetLog/commitLog purge thresholdBatchId should be computed with current
committed epoch but not currentBatchId in CP mode

## What changes were proposed in this pull request?
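Compute the thresholdBatchId used to purge metadata from the current committed epoch instead of
from currentBatchId in continuous processing (CP) mode. In CP mode currentBatchId advances
independently of commits, so the last committed epoch can lag far behind it. Purging relative to
currentBatchId can therefore, in some cases, delete all of the committed metadata, as described
in the JIRA [SPARK-24351](https://issues.apache.org/jira/browse/SPARK-24351).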

## How was this patch tested?
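Added a new unit test, ContinuousMetaSuite ("SPARK-24351: check offsetLog/commitLog retained in the checkpoint directory").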

Author: Huang Tengfei <tengfei.h@gmail.com>

Closes #21400 from ivoson/branch-cp-meta.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6039b132
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6039b132
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6039b132

Branch: refs/heads/master
Commit: 6039b132304cc77ed39e4ca7813850507ae0b440
Parents: 98909c3
Author: Huang Tengfei <tengfei.h@gmail.com>
Authored: Fri Jun 1 10:47:53 2018 -0700
Committer: Shixiong Zhu <zsxwing@gmail.com>
Committed: Fri Jun 1 10:47:53 2018 -0700

----------------------------------------------------------------------
 .../continuous/ContinuousExecution.scala        | 11 +++--
 .../streaming/continuous/ContinuousSuite.scala  | 46 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6039b132/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index d16b24c..e3d0cea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -318,9 +318,14 @@ class ContinuousExecution(
       }
     }
 
-    if (minLogEntriesToMaintain < currentBatchId) {
-      offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
-      commitLog.purge(currentBatchId - minLogEntriesToMaintain)
+    // Since currentBatchId increases independently in cp mode, the current committed epoch
may
+    // be far behind currentBatchId. It is not safe to discard the metadata with thresholdBatchId
+    // computed based on currentBatchId. As minLogEntriesToMaintain is used to keep the minimum
+    // number of batches that must be retained and made recoverable, so we should keep the
+    // specified number of metadata that have been committed.
+    if (minLogEntriesToMaintain <= epoch) {
+      offsetLog.purge(epoch + 1 - minLogEntriesToMaintain)
+      commitLog.purge(epoch + 1 - minLogEntriesToMaintain)
     }
 
     awaitProgressLock.lock()

http://git-wip-us.apache.org/repos/asf/spark/blob/6039b132/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index cd1704a..4980b0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -297,3 +297,49 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
       CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_))))
   }
 }
+
+class ContinuousMetaSuite extends ContinuousSuiteBase {
+  import testImplicits._
+
+  // We need to specify spark.sql.streaming.minBatchesToRetain to do the following test.
+  override protected def createSparkSession = new TestSparkSession(
+    new SparkContext(
+      "local[10]",
+      "continuous-stream-test-sql-context",
+      sparkConf.set("spark.sql.testkey", "true")
+        .set("spark.sql.streaming.minBatchesToRetain", "2")))
+
+  test("SPARK-24351: check offsetLog/commitLog retained in the checkpoint directory") {
+    withTempDir { checkpointDir =>
+      val input = ContinuousMemoryStream[Int]
+      val df = input.toDF().mapPartitions(iter => {
+        // Sleep the task thread for 300 ms to make sure epoch processing time 3 times
+        // longer than epoch creating interval. So the gap between last committed
+        // epoch and currentBatchId grows over time.
+        Thread.sleep(300)
+        iter.map(row => row.getInt(0) * 2)
+      })
+
+      testStream(df)(
+        StartStream(trigger = Trigger.Continuous(100),
+          checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(input, 1),
+        CheckAnswer(2),
+        // Make sure epoch 2 has been committed before the following validation.
+        AwaitEpoch(2),
+        StopStream,
+        AssertOnQuery(q => {
+          q.commitLog.getLatest() match {
+            case Some((latestEpochId, _)) =>
+              val commitLogValidateResult = q.commitLog.get(latestEpochId - 1).isDefined
&&
+                q.commitLog.get(latestEpochId - 2).isEmpty
+              val offsetLogValidateResult = q.offsetLog.get(latestEpochId - 1).isDefined
&&
+                q.offsetLog.get(latestEpochId - 2).isEmpty
+              commitLogValidateResult && offsetLogValidateResult
+            case None => false
+          }
+        })
+      )
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message