spark-commits mailing list archives

From: t...@apache.org
Subject: spark git commit: [SPARK-25399][SS] Continuous processing state should not affect microbatch execution jobs
Date: Tue, 11 Sep 2018 22:53:19 GMT
Repository: spark
Updated Branches:
  refs/heads/master 97d4afaa1 -> 9f5c5b4cc


[SPARK-25399][SS] Continuous processing state should not affect microbatch execution jobs

## What changes were proposed in this pull request?

Leftover state from running a continuous processing streaming job should not affect later
microbatch execution jobs. If a continuous processing job runs and the same thread is later reused
for a microbatch execution job in the same environment, the microbatch job can return wrong
answers because it may load the wrong version of the state: the store version gets taken from the
thread-local current epoch left behind by the continuous job instead of the version determined at
planning time. This patch sets the local property `__is_continuous_processing` to true or false on
the SparkContext for continuous and microbatch execution respectively, and `StateStoreRDD` now
consults the current epoch only when that property is true.
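
For illustration, the sketch below (hypothetical names, not Spark's actual internals) shows the underlying failure mode: per-thread state set by one job survives into a later job that reuses the same thread, which is why the fix gates the epoch lookup on an explicit local property instead of trusting the thread-local alone.

```scala
import java.util.concurrent.Executors

// Hypothetical stand-in for Spark's per-thread epoch tracking.
object EpochState {
  private val currentEpoch = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }
  def set(epoch: Long): Unit = currentEpoch.set(Some(epoch))
  def get: Option[Long] = currentEpoch.get()
}

object ThreadReuseDemo extends App {
  // A single-thread executor guarantees the second task reuses the first task's thread.
  val pool = Executors.newSingleThreadExecutor()

  // The "continuous" job records an epoch on its worker thread and never clears it.
  pool.submit(new Runnable {
    def run(): Unit = EpochState.set(42L)
  }).get()

  // A later "microbatch" job scheduled on the same thread still sees the stale epoch,
  // so version selection keyed only on EpochState.get would pick the wrong state version.
  pool.submit(new Runnable {
    def run(): Unit = println(s"leftover epoch: ${EpochState.get}") // prints Some(42)
  }).get()

  pool.shutdown()
}
```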

## How was this patch tested?

New and existing unit tests.

Closes #22386 from mukulmurthy/25399-streamthread.

Authored-by: Mukul Murthy <mukul.murthy@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f5c5b4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f5c5b4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f5c5b4c

Branch: refs/heads/master
Commit: 9f5c5b4cca7d4eaa30a3f8adb4cb1eebe3f77c7a
Parents: 97d4afa
Author: Mukul Murthy <mukul.murthy@gmail.com>
Authored: Tue Sep 11 15:53:15 2018 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Sep 11 15:53:15 2018 -0700

----------------------------------------------------------------------
 .../streaming/MicroBatchExecution.scala         |  2 ++
 .../execution/streaming/StreamExecution.scala   |  1 +
 .../continuous/ContinuousExecution.scala        |  2 ++
 .../streaming/state/StateStoreRDD.scala         | 12 +++++--
 .../spark/sql/streaming/StreamSuite.scala       | 33 ++++++++++++++++++--
 5 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9f5c5b4c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index b1cafd6..2cac865 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -511,6 +511,8 @@ class MicroBatchExecution(
 
     sparkSessionToRunBatch.sparkContext.setLocalProperty(
       MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
+    sparkSessionToRunBatch.sparkContext.setLocalProperty(
+      StreamExecution.IS_CONTINUOUS_PROCESSING, false.toString)
 
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(

http://git-wip-us.apache.org/repos/asf/spark/blob/9f5c5b4c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a39bb71..f6c60c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -529,6 +529,7 @@ abstract class StreamExecution(
 
 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
+  val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
 
   def isInterruptionException(e: Throwable): Boolean = e match {
     // InterruptedIOException - thrown when an I/O operation is interrupted

http://git-wip-us.apache.org/repos/asf/spark/blob/9f5c5b4c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 4ddebb3..ccca726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -210,6 +210,8 @@ class ContinuousExecution(
     }.head
 
     sparkSessionForQuery.sparkContext.setLocalProperty(
+      StreamExecution.IS_CONTINUOUS_PROCESSING, true.toString)
+    sparkSessionForQuery.sparkContext.setLocalProperty(
       ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
     // Add another random ID on top of the run ID, to distinguish epoch coordinators across
     // reconfigurations.

http://git-wip-us.apache.org/repos/asf/spark/blob/9f5c5b4c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 3f11b8f..4a69a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.EpochTracker
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
@@ -74,9 +75,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
-    val currentVersion = EpochTracker.getCurrentEpoch match {
-      case None => storeVersion
-      case Some(value) => value
+    val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
+      .map(_.toBoolean).getOrElse(false)
+    val currentVersion = if (isContinuous) {
+      val epoch = EpochTracker.getCurrentEpoch
+      assert(epoch.isDefined, "Current epoch must be defined for continuous processing streams.")
+      epoch.get
+    } else {
+      storeVersion
     }
 
     store = StateStore.get(

http://git-wip-us.apache.org/repos/asf/spark/blob/9f5c5b4c/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index bf509b1..f55ddb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -29,13 +29,14 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId,
   StateStoreProvider}
 import org.apache.spark.sql.functions._
@@ -788,7 +789,7 @@ class StreamSuite extends StreamTest {
     val query = input
       .toDS()
       .map { i =>
-        while (!org.apache.spark.TaskContext.get().isInterrupted()) {
+        while (!TaskContext.get().isInterrupted()) {
           // keep looping till interrupted by query.stop()
           Thread.sleep(100)
         }
@@ -1029,6 +1030,34 @@ class StreamSuite extends StreamTest {
         false))
   }
 
+  test("is_continuous_processing property should be false for microbatch processing") {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .map(i => TaskContext.get().getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
+    testStream(df) (
+      AddData(input, 1),
+      CheckAnswer("false")
+    )
+  }
+
+  test("is_continuous_processing property should be true for continuous processing") {
+    val input = ContinuousMemoryStream[Int]
+    val stream = input.toDS()
+      .map(i => TaskContext.get().getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
+      .writeStream.format("memory")
+      .queryName("output")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+    try {
+      input.addData(1)
+      stream.processAllAvailable()
+    } finally {
+      stream.stop()
+    }
+
+    checkAnswer(spark.sql("select * from output"), Row("true"))
+  }
+
   for (e <- Seq(
     new InterruptedException,
     new InterruptedIOException,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

