spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints
Date Tue, 15 Nov 2016 21:09:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6f9e598cc -> 2afdaa980


[SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints

## What changes were proposed in this pull request?

It would be nice if memory sinks could also recover from checkpoints. For correctness reasons,
the only case in which we should support this is the `Complete` OutputMode: in `Complete` mode every
trigger emits the entire result table, which is derived from StateStore output that is already
persisted in the checkpoint directory, so the in-memory table can be rebuilt on restart.
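
A minimal sketch of the user-facing behavior this enables (the session setup, socket source,
query name, and checkpoint path below are illustrative choices, not part of the patch):

import org.apache.spark.sql.SparkSession

// Illustrative setup; any streaming source behaves the same way.
val spark = SparkSession.builder().master("local[2]").appName("mem-recovery").getOrCreate()
import spark.implicits._

val words = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

val query = words.groupBy("value").count()
  .writeStream
  .format("memory")
  .queryName("word_counts")                              // illustrative name
  .option("checkpointLocation", "/tmp/word_counts_chk")  // illustrative path
  .outputMode("complete")  // recovery is only permitted in Complete mode
  .start()

// Restarting with the same checkpointLocation now resumes from the persisted
// state (previously a memory-sink query whose checkpoint already contained
// offsets refused to start in any mode); the rebuilt table stays queryable:
spark.table("word_counts").show()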

## How was this patch tested?

Unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15801 from brkyvz/mem-stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2afdaa98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2afdaa98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2afdaa98

Branch: refs/heads/master
Commit: 2afdaa9805f44b45242978eab9a9623d31dddbf3
Parents: 6f9e598
Author: Burak Yavuz <brkyvz@gmail.com>
Authored: Tue Nov 15 13:09:29 2016 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Nov 15 13:09:29 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  6 +-
 .../test/DataStreamReaderWriterSuite.scala      | 65 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2afdaa98/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b959444..daed1dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -222,14 +222,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+      val chkpointLoc = extraOptions.get("checkpointLocation")
+      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        chkpointLoc,
         df,
         sink,
         outputMode,
         useTempCheckpointLocation = true,
-        recoverFromCheckpointLocation = false,
+        recoverFromCheckpointLocation = recoverFromChkpoint,
         trigger = trigger)
       resultDf.createOrReplaceTempView(query.name)
       query
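
For context, the flag flipped above feeds a fail-fast check in StreamingQueryManager.startQuery:
when recovery is not supported and an "offsets" directory already exists under the checkpoint
location, the query refuses to start. A simplified sketch of that guard follows (not the verbatim
Spark code; IllegalStateException stands in for the AnalysisException Spark raises, so the sketch
compiles outside the spark-sql package):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified sketch of the guard applied when recoverFromCheckpointLocation is
// false: refuse to reuse a checkpoint that already holds committed offsets.
// The error text mirrors the message the new test below asserts on.
def failIfCheckpointExists(checkpointLoc: String, hadoopConf: Configuration): Unit = {
  val offsets = new Path(checkpointLoc, "offsets")
  val fs = offsets.getFileSystem(hadoopConf)
  if (fs.exists(offsets)) {
    throw new IllegalStateException(
      s"This query does not support recovering from checkpoint location. " +
      s"Delete $offsets to start over.")
  }
}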

http://git-wip-us.apache.org/repos/asf/spark/blob/2afdaa98/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f099439..5630464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming.test
 
+import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
@@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val sq = df.writeStream.format("console").start()
     sq.stop()
   }
+
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    val tableName = "test"
+    def startQuery: StreamingQuery = {
+      df.groupBy("a")
+        .count()
+        .writeStream
+        .format("memory")
+        .queryName(tableName)
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("complete")
+        .start()
+    }
+    // no exception here
+    val q = startQuery
+    ms.addData(0, 1)
+    q.processAllAvailable()
+    q.stop()
+
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 1), Row(1, 1))
+    )
+    spark.sql(s"drop table $tableName")
+    // verify table is dropped
+    intercept[AnalysisException](spark.table(tableName).collect())
+    val q2 = startQuery
+    ms.addData(0)
+    q2.processAllAvailable()
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 2), Row(1, 1))
+    )
+
+    q2.stop()
+  }
+
+  test("append mode memory sink's do not support checkpoint recovery") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+
+    val e = intercept[AnalysisException] {
+      df.writeStream
+        .format("memory")
+        .queryName("test")
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("append")
+        .start()
+    }
+    assert(e.getMessage.contains("does not support recovering"))
+    assert(e.getMessage.contains("checkpoint location"))
+  }
 }



