spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
Date Wed, 09 Sep 2015 03:39:40 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 8f82bb440 -> dbff38963


[SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better
exception when reading QueueInputDStream

Output a warning when serializing QueueInputDStream rather than throwing an exception to allow
unit tests use it. Moreover, this PR also throws an better exception when deserializing QueueInputDStream
to make the user find out the problem easily. The previous exception is hard to understand:
https://issues.apache.org/jira/browse/SPARK-8553

Author: zsxwing <zsxwing@gmail.com>

Closes #8624 from zsxwing/SPARK-10071 and squashes the following commits:

847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception
when reading QueueInputDStream

(cherry picked from commit 820913f554bef610d07ca2dadaead657f916ae63)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbff3896
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbff3896
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbff3896

Branch: refs/heads/branch-1.4
Commit: dbff3896340cd7fae55dba0ac0024a579862fcfe
Parents: 8f82bb4
Author: zsxwing <zsxwing@gmail.com>
Authored: Tue Sep 8 20:39:15 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Sep 8 20:39:35 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala |  6 +++--
 .../streaming/dstream/QueueInputDStream.scala   |  9 +++++--
 .../spark/streaming/StreamingContextSuite.scala | 28 +++++++++++++-------
 3 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbff3896/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index bd117ed..313dc4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -307,7 +307,7 @@ object CheckpointReader extends Logging {
 
     // Try to read the checkpoint files in the order
     logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
-    val compressionCodec = CompressionCodec.createCodec(conf)
+    var readError: Exception = null
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
@@ -318,13 +318,15 @@ object CheckpointReader extends Logging {
         return Some(cp)
       } catch {
         case e: Exception =>
+          readError = e
           logWarning("Error reading checkpoint from file " + file, e)
       }
     })
 
     // If none of checkpoint files could be read, then throw exception
     if (!ignoreReadError) {
-      throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+      throw new SparkException(
+        s"Failed to read checkpoint from directory $checkpointPath", readError)
     }
     None
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbff3896/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a2f5d82..bab78a3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{NotSerializableException, ObjectOutputStream}
+import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
@@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](
 
   override def stop() { }
 
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NotSerializableException("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled.")
+  }
+
   private def writeObject(oos: ObjectOutputStream): Unit = {
-    throw new NotSerializableException("queueStream doesn't support checkpointing")
+    logWarning("queueStream doesn't support checkpointing")
   }
 
   override def compute(validTime: Time): Option[RDD[T]] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dbff3896/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 289a159..26f40b6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.storage.StorageLevel
@@ -690,16 +690,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter
with Timeo
   }
 
   test("queueStream doesn't support checkpointing") {
-    val checkpointDir = Utils.createTempDir()
-    ssc = new StreamingContext(master, appName, batchDuration)
-    val rdd = ssc.sparkContext.parallelize(1 to 10)
-    ssc.queueStream[Int](Queue(rdd)).print()
-    ssc.checkpoint(checkpointDir.getAbsolutePath)
-    val e = intercept[NotSerializableException] {
-      ssc.start()
+    val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+    def creatingFunction(): StreamingContext = {
+      val _ssc = new StreamingContext(conf, batchDuration)
+      val rdd = _ssc.sparkContext.parallelize(1 to 10)
+      _ssc.checkpoint(checkpointDirectory)
+      _ssc.queueStream[Int](Queue(rdd)).register()
+      _ssc
+    }
+    ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    ssc.start()
+    eventually(timeout(10000 millis)) {
+      assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+    }
+    ssc.stop()
+    val e = intercept[SparkException] {
+      ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
     }
     // StreamingContext.validate changes the message, so use "contains" here
-    assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+    assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled."))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message