spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-11740][STREAMING] Fix the race condition of two checkpoints in a batch
Date Tue, 17 Nov 2015 22:51:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 bdcbbdac6 -> e26dc9642


[SPARK-11740][STREAMING] Fix the race condition of two checkpoints in a batch

We write a checkpoint both when generating a batch and when completing a batch. When the
processing time of a batch is greater than the batch interval, checkpointing for completing an
old batch may run after checkpointing for generating a new batch. If this happens, the checkpoint
of the old batch actually has the latest information, so we want to recover from it. This PR
uses the latest checkpoint time seen so far as the file name, so that we can always recover from
the latest checkpoint file.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9707 from zsxwing/fix-checkpoint.

(cherry picked from commit 928d631625297857fb6998fbeb0696917fbfd60f)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e26dc964
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e26dc964
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e26dc964

Branch: refs/heads/branch-1.5
Commit: e26dc96421d17bca6c92db67b2546839f84c1683
Parents: bdcbbda
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Tue Nov 17 14:48:29 2015 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Nov 17 14:50:48 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/tests.py               |  8 +++---
 .../org/apache/spark/streaming/Checkpoint.scala | 18 ++++++++++--
 .../spark/streaming/CheckpointSuite.scala       | 30 ++++++++++++++++++--
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e26dc964/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 824f356..887106f 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -683,11 +683,11 @@ class CheckpointTests(unittest.TestCase):
         # Verify that getOrCreate() uses existing SparkContext
         self.ssc.stop(True, True)
         time.sleep(1)
-        sc = SparkContext(SparkConf())
+        self.sc = SparkContext(conf=SparkConf())
         self.setupCalled = False
         self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
-        self.assertTrue(self.ssc.sparkContext == sc)
+        self.assertTrue(self.ssc.sparkContext == self.sc)
 
         # Verify the getActiveOrCreate() recovers from checkpoint files
         self.ssc.stop(True, True)
@@ -706,11 +706,11 @@ class CheckpointTests(unittest.TestCase):
         # Verify that getActiveOrCreate() uses existing SparkContext
         self.ssc.stop(True, True)
         time.sleep(1)
-        self.sc = SparkContext(SparkConf())
+        self.sc = SparkContext(conf=SparkConf())
         self.setupCalled = False
         self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
-        self.assertTrue(self.ssc.sparkContext == sc)
+        self.assertTrue(self.ssc.sparkContext == self.sc)
 
         # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
         self.ssc.stop(True, True)

http://git-wip-us.apache.org/repos/asf/spark/blob/e26dc964/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 27024ec..ab40956 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -176,16 +176,30 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. When the processing
+      // time of a batch is greater than the batch interval, checkpointing for completing an old
+      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
+      // batch actually has the latest information, so we want to recovery from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can recovery from the
+      // latest checkpoint file.
+      //
+      // Note: there is only one thread writting the checkpoint files, so we don't need to worry
+      // about thread-safety.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
+      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
 
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/e26dc964/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a695653..1c94fc9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.File
+import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -29,12 +29,14 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.mockito.Mockito.mock
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.TestUtils
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -580,6 +582,28 @@ class CheckpointSuite extends TestSuiteBase {
   }
 
 
+  test("SPARK-11267: the race condition of two checkpoints in a batch") {
+    val jobGenerator = mock(classOf[JobGenerator])
+    val checkpointDir = Utils.createTempDir().toString
+    val checkpointWriter =
+      new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration())
+    val bytes1 = Array.fill[Byte](10)(1)
+    new checkpointWriter.CheckpointWriteHandler(
+      Time(2000), bytes1, clearCheckpointDataLater = false).run()
+    val bytes2 = Array.fill[Byte](10)(2)
+    new checkpointWriter.CheckpointWriteHandler(
+      Time(1000), bytes2, clearCheckpointDataLater = true).run()
+    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path =>
+      new File(path.toUri)
+    }
+    assert(checkpointFiles.size === 2)
+    // Although bytes2 was written with an old time, it contains the latest status, so we should
+    // try to read from it at first.
+    assert(Files.toByteArray(checkpointFiles(0)) === bytes2)
+    assert(Files.toByteArray(checkpointFiles(1)) === bytes1)
+    checkpointWriter.stop()
+  }
+
   /**
    * Tests a streaming operation under checkpointing, by restarting the operation
    * from checkpoint file and verifying whether the final output is correct.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message