spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject [1/2] spark git commit: [SPARK-15686][SQL] Move user-facing streaming classes into sql.streaming
Date Wed, 01 Jun 2016 17:15:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9406a3c9a -> a780848af


http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
deleted file mode 100644
index ba1facf..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.ContinuousQuery
-import org.apache.spark.sql.util.ContinuousQueryListener._
-
-/**
- * :: Experimental ::
- * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]].
- * @note The methods are not thread-safe as they may be called from different threads.
- */
-@Experimental
-abstract class ContinuousQueryListener {
-
-  /**
-   * Called when a query is started.
-   * @note This is called synchronously with
-   *       [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]],
-   *       that is, `onQueryStarted` will be called on all listeners before
-   *       `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
-   *       don't block this method as it will block your query.
-   */
-  def onQueryStarted(queryStarted: QueryStarted): Unit
-
-  /**
-   * Called when there is some status update (ingestion rate updated, etc.)
-   *
-   * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be
-   *       latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]]
-   *       may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
-   *       is terminated when you are processing [[QueryProgress]].
-   */
-  def onQueryProgress(queryProgress: QueryProgress): Unit
-
-  /** Called when a query is stopped, with or without error */
-  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
-}
-
-
-/**
- * :: Experimental ::
- * Companion object of [[ContinuousQueryListener]] that defines the listener events.
- */
-@Experimental
-object ContinuousQueryListener {
-
-  /** Base type of [[ContinuousQueryListener]] events */
-  trait Event
-
-  /** Event representing the start of a query */
-  class QueryStarted private[sql](val query: ContinuousQuery) extends Event
-
-  /** Event representing any progress updates in a query */
-  class QueryProgress private[sql](val query: ContinuousQuery) extends Event
-
-  /** Event representing the termination of a query */
-  class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
index 0d18a64..52c2007 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.ProcessingTime
 
 class ProcessingTimeSuite extends SparkFunSuite {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
deleted file mode 100644
index b033725..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.lang.Thread.UncaughtExceptionHandler
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.language.experimental.macros
-import scala.reflect.ClassTag
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import org.scalatest.Assertions
-import org.scalatest.concurrent.{Eventually, Timeouts}
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.Span
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
-
-/**
- * A framework for implementing tests for streaming queries and sources.
- *
- * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
- * blocking as necessary to let the stream catch up.  For example, the following adds some data to
- * a stream, blocking until it can verify that the correct values are eventually produced.
- *
- * {{{
- *  val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(_ + 1)
-
-    testStream(mapped)(
-      AddData(inputData, 1, 2, 3),
-      CheckAnswer(2, 3, 4))
- * }}}
- *
- * Note that while we do sleep to allow the other thread to progress without spinning,
- * `StreamAction` checks should not depend on the amount of time spent sleeping.  Instead they
- * should check the actual progress of the stream before verifying the required test condition.
- *
- * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
- * avoid hanging forever in the case of failures. However, individual suites can change this
- * by overriding `streamingTimeout`.
- */
-trait StreamTest extends QueryTest with Timeouts {
-
-  /** How long to wait for an active stream to catch up when checking a result. */
-  val streamingTimeout = 10.seconds
-
-  /** A trait for actions that can be performed while testing a streaming DataFrame. */
-  trait StreamAction
-
-  /** A trait to mark actions that require the stream to be actively running. */
-  trait StreamMustBeRunning
-
-  /**
-   * Adds the given data to the stream. Subsequent check answers will block until this data has
-   * been processed.
-   */
-  object AddData {
-    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
-      AddDataMemory(source, data)
-  }
-
-  /** A trait that can be extended when testing a source. */
-  trait AddData extends StreamAction {
-    /**
-     * Called to add the data to a source. It should find the source to add data to from
-     * the active query, and then return the source object the data was added to, as well as the
-     * offset of the added data.
-     */
-    def addData(query: Option[StreamExecution]): (Source, Offset)
-  }
-
-  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
-    override def toString: String = s"AddData to $source: ${data.mkString(",")}"
-
-    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
-      (source, source.addData(data))
-    }
-  }
-
-  /**
-   * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
-   * This operation automatically blocks until all added data has been processed.
-   */
-  object CheckAnswer {
-    def apply[A : Encoder](data: A*): CheckAnswerRows = {
-      val encoder = encoderFor[A]
-      val toExternalRow = RowEncoder(encoder.schema)
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false)
-    }
-
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false)
-  }
-
-  /**
-   * Checks to make sure that the latest batch stored in the sink matches the `expectedAnswer`.
-   * This operation automatically blocks until all added data has been processed.
-   */
-  object CheckLastBatch {
-    def apply[A : Encoder](data: A*): CheckAnswerRows = {
-      val encoder = encoderFor[A]
-      val toExternalRow = RowEncoder(encoder.schema)
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true)
-    }
-
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true)
-  }
-
-  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
-      extends StreamAction with StreamMustBeRunning {
-    override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
-    private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
-  }
-
-  /** Stops the stream. It must currently be running. */
-  case object StopStream extends StreamAction with StreamMustBeRunning
-
-  /** Starts the stream, resuming if data has already been processed. It must not be running. */
-  case class StartStream(
-      trigger: Trigger = ProcessingTime(0),
-      triggerClock: Clock = new SystemClock)
-    extends StreamAction
-
-  /** Advance the trigger clock's time manually. */
-  case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
-
-  /** Signals that a failure is expected and should not kill the test. */
-  case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
-    val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
-    override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]"
-  }
-
-  /** Assert that a body is true */
-  class Assert(condition: => Boolean, val message: String = "") extends StreamAction {
-    def run(): Unit = { Assertions.assert(condition) }
-    override def toString: String = s"Assert(<condition>, $message)"
-  }
-
-  object Assert {
-    def apply(condition: => Boolean, message: String = ""): Assert = new Assert(condition, message)
-    def apply(message: String)(body: => Unit): Assert = new Assert( { body; true }, message)
-    def apply(body: => Unit): Assert = new Assert( { body; true }, "")
-  }
-
-  /** Assert that a condition on the active query is true */
-  class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String)
-    extends StreamAction {
-    override def toString: String = s"AssertOnQuery(<condition>, $message)"
-  }
-
-  object AssertOnQuery {
-    def apply(condition: StreamExecution => Boolean, message: String = ""): AssertOnQuery = {
-      new AssertOnQuery(condition, message)
-    }
-
-    def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = {
-      new AssertOnQuery(condition, message)
-    }
-  }
-
-  /**
-   * Executes the specified actions on the given streaming DataFrame and provides helpful
-   * error messages in the case of failures or incorrect answers.
-   *
-   * Note that if the stream is not explicitly started before an action that requires it to be
-   * running then it will be automatically started before performing any other actions.
-   */
-  def testStream(
-      _stream: Dataset[_],
-      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = {
-
-    val stream = _stream.toDF()
-    var pos = 0
-    var currentPlan: LogicalPlan = stream.logicalPlan
-    var currentStream: StreamExecution = null
-    var lastStream: StreamExecution = null
-    val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
-    val sink = new MemorySink(stream.schema, outputMode)
-
-    @volatile
-    var streamDeathCause: Throwable = null
-
-    // If the test doesn't manually start the stream, we do it automatically at the beginning.
-    val startedManually =
-      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
-    val startedTest = if (startedManually) actions else StartStream() +: actions
-
-    def testActions = actions.zipWithIndex.map {
-      case (a, i) =>
-        if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) {
-          "=> " + a.toString
-        } else {
-          "   " + a.toString
-        }
-    }.mkString("\n")
-
-    def currentOffsets =
-      if (currentStream != null) currentStream.committedOffsets.toString else "not started"
-
-    def threadState =
-      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
-
-    def testState =
-      s"""
-         |== Progress ==
-         |$testActions
-         |
-         |== Stream ==
-         |Output Mode: $outputMode
-         |Stream state: $currentOffsets
-         |Thread state: $threadState
-         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
-         |
-         |== Sink ==
-         |${sink.toDebugString}
-         |
-         |
-         |== Plan ==
-         |${if (currentStream != null) currentStream.lastExecution else ""}
-         """.stripMargin
-
-    def verify(condition: => Boolean, message: String): Unit = {
-      if (!condition) {
-        failTest(message)
-      }
-    }
-
-    def eventually[T](message: String)(func: => T): T = {
-      try {
-        Eventually.eventually(Timeout(streamingTimeout)) {
-          func
-        }
-      } catch {
-        case NonFatal(e) =>
-          failTest(message, e)
-      }
-    }
-
-    def failTest(message: String, cause: Throwable = null) = {
-
-      // Recursively pretty print an exception with truncated stacktrace and internal cause
-      def exceptionToString(e: Throwable, prefix: String = ""): String = {
-        val base = s"$prefix${e.getMessage}" +
-          e.getStackTrace.take(10).mkString(s"\n$prefix", s"\n$prefix\t", "\n")
-        if (e.getCause != null) {
-          base + s"\n$prefix\tCaused by: " + exceptionToString(e.getCause, s"$prefix\t")
-        } else {
-          base
-        }
-      }
-      val c = Option(cause).map(exceptionToString(_))
-      val m = if (message != null && message.size > 0) Some(message) else None
-      fail(
-        s"""
-           |${(m ++ c).mkString(": ")}
-           |$testState
-         """.stripMargin)
-    }
-
-    val testThread = Thread.currentThread()
-    val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
-
-    try {
-      startedTest.foreach { action =>
-        action match {
-          case StartStream(trigger, triggerClock) =>
-            verify(currentStream == null, "stream already running")
-            lastStream = currentStream
-            currentStream =
-              spark
-                .streams
-                .startQuery(
-                  StreamExecution.nextName,
-                  metadataRoot,
-                  stream,
-                  sink,
-                  outputMode,
-                  trigger,
-                  triggerClock)
-                .asInstanceOf[StreamExecution]
-            currentStream.microBatchThread.setUncaughtExceptionHandler(
-              new UncaughtExceptionHandler {
-                override def uncaughtException(t: Thread, e: Throwable): Unit = {
-                  streamDeathCause = e
-                  testThread.interrupt()
-                }
-              })
-
-          case AdvanceManualClock(timeToAdd) =>
-            verify(currentStream != null,
-                   "can not advance manual clock when a stream is not running")
-            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
-                   s"can not advance clock of type ${currentStream.triggerClock.getClass}")
-            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
-
-          case StopStream =>
-            verify(currentStream != null, "can not stop a stream that is not running")
-            try failAfter(streamingTimeout) {
-              currentStream.stop()
-              verify(!currentStream.microBatchThread.isAlive,
-                s"microbatch thread not stopped")
-              verify(!currentStream.isActive,
-                "query.isActive() is false even after stopping")
-              verify(currentStream.exception.isEmpty,
-                s"query.exception() is not empty after clean stop: " +
-                  currentStream.exception.map(_.toString()).getOrElse(""))
-            } catch {
-              case _: InterruptedException =>
-              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-                failTest("Timed out while stopping and waiting for microbatchthread to terminate.")
-              case t: Throwable =>
-                failTest("Error while stopping stream", t)
-            } finally {
-              lastStream = currentStream
-              currentStream = null
-            }
-
-          case ef: ExpectFailure[_] =>
-            verify(currentStream != null, "can not expect failure when stream is not running")
-            try failAfter(streamingTimeout) {
-              val thrownException = intercept[ContinuousQueryException] {
-                currentStream.awaitTermination()
-              }
-              eventually("microbatch thread not stopped after termination with failure") {
-                assert(!currentStream.microBatchThread.isAlive)
-              }
-              verify(thrownException.query.eq(currentStream),
-                s"incorrect query reference in exception")
-              verify(currentStream.exception === Some(thrownException),
-                s"incorrect exception returned by query.exception()")
-
-              val exception = currentStream.exception.get
-              verify(exception.cause.getClass === ef.causeClass,
-                "incorrect cause in exception returned by query.exception()\n" +
-                  s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}")
-            } catch {
-              case _: InterruptedException =>
-              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-                failTest("Timed out while waiting for failure")
-              case t: Throwable =>
-                failTest("Error while checking stream failure", t)
-            } finally {
-              lastStream = currentStream
-              currentStream = null
-              streamDeathCause = null
-            }
-
-          case a: AssertOnQuery =>
-            verify(currentStream != null || lastStream != null,
-              "cannot assert when not stream has been started")
-            val streamToAssert = Option(currentStream).getOrElse(lastStream)
-            verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
-
-          case a: Assert =>
-            val streamToAssert = Option(currentStream).getOrElse(lastStream)
-            verify({ a.run(); true }, s"Assert failed: ${a.message}")
-
-          case a: AddData =>
-            try {
-              // Add data and get the source where it was added, and the expected offset of the
-              // added data.
-              val queryToUse = Option(currentStream).orElse(Option(lastStream))
-              val (source, offset) = a.addData(queryToUse)
-
-              def findSourceIndex(plan: LogicalPlan): Option[Int] = {
-                plan
-                  .collect { case StreamingExecutionRelation(s, _) => s }
-                  .zipWithIndex
-                  .find(_._1 == source)
-                  .map(_._2)
-              }
-
-              // Try to find the index of the source to which data was added. Either get the index
-              // from the current active query or the original input logical plan.
-              val sourceIndex =
-                queryToUse.flatMap { query =>
-                  findSourceIndex(query.logicalPlan)
-                }.orElse {
-                  findSourceIndex(stream.logicalPlan)
-                }.getOrElse {
-                  throw new IllegalArgumentException(
-                    "Could find index of the source to which data was added")
-                }
-
-              // Store the expected offset of added data to wait for it later
-              awaiting.put(sourceIndex, offset)
-            } catch {
-              case NonFatal(e) =>
-                failTest("Error adding data", e)
-            }
-
-          case CheckAnswerRows(expectedAnswer, lastOnly) =>
-            verify(currentStream != null, "stream not running")
-            // Get the map of source index to the current source objects
-            val indexToSource = currentStream
-              .logicalPlan
-              .collect { case StreamingExecutionRelation(s, _) => s }
-              .zipWithIndex
-              .map(_.swap)
-              .toMap
-
-            // Block until all data added has been processed for all the sources
-            awaiting.foreach { case (sourceIndex, offset) =>
-              failAfter(streamingTimeout) {
-                currentStream.awaitOffset(indexToSource(sourceIndex), offset)
-              }
-            }
-
-            val sparkAnswer = try if (lastOnly) sink.latestBatchData else sink.allData catch {
-              case e: Exception =>
-                failTest("Exception while getting data from sink", e)
-            }
-
-            QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
-              error => failTest(error)
-            }
-        }
-        pos += 1
-      }
-    } catch {
-      case _: InterruptedException if streamDeathCause != null =>
-        failTest("Stream Thread Died")
-      case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
-        failTest("Timed out waiting for stream")
-    } finally {
-      if (currentStream != null && currentStream.microBatchThread.isAlive) {
-        currentStream.stop()
-      }
-    }
-  }
-
-  /**
-   * Creates a stress test that randomly starts/stops/adds data/checks the result.
-   *
-   * @param ds a dataframe that executes + 1 on a stream of integers, returning the result.
-   * @param addData an add data action that adds the given numbers to the stream, encoding them
-   *                as needed
-   */
-  def runStressTest(
-      ds: Dataset[Int],
-      addData: Seq[Int] => StreamAction,
-      iterations: Int = 100): Unit = {
-    implicit val intEncoder = ExpressionEncoder[Int]()
-    var dataPos = 0
-    var running = true
-    val actions = new ArrayBuffer[StreamAction]()
-
-    def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
-
-    def addRandomData() = {
-      val numItems = Random.nextInt(10)
-      val data = dataPos until (dataPos + numItems)
-      dataPos += numItems
-      actions += addData(data)
-    }
-
-    (1 to iterations).foreach { i =>
-      val rand = Random.nextDouble()
-      if(!running) {
-        rand match {
-          case r if r < 0.7 => // AddData
-            addRandomData()
-
-          case _ => // StartStream
-            actions += StartStream()
-            running = true
-        }
-      } else {
-        rand match {
-          case r if r < 0.1 =>
-            addCheck()
-
-          case r if r < 0.7 => // AddData
-            addRandomData()
-
-          case _ => // StopStream
-            addCheck()
-            actions += StopStream
-            running = false
-        }
-      }
-    }
-    if(!running) { actions += StartStream() }
-    addCheck()
-    testStream(ds)(actions: _*)
-  }
-
-
-  object AwaitTerminationTester {
-
-    trait ExpectedBehavior
-
-    /** Expect awaitTermination to not be blocked */
-    case object ExpectNotBlocked extends ExpectedBehavior
-
-    /** Expect awaitTermination to get blocked */
-    case object ExpectBlocked extends ExpectedBehavior
-
-    /** Expect awaitTermination to throw an exception */
-    case class ExpectException[E <: Exception]()(implicit val t: ClassTag[E])
-      extends ExpectedBehavior
-
-    private val DEFAULT_TEST_TIMEOUT = 1 second
-
-    def test(
-        expectedBehavior: ExpectedBehavior,
-        awaitTermFunc: () => Unit,
-        testTimeout: Span = DEFAULT_TEST_TIMEOUT
-      ): Unit = {
-
-      expectedBehavior match {
-        case ExpectNotBlocked =>
-          withClue("Got blocked when expected non-blocking.") {
-            failAfter(testTimeout) {
-              awaitTermFunc()
-            }
-          }
-
-        case ExpectBlocked =>
-          withClue("Was not blocked when expected.") {
-            intercept[TestFailedDueToTimeoutException] {
-              failAfter(testTimeout) {
-                awaitTermFunc()
-              }
-            }
-          }
-
-        case e: ExpectException[_] =>
-          val thrownException =
-            withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
-              intercept[ContinuousQueryException] {
-                failAfter(testTimeout) {
-                  awaitTermFunc()
-                }
-              }
-            }
-          assert(thrownException.cause.getClass === e.t.runtimeClass,
-            "exception of incorrect type was throw")
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 7f99d30..00d5e05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.ProcessingTime
+import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 class ProcessingTimeExecutorSuite extends SparkFunSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
new file mode 100644
index 0000000..cdd97da
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester._
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.execution.streaming._
+
+
+class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import ContinuousQueryListener._
+
+  after {
+    spark.streams.active.foreach(_.stop())
+    assert(spark.streams.active.isEmpty)
+    assert(addedListeners.isEmpty)
+    // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+  }
+
+  test("single listener") {
+    val listener = new QueryStatusCollector
+    val input = MemoryStream[Int]
+    withListenerAdded(listener) {
+      testStream(input.toDS)(
+        StartStream(),
+        Assert("Incorrect query status in onQueryStarted") {
+          val status = listener.startStatus
+          assert(status != null)
+          assert(status.active == true)
+          assert(status.sourceStatuses.size === 1)
+          assert(status.sourceStatuses(0).description.contains("Memory"))
+
+          // The source and sink offsets must be None as this must be called before the
+          // batches have started
+          assert(status.sourceStatuses(0).offset === None)
+          assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
+
+          // No progress events or termination events
+          assert(listener.progressStatuses.isEmpty)
+          assert(listener.terminationStatus === null)
+        },
+        AddDataMemory(input, Seq(1, 2, 3)),
+        CheckAnswer(1, 2, 3),
+        Assert("Incorrect query status in onQueryProgress") {
+          eventually(Timeout(streamingTimeout)) {
+
+            // There should be only one progress event, as one batch has been processed
+            assert(listener.progressStatuses.size === 1)
+            val status = listener.progressStatuses.peek()
+            assert(status != null)
+            assert(status.active == true)
+            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
+            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+
+            // No termination events
+            assert(listener.terminationStatus === null)
+          }
+        },
+        StopStream,
+        Assert("Incorrect query status in onQueryTerminated") {
+          eventually(Timeout(streamingTimeout)) {
+            val status = listener.terminationStatus
+            assert(status != null)
+
+            assert(status.active === false) // must be inactive by the time onQueryTerm is called
+            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
+            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+          }
+          listener.checkAsyncErrors()
+        }
+      )
+    }
+  }
+
+  test("adding and removing listener") {
+    def isListenerActive(listener: QueryStatusCollector): Boolean = {
+      listener.reset()
+      testStream(MemoryStream[Int].toDS)(
+        StartStream(),
+        StopStream
+      )
+      listener.startStatus != null
+    }
+
+    try {
+      val listener1 = new QueryStatusCollector
+      val listener2 = new QueryStatusCollector
+
+      spark.streams.addListener(listener1)
+      assert(isListenerActive(listener1) === true)
+      assert(isListenerActive(listener2) === false)
+      spark.streams.addListener(listener2)
+      assert(isListenerActive(listener1) === true)
+      assert(isListenerActive(listener2) === true)
+      spark.streams.removeListener(listener1)
+      assert(isListenerActive(listener1) === false)
+      assert(isListenerActive(listener2) === true)
+    } finally {
+      addedListeners.foreach(spark.streams.removeListener)
+    }
+  }
+
+  test("event ordering") {
+    val listener = new QueryStatusCollector
+    withListenerAdded(listener) {
+      for (i <- 1 to 100) {
+        listener.reset()
+        require(listener.startStatus === null)
+        testStream(MemoryStream[Int].toDS)(
+          StartStream(),
+          Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
+          StopStream,
+          Assert { listener.checkAsyncErrors() }
+        )
+      }
+    }
+  }
+
+
+  private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
+    try {
+      failAfter(1.minute) { // dotted call avoids postfix-operator syntax
+        spark.streams.addListener(listener) // registered only while `body` runs
+        body
+      }
+    } finally {
+      spark.streams.removeListener(listener) // runs even if `body` fails or times out
+    }
+  }
+
+  private def addedListeners(): Array[ContinuousQueryListener] = {
+    // Reads the manager's `listenerBus` reflectively via ScalaTest's PrivateMethod.
+    val busAccessor = PrivateMethod[ContinuousQueryListenerBus]('listenerBus)
+    val bus = spark.streams.invokePrivate(busAccessor())
+    bus.listeners.toArray.map { l => l.asInstanceOf[ContinuousQueryListener] }
+  }
+
+  class QueryStatusCollector extends ContinuousQueryListener {
+    // Wraps every callback body so that errors raised in the async listener events are caught
+    @volatile private var asyncTestWaiter = new Waiter
+
+    @volatile var startStatus: QueryStatus = null // set by onQueryStarted
+    @volatile var terminationStatus: QueryStatus = null // set by onQueryTerminated
+    val progressStatuses = new ConcurrentLinkedQueue[QueryStatus] // one entry per onQueryProgress
+
+    def reset(): Unit = {
+      startStatus = null
+      terminationStatus = null
+      progressStatuses.clear()
+      asyncTestWaiter = new Waiter // start over with a fresh waiter
+    }
+
+    def checkAsyncErrors(): Unit = {
+      asyncTestWaiter.await(timeout(streamingTimeout)) // bounded by streamingTimeout
+    }
+
+
+    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+      asyncTestWaiter {
+        startStatus = QueryStatus(queryStarted.query) // snapshot of the query at start
+      }
+    }
+
+    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
+        progressStatuses.add(QueryStatus(queryProgress.query))
+      }
+    }
+
+    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
+        terminationStatus = QueryStatus(queryTerminated.query)
+      }
+      asyncTestWaiter.dismiss() // only the termination callback dismisses the waiter
+    }
+  }
+
+  case class QueryStatus(
+    active: Boolean,
+    exception: Option[Exception],
+    sourceStatuses: Array[SourceStatus],
+    sinkStatus: SinkStatus)
+
+  object QueryStatus {
+    // Captures the query's isActive/exception/sourceStatuses/sinkStatus at call time.
+    def apply(query: ContinuousQuery): QueryStatus = new QueryStatus(
+      query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index b75c3ea..c1e4970 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -28,12 +28,11 @@ import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{ContinuousQuery, Dataset, OutputMode, StreamTest}
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
-class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
   import AwaitTerminationTester._
   import testImplicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index f469cde..e4ca86d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution}
-import org.apache.spark.sql.test.SharedSQLContext
 
-class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
+
+class ContinuousQuerySuite extends StreamTest {
 
   import AwaitTerminationTester._
   import testImplicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 3d8dcaf..1c73208 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 1d784f1..f681b88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -137,7 +137,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
   val valueSchema = new StructType().add("value", StringType)
 }
 
-class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
+class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
@@ -594,7 +594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 }
 
-class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
+class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
 
   import testImplicits._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 4efb7cf..1c0fb34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -23,9 +23,7 @@ import java.util.UUID
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 /**
@@ -38,7 +36,7 @@ import org.apache.spark.util.Utils
  *
  * At the end, the resulting files are loaded and the answer is checked.
  */
-class FileStressSuite extends StreamTest with SharedSQLContext {
+class FileStressSuite extends StreamTest {
   import testImplicits._
 
   testQuietly("fault tolerance stress test - unpartitioned output") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index e5bd0b4..df76499 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class MemorySinkSuite extends StreamTest with BeforeAndAfter {
 
   import testImplicits._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
index 81760d2..7f2972e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
 
-class MemorySourceStressSuite extends StreamTest with SharedSQLContext {
+class MemorySourceStressSuite extends StreamTest {
   import testImplicits._
 
   test("memory stress test") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c17cb1d..9414b1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.ManualClock
 
-class StreamSuite extends StreamTest with SharedSQLContext {
+class StreamSuite extends StreamTest {
 
   import testImplicits._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
new file mode 100644
index 0000000..dd8672a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.language.experimental.macros
+import scala.reflect.ClassTag
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.scalatest.Assertions
+import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row}
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
+
+/**
+ * A framework for implementing tests for streaming queries and sources.
+ *
+ * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
+ * blocking as necessary to let the stream catch up.  For example, the following adds some data to
+ * a stream, blocking until it can verify that the correct values are eventually produced.
+ *
+ * {{{
+ *  val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(_ + 1)
+
+    testStream(mapped)(
+      AddData(inputData, 1, 2, 3),
+      CheckAnswer(2, 3, 4))
+ * }}}
+ *
+ * Note that while we do sleep to allow the other thread to progress without spinning,
+ * `StreamAction` checks should not depend on the amount of time spent sleeping.  Instead they
+ * should check the actual progress of the stream before verifying the required test condition.
+ *
+ * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
+ * avoid hanging forever in the case of failures. However, individual suites can change this
+ * by overriding `streamingTimeout`.
+ */
+trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
+
+  /** How long to wait for an active stream to catch up when checking a result. */
+  val streamingTimeout = 10.seconds
+
+  /** A trait for actions that can be performed while testing a streaming DataFrame. */
+  trait StreamAction
+
+  /** A trait to mark actions that require the stream to be actively running. */
+  trait StreamMustBeRunning
+
+  /**
+   * Adds the given data to the stream. Subsequent check answers will block until this data has
+   * been processed.
+   */
+  object AddData {
+    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
+      AddDataMemory(source, data)
+  }
+
+  /** A trait that can be extended when testing a source. */
+  trait AddData extends StreamAction {
+    /**
+     * Called to add the data to a source. It should find the source to add data to from
+     * the active query, and then return the source object to which the data was added, as
+     * well as the offset of the added data.
+     */
+    def addData(query: Option[StreamExecution]): (Source, Offset)
+  }
+
+  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
+    override def toString: String = "AddData to " + source + ": " + data.mkString(",")
+
+    // The query argument is not consulted: the target stream is already fixed by `source`.
+    override def addData(query: Option[StreamExecution]): (Source, Offset) =
+      source -> source.addData(data)
+  }
+
+  /**
+   * Builds a check that the data currently stored in the sink matches the `expectedAnswer`.
+   * This operation automatically blocks until all added data has been processed.
+   */
+  object CheckAnswer {
+    def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      val enc = encoderFor[A]
+      val rowEnc = RowEncoder(enc.schema)
+      CheckAnswerRows(data.map(v => rowEnc.fromRow(enc.toRow(v))), lastOnly = false)
+    }
+
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, lastOnly = false)
+  }
+
+  /**
+   * Same construction as `CheckAnswer`, except that the resulting check has `lastOnly = true`.
+   * NOTE(review): presumably compares only the sink's latest batch; confirm in testStream.
+   */
+  object CheckLastBatch {
+    def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      val enc = encoderFor[A]
+      val rowEnc = RowEncoder(enc.schema)
+      CheckAnswerRows(data.map(v => rowEnc.fromRow(enc.toRow(v))), lastOnly = true)
+    }
+
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, lastOnly = true)
+  }
+
+  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
+      extends StreamAction with StreamMustBeRunning {
+    override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
+    private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
+  }
+
+  /** Stops the stream. It must currently be running. */
+  case object StopStream extends StreamAction with StreamMustBeRunning
+
+  /** Starts the stream, resuming if data has already been processed. It must not be running. */
+  case class StartStream(
+      trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock)
+    extends StreamAction
+
+  /** Advance the trigger clock's time manually. */
+  case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
+
+  /** Declares that the stream is expected to fail here without the failure killing the test. */
+  case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
+    val causeClass: Class[T] = scala.reflect.classTag[T].runtimeClass.asInstanceOf[Class[T]]
+    override def toString(): String = "ExpectFailure[" + causeClass.getCanonicalName + "]"
+  }
+
+  /** Asserts that `condition` holds, attaching `message` as the clue on failure. */
+  class Assert(condition: => Boolean, val message: String = "") extends StreamAction {
+    def run(): Unit = { Assertions.assert(condition, message) } // by-name: evaluated on each run
+    override def toString: String = s"Assert(<condition>, $message)"
+  }
+
+  object Assert {
+    def apply(condition: => Boolean, message: String = ""): Assert = new Assert(condition, message)
+    def apply(message: String)(body: => Unit): Assert = new Assert( { body; true }, message)
+    def apply(body: => Unit): Assert = new Assert( { body; true }, "")
+  }
+
+  /** Assert that a condition on the active query is true */
+  class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String)
+    extends StreamAction {
+    override def toString: String = s"AssertOnQuery(<condition>, $message)"
+  }
+
+  object AssertOnQuery {
+    def apply(condition: StreamExecution => Boolean, message: String = ""): AssertOnQuery = {
+      new AssertOnQuery(condition, message)
+    }
+
+    def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = {
+      new AssertOnQuery(condition, message)
+    }
+  }
+
+  /**
+   * Executes the specified actions on the given streaming DataFrame and provides helpful
+   * error messages in the case of failures or incorrect answers.
+   *
+   * Note that if the stream is not explicitly started before an action that requires it to be
+   * running then it will be automatically started before performing any other actions.
+   */
+  def testStream(
+      _stream: Dataset[_],
+      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = {
+
+    val stream = _stream.toDF()
+    var pos = 0
+    var currentPlan: LogicalPlan = stream.logicalPlan
+    var currentStream: StreamExecution = null
+    var lastStream: StreamExecution = null
+    val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
+    val sink = new MemorySink(stream.schema, outputMode)
+
+    @volatile
+    var streamDeathCause: Throwable = null
+
+    // If the test doesn't manually start the stream, we do it automatically at the beginning.
+    val startedManually =
+      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
+    val startedTest = if (startedManually) actions else StartStream() +: actions
+
+    def testActions = actions.zipWithIndex.map {
+      case (a, i) =>
+        if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) {
+          "=> " + a.toString
+        } else {
+          "   " + a.toString
+        }
+    }.mkString("\n")
+
+    def currentOffsets =
+      if (currentStream != null) currentStream.committedOffsets.toString else "not started"
+
+    def threadState =
+      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+
+    def testState =
+      s"""
+         |== Progress ==
+         |$testActions
+         |
+         |== Stream ==
+         |Output Mode: $outputMode
+         |Stream state: $currentOffsets
+         |Thread state: $threadState
+         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+         |
+         |== Sink ==
+         |${sink.toDebugString}
+         |
+         |
+         |== Plan ==
+         |${if (currentStream != null) currentStream.lastExecution else ""}
+         """.stripMargin
+
+    def verify(condition: => Boolean, message: String): Unit = {
+      if (!condition) {
+        failTest(message)
+      }
+    }
+
+    def eventually[T](message: String)(func: => T): T = {
+      try {
+        Eventually.eventually(Timeout(streamingTimeout)) {
+          func
+        }
+      } catch {
+        case NonFatal(e) =>
+          failTest(message, e)
+      }
+    }
+
+    def failTest(message: String, cause: Throwable = null) = {
+
+      // Recursively pretty-print an exception with a truncated stack trace and its nested cause
+      def exceptionToString(e: Throwable, prefix: String = ""): String = {
+        val base = s"$prefix${e.getMessage}" +
+          e.getStackTrace.take(10).mkString(s"\n$prefix", s"\n$prefix\t", "\n")
+        if (e.getCause != null) {
+          base + s"\n$prefix\tCaused by: " + exceptionToString(e.getCause, s"$prefix\t")
+        } else {
+          base
+        }
+      }
+      val c = Option(cause).map(exceptionToString(_)) // None when no cause was supplied
+      val m = if (message != null && message.size > 0) Some(message) else None // skip blank text
+      fail(
+        s"""
+           |${(m ++ c).mkString(": ")}
+           |$testState
+         """.stripMargin)
+    }
+
+    val testThread = Thread.currentThread()
+    val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    try {
+      startedTest.foreach { action =>
+        action match {
+          case StartStream(trigger, triggerClock) =>
+            verify(currentStream == null, "stream already running")
+            lastStream = currentStream
+            currentStream =
+              spark
+                .streams
+                .startQuery(
+                  StreamExecution.nextName,
+                  metadataRoot,
+                  stream,
+                  sink,
+                  outputMode,
+                  trigger,
+                  triggerClock)
+                .asInstanceOf[StreamExecution]
+            currentStream.microBatchThread.setUncaughtExceptionHandler(
+              new UncaughtExceptionHandler {
+                override def uncaughtException(t: Thread, e: Throwable): Unit = {
+                  streamDeathCause = e
+                  testThread.interrupt()
+                }
+              })
+
+          case AdvanceManualClock(timeToAdd) =>
+            verify(currentStream != null,
+                   "can not advance manual clock when a stream is not running")
+            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+                   s"can not advance clock of type ${currentStream.triggerClock.getClass}")
+            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+
+          case StopStream =>
+            verify(currentStream != null, "can not stop a stream that is not running")
+            try failAfter(streamingTimeout) {
+              currentStream.stop()
+              verify(!currentStream.microBatchThread.isAlive,
+                s"microbatch thread not stopped")
+              verify(!currentStream.isActive, // fails when the query still reports itself active
+                "query.isActive() is true even after stopping")
+              verify(currentStream.exception.isEmpty,
+                s"query.exception() is not empty after clean stop: " +
+                  currentStream.exception.map(_.toString()).getOrElse(""))
+            } catch {
+              case _: InterruptedException =>
+              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+                failTest("Timed out while stopping and waiting for microbatchthread to terminate.")
+              case t: Throwable =>
+                failTest("Error while stopping stream", t)
+            } finally {
+              lastStream = currentStream // keep the stopped query reachable for later assertions
+              currentStream = null
+            }
+
+          case ef: ExpectFailure[_] =>
+            verify(currentStream != null, "can not expect failure when stream is not running")
+            try failAfter(streamingTimeout) {
+              val thrownException = intercept[ContinuousQueryException] {
+                currentStream.awaitTermination()
+              }
+              eventually("microbatch thread not stopped after termination with failure") {
+                assert(!currentStream.microBatchThread.isAlive)
+              }
+              verify(thrownException.query.eq(currentStream),
+                s"incorrect query reference in exception")
+              verify(currentStream.exception === Some(thrownException),
+                s"incorrect exception returned by query.exception()")
+
+              val exception = currentStream.exception.get
+              verify(exception.cause.getClass === ef.causeClass,
+                "incorrect cause in exception returned by query.exception()\n" +
+                  s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}")
+            } catch {
+              case _: InterruptedException =>
+              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+                failTest("Timed out while waiting for failure")
+              case t: Throwable =>
+                failTest("Error while checking stream failure", t)
+            } finally {
+              lastStream = currentStream
+              currentStream = null
+              streamDeathCause = null
+            }
+
+          case a: AssertOnQuery =>
+            verify(currentStream != null || lastStream != null,
+              "cannot assert when not stream has been started")
+            val streamToAssert = Option(currentStream).getOrElse(lastStream)
+            verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
+
+          case a: Assert =>
+            val streamToAssert = Option(currentStream).getOrElse(lastStream)
+            verify({ a.run(); true }, s"Assert failed: ${a.message}")
+
+          case a: AddData =>
+            try {
+              // Add data and get the source where it was added, and the expected offset of the
+              // added data.
+              val queryToUse = Option(currentStream).orElse(Option(lastStream))
+              val (source, offset) = a.addData(queryToUse)
+
+              def findSourceIndex(plan: LogicalPlan): Option[Int] = {
+                plan
+                  .collect { case StreamingExecutionRelation(s, _) => s }
+                  .zipWithIndex
+                  .find(_._1 == source)
+                  .map(_._2)
+              }
+
+              // Try to find the index of the source to which data was added. Either get the index
+              // from the current active query or the original input logical plan.
+              val sourceIndex =
+                queryToUse.flatMap { query =>
+                  findSourceIndex(query.logicalPlan)
+                }.orElse {
+                  findSourceIndex(stream.logicalPlan)
+                }.getOrElse {
+                  throw new IllegalArgumentException(
+                    "Could find index of the source to which data was added")
+                }
+
+              // Store the expected offset of added data to wait for it later
+              awaiting.put(sourceIndex, offset)
+            } catch {
+              case NonFatal(e) =>
+                failTest("Error adding data", e)
+            }
+
+          case CheckAnswerRows(expectedAnswer, lastOnly) =>
+            verify(currentStream != null, "stream not running")
+            // Get the map of source index to the current source objects
+            val indexToSource = currentStream
+              .logicalPlan
+              .collect { case StreamingExecutionRelation(s, _) => s }
+              .zipWithIndex
+              .map(_.swap)
+              .toMap
+
+            // Block until all data added has been processed for all the sources
+            awaiting.foreach { case (sourceIndex, offset) =>
+              failAfter(streamingTimeout) {
+                currentStream.awaitOffset(indexToSource(sourceIndex), offset)
+              }
+            }
+
+            val sparkAnswer = try if (lastOnly) sink.latestBatchData else sink.allData catch {
+              case e: Exception =>
+                failTest("Exception while getting data from sink", e)
+            }
+
+            QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+              error => failTest(error)
+            }
+        }
+        pos += 1
+      }
+    } catch {
+      case _: InterruptedException if streamDeathCause != null =>
+        failTest("Stream Thread Died")
+      case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+        failTest("Timed out waiting for stream")
+    } finally {
+      if (currentStream != null && currentStream.microBatchThread.isAlive) {
+        currentStream.stop()
+      }
+    }
+  }
+
+  /**
+   * Creates a stress test that randomly starts/stops/adds data/checks the result.
+   *
+   * @param ds a dataset that adds 1 to each element of a stream of integers, returning the result.
+   * @param addData an add data action that adds the given numbers to the stream, encoding them
+   *                as needed
+   */
+  def runStressTest(
+      ds: Dataset[Int],
+      addData: Seq[Int] => StreamAction,
+      iterations: Int = 100): Unit = {
+    implicit val intEncoder = ExpressionEncoder[Int]()
+    var dataPos = 0
+    var running = true
+    val actions = new ArrayBuffer[StreamAction]()
+
+    def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
+
+    def addRandomData() = {
+      val numItems = Random.nextInt(10)
+      val data = dataPos until (dataPos + numItems)
+      dataPos += numItems
+      actions += addData(data)
+    }
+
+    (1 to iterations).foreach { i =>
+      val rand = Random.nextDouble()
+      if(!running) {
+        rand match {
+          case r if r < 0.7 => // AddData
+            addRandomData()
+
+          case _ => // StartStream
+            actions += StartStream()
+            running = true
+        }
+      } else {
+        rand match {
+          case r if r < 0.1 =>
+            addCheck()
+
+          case r if r < 0.7 => // AddData
+            addRandomData()
+
+          case _ => // StopStream
+            addCheck()
+            actions += StopStream
+            running = false
+        }
+      }
+    }
+    if(!running) { actions += StartStream() }
+    addCheck()
+    testStream(ds)(actions: _*)
+  }
+
+
+  object AwaitTerminationTester {
+
+    trait ExpectedBehavior
+
+    /** Expect awaitTermination to not be blocked */
+    case object ExpectNotBlocked extends ExpectedBehavior
+
+    /** Expect awaitTermination to get blocked */
+    case object ExpectBlocked extends ExpectedBehavior
+
+    /** Expect awaitTermination to throw an exception */
+    case class ExpectException[E <: Exception]()(implicit val t: ClassTag[E])
+      extends ExpectedBehavior
+
+    private val DEFAULT_TEST_TIMEOUT = 1.second
+
+    def test(
+        expectedBehavior: ExpectedBehavior,
+        awaitTermFunc: () => Unit,
+        testTimeout: Span = DEFAULT_TEST_TIMEOUT
+      ): Unit = {
+
+      expectedBehavior match {
+        case ExpectNotBlocked =>
+          withClue("Got blocked when expected non-blocking.") {
+            failAfter(testTimeout) {
+              awaitTermFunc()
+            }
+          }
+
+        case ExpectBlocked =>
+          withClue("Was not blocked when expected.") {
+            intercept[TestFailedDueToTimeoutException] {
+              failAfter(testTimeout) {
+                awaitTermFunc()
+              }
+            }
+          }
+
+        case e: ExpectException[_] =>
+          val thrownException =
+            withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
+              intercept[ContinuousQueryException] {
+                failAfter(testTimeout) {
+                  awaitTermFunc()
+                }
+              }
+            }
+          assert(thrownException.cause.getClass === e.t.runtimeClass,
+            "exception of incorrect type was throw")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 322bbb9..1f174ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -20,19 +20,18 @@ package org.apache.spark.sql.streaming
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, StreamTest}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.InternalOutputModes._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.SharedSQLContext
 
 object FailureSinglton {
   var firstTime = true
 }
 
-class StreamingAggregationSuite extends StreamTest with SharedSQLContext with BeforeAndAfterAll {
+class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
 
   override def afterAll(): Unit = {
     super.afterAll()

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 38a0534..a2aac69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -101,7 +101,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
   }
 }
 
-class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
   private def newMetadataDir =
     Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
deleted file mode 100644
index 8788898..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
-import org.scalatest.concurrent.AsyncAssertions.Waiter
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.util.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}
-
-class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
-
-  import testImplicits._
-
-  after {
-    spark.streams.active.foreach(_.stop())
-    assert(spark.streams.active.isEmpty)
-    assert(addedListeners.isEmpty)
-    // Make sure we don't leak any events to the next test
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-  }
-
-  test("single listener") {
-    val listener = new QueryStatusCollector
-    val input = MemoryStream[Int]
-    withListenerAdded(listener) {
-      testStream(input.toDS)(
-        StartStream(),
-        Assert("Incorrect query status in onQueryStarted") {
-          val status = listener.startStatus
-          assert(status != null)
-          assert(status.active == true)
-          assert(status.sourceStatuses.size === 1)
-          assert(status.sourceStatuses(0).description.contains("Memory"))
-
-          // The source and sink offsets must be None as this must be called before the
-          // batches have started
-          assert(status.sourceStatuses(0).offset === None)
-          assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
-
-          // No progress events or termination events
-          assert(listener.progressStatuses.isEmpty)
-          assert(listener.terminationStatus === null)
-        },
-        AddDataMemory(input, Seq(1, 2, 3)),
-        CheckAnswer(1, 2, 3),
-        Assert("Incorrect query status in onQueryProgress") {
-          eventually(Timeout(streamingTimeout)) {
-
-            // There should be only on progress event as batch has been processed
-            assert(listener.progressStatuses.size === 1)
-            val status = listener.progressStatuses.peek()
-            assert(status != null)
-            assert(status.active == true)
-            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
-
-            // No termination events
-            assert(listener.terminationStatus === null)
-          }
-        },
-        StopStream,
-        Assert("Incorrect query status in onQueryTerminated") {
-          eventually(Timeout(streamingTimeout)) {
-            val status = listener.terminationStatus
-            assert(status != null)
-
-            assert(status.active === false) // must be inactive by the time onQueryTerm is called
-            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
-          }
-          listener.checkAsyncErrors()
-        }
-      )
-    }
-  }
-
-  test("adding and removing listener") {
-    def isListenerActive(listener: QueryStatusCollector): Boolean = {
-      listener.reset()
-      testStream(MemoryStream[Int].toDS)(
-        StartStream(),
-        StopStream
-      )
-      listener.startStatus != null
-    }
-
-    try {
-      val listener1 = new QueryStatusCollector
-      val listener2 = new QueryStatusCollector
-
-      spark.streams.addListener(listener1)
-      assert(isListenerActive(listener1) === true)
-      assert(isListenerActive(listener2) === false)
-      spark.streams.addListener(listener2)
-      assert(isListenerActive(listener1) === true)
-      assert(isListenerActive(listener2) === true)
-      spark.streams.removeListener(listener1)
-      assert(isListenerActive(listener1) === false)
-      assert(isListenerActive(listener2) === true)
-    } finally {
-      addedListeners.foreach(spark.streams.removeListener)
-    }
-  }
-
-  test("event ordering") {
-    val listener = new QueryStatusCollector
-    withListenerAdded(listener) {
-      for (i <- 1 to 100) {
-        listener.reset()
-        require(listener.startStatus === null)
-        testStream(MemoryStream[Int].toDS)(
-          StartStream(),
-          Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
-          StopStream,
-          Assert { listener.checkAsyncErrors() }
-        )
-      }
-    }
-  }
-
-
-  private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
-    try {
-      failAfter(1 minute) {
-        spark.streams.addListener(listener)
-        body
-      }
-    } finally {
-      spark.streams.removeListener(listener)
-    }
-  }
-
-  private def addedListeners(): Array[ContinuousQueryListener] = {
-    val listenerBusMethod =
-      PrivateMethod[ContinuousQueryListenerBus]('listenerBus)
-    val listenerBus = spark.streams invokePrivate listenerBusMethod()
-    listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener])
-  }
-
-  class QueryStatusCollector extends ContinuousQueryListener {
-    // to catch errors in the async listener events
-    @volatile private var asyncTestWaiter = new Waiter
-
-    @volatile var startStatus: QueryStatus = null
-    @volatile var terminationStatus: QueryStatus = null
-    val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
-
-    def reset(): Unit = {
-      startStatus = null
-      terminationStatus = null
-      progressStatuses.clear()
-      asyncTestWaiter = new Waiter
-    }
-
-    def checkAsyncErrors(): Unit = {
-      asyncTestWaiter.await(timeout(streamingTimeout))
-    }
-
-
-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
-      asyncTestWaiter {
-        startStatus = QueryStatus(queryStarted.query)
-      }
-    }
-
-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-        progressStatuses.add(QueryStatus(queryProgress.query))
-      }
-    }
-
-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-        terminationStatus = QueryStatus(queryTerminated.query)
-      }
-      asyncTestWaiter.dismiss()
-    }
-  }
-
-  case class QueryStatus(
-    active: Boolean,
-    exception: Option[Exception],
-    sourceStatuses: Array[SourceStatus],
-    sinkStatus: SinkStatus)
-
-  object QueryStatus {
-    def apply(query: ContinuousQuery): QueryStatus = {
-      QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message