Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AF934200B27 for ; Wed, 8 Jun 2016 01:40:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ADA5D160A4F; Tue, 7 Jun 2016 23:40:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5A449160A36 for ; Wed, 8 Jun 2016 01:40:08 +0200 (CEST) Received: (qmail 53599 invoked by uid 500); 7 Jun 2016 23:40:07 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 53590 invoked by uid 99); 7 Jun 2016 23:40:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Jun 2016 23:40:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 69FE0DFC71; Tue, 7 Jun 2016 23:40:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tdas@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable Date: Tue, 7 Jun 2016 23:40:07 +0000 (UTC) archived-at: Tue, 07 Jun 2016 23:40:09 -0000 Repository: spark Updated Branches: refs/heads/master 695dbc816 -> 0cfd6192f [SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable ## What changes were proposed in this pull request? This PR adds ContinuousQueryInfo to make ContinuousQueryListener events serializable in order to support writing events into the event log. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu Closes #13335 from zsxwing/query-info. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cfd6192 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cfd6192 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cfd6192 Branch: refs/heads/master Commit: 0cfd6192f38932a26195a6a8dbbc637d67f5ec55 Parents: 695dbc8 Author: Shixiong Zhu Authored: Tue Jun 7 16:40:03 2016 -0700 Committer: Tathagata Das Committed: Tue Jun 7 16:40:03 2016 -0700 ---------------------------------------------------------------------- .../streaming/ContinuousQueryListenerBus.scala | 27 +--- .../execution/streaming/StreamExecution.scala | 21 ++- .../sql/streaming/ContinuousQueryInfo.scala | 34 +++++ .../sql/streaming/ContinuousQueryListener.scala | 34 +++-- .../apache/spark/sql/streaming/SinkStatus.scala | 6 +- .../spark/sql/streaming/SourceStatus.scala | 8 +- .../ContinuousQueryListenerSuite.scala | 133 +++++++++++++++---- .../sql/streaming/ContinuousQuerySuite.scala | 16 +-- 8 files changed, 203 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala index 2a1be09..f50951f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala @@ -22,15 +22,13 @@ import org.apache.spark.sql.streaming.ContinuousQueryListener import org.apache.spark.util.ListenerBus /** - * A bus to forward events to [[ContinuousQueryListener]]s. This one will wrap received - * [[ContinuousQueryListener.Event]]s as WrappedContinuousQueryListenerEvents and send them to the - * Spark listener bus. It also registers itself with Spark listener bus, so that it can receive - * WrappedContinuousQueryListenerEvents, unwrap them as ContinuousQueryListener.Events and - * dispatch them to ContinuousQueryListener. + * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received + * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with + * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them + * to ContinuousQueryListener. */ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) - extends SparkListener - with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { + extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { import ContinuousQueryListener._ @@ -45,13 +43,13 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) case s: QueryStarted => postToAll(s) case _ => - sparkListenerBus.post(new WrappedContinuousQueryListenerEvent(event)) + sparkListenerBus.post(event) } } override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { - case WrappedContinuousQueryListenerEvent(e) => + case e: ContinuousQueryListener.Event => postToAll(e) case _ => } @@ -71,15 +69,4 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) } } - /** - * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark - * listener bus. - */ - private case class WrappedContinuousQueryListenerEvent( - streamingListenerEvent: ContinuousQueryListener.Event) - extends SparkListenerEvent { - - // Do not log streaming events in event log as history server does not support these events. - protected[spark] override def logEvent: Boolean = false - } } http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 16d38a2..d9800e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -131,12 +131,13 @@ class StreamExecution( /** Returns current status of all the sources. */ override def sourceStatuses: Array[SourceStatus] = { val localAvailableOffsets = availableOffsets - sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray + sources.map(s => + new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray } /** Returns current status of the sink. */ override def sinkStatus: SinkStatus = - new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources)) + new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString) /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */ override def exception: Option[ContinuousQueryException] = Option(streamDeathCause) @@ -167,7 +168,7 @@ class StreamExecution( // Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners, // so must mark this as ACTIVE first. state = ACTIVE - postEvent(new QueryStarted(this)) // Assumption: Does not throw exception. + postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception. // Unblock starting thread startLatch.countDown() @@ -206,7 +207,10 @@ class StreamExecution( } finally { state = TERMINATED sparkSession.streams.notifyQueryTermination(StreamExecution.this) - postEvent(new QueryTerminated(this)) + postEvent(new QueryTerminated( + this.toInfo, + exception.map(_.getMessage), + exception.map(_.getStackTrace.toSeq).getOrElse(Nil))) terminationLatch.countDown() } } @@ -374,7 +378,7 @@ class StreamExecution( logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") // Update committed offsets. committedOffsets ++= availableOffsets - postEvent(new QueryProgress(this)) + postEvent(new QueryProgress(this.toInfo)) } private def postEvent(event: ContinuousQueryListener.Event) { @@ -484,6 +488,13 @@ class StreamExecution( """.stripMargin } + private def toInfo: ContinuousQueryInfo = { + new ContinuousQueryInfo( + this.name, + this.sourceStatuses, + this.sinkStatus) + } + trait State case object INITIALIZED extends State case object ACTIVE extends State http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala new file mode 100644 index 0000000..57b718b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * A class used to report information about the progress of a [[ContinuousQuery]]. + * + * @param name The [[ContinuousQuery]] name. + * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources. + * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink. + */ +@Experimental +class ContinuousQueryInfo private[sql]( + val name: String, + val sourceStatuses: Seq[SourceStatus], + val sinkStatus: SinkStatus) http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala index 6bdd513..dd31114 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental +import org.apache.spark.scheduler.SparkListenerEvent /** * :: Experimental :: @@ -70,26 +71,43 @@ abstract class ContinuousQueryListener { object ContinuousQueryListener { /** - * Base type of [[ContinuousQueryListener]] events. + * :: Experimental :: + * Base type of [[ContinuousQueryListener]] events * @since 2.0.0 */ - trait Event + @Experimental + trait Event extends SparkListenerEvent /** - * Event representing the start of a query. + * :: Experimental :: + * Event representing the start of a query * @since 2.0.0 */ - class QueryStarted private[sql](val query: ContinuousQuery) extends Event + @Experimental + class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event /** - * Event representing any progress updates in a query. + * :: Experimental :: + * Event representing any progress updates in a query * @since 2.0.0 */ - class QueryProgress private[sql](val query: ContinuousQuery) extends Event + @Experimental + class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event /** - * Event representing that termination of a query. + * :: Experimental :: + * Event representing that termination of a query + * + * @param queryInfo Information about the status of the query. + * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated + * with an exception. Otherwise, it will be `None`. + * @param stackTrace The stack trace of the exception if the query was terminated with an + * exception. It will be empty if there was no error. * @since 2.0.0 */ - class QueryTerminated private[sql](val query: ContinuousQuery) extends Event + @Experimental + class QueryTerminated private[sql]( + val queryInfo: ContinuousQueryInfo, + val exception: Option[String], + val stackTrace: Seq[StackTraceElement]) extends Event } http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index 79ddf01..de1efe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -18,17 +18,17 @@ package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, Sink} +import org.apache.spark.sql.execution.streaming.Sink /** * :: Experimental :: * Status and metrics of a streaming [[Sink]]. * * @param description Description of the source corresponding to this status - * @param offset Current offset up to which data has been written by the sink + * @param offsetDesc Description of the current offset up to which data has been written by the sink * @since 2.0.0 */ @Experimental class SinkStatus private[sql]( val description: String, - val offset: Offset) + val offsetDesc: String) http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala index 8fccd5b..bd0c848 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala @@ -18,17 +18,17 @@ package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, Source} +import org.apache.spark.sql.execution.streaming.Source /** * :: Experimental :: * Status and metrics of a streaming [[Source]]. * - * @param description Description of the source corresponding to this status - * @param offset Current offset of the source, if known + * @param description Description of the source corresponding to this status + * @param offsetDesc Description of the current [[Source]] offset if known * @since 2.0.0 */ @Experimental class SourceStatus private[sql] ( val description: String, - val offset: Option[Offset]) + val offsetDesc: Option[String]) http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala index cdd97da..9b59ab6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala @@ -26,7 +26,9 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ +import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.util.JsonProtocol class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { @@ -51,14 +53,13 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { Assert("Incorrect query status in onQueryStarted") { val status = listener.startStatus assert(status != null) - assert(status.active == true) assert(status.sourceStatuses.size === 1) assert(status.sourceStatuses(0).description.contains("Memory")) // The source and sink offsets must be None as this must be called before the // batches have started - assert(status.sourceStatuses(0).offset === None) - assert(status.sinkStatus.offset === CompositeOffset(None :: Nil)) + assert(status.sourceStatuses(0).offsetDesc === None) + assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString) // No progress events or termination events assert(listener.progressStatuses.isEmpty) @@ -73,9 +74,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(listener.progressStatuses.size === 1) val status = listener.progressStatuses.peek() assert(status != null) - assert(status.active == true) - assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) - assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) + assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) + assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) // No termination events assert(listener.terminationStatus === null) @@ -86,10 +86,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { eventually(Timeout(streamingTimeout)) { val status = listener.terminationStatus assert(status != null) - - assert(status.active === false) // must be inactive by the time onQueryTerm is called - assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) - assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) + assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) + assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) } listener.checkAsyncErrors() } @@ -141,6 +139,92 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { } } + test("exception should be reported in QueryTerminated") { + val listener = new QueryStatusCollector + withListenerAdded(listener) { + val input = MemoryStream[Int] + testStream(input.toDS.map(_ / 0))( + StartStream(), + AddData(input, 1), + ExpectFailure[SparkException](), + Assert { + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + assert(listener.terminationStatus !== null) + assert(listener.terminationException.isDefined) + assert(listener.terminationException.get.contains("java.lang.ArithmeticException")) + assert(listener.terminationStackTrace.nonEmpty) + } + ) + } + } + + test("QueryStarted serialization") { + val queryStartedInfo = new ContinuousQueryInfo( + "name", + Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)), + new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString)) + val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo) + val json = JsonProtocol.sparkEventToJson(queryStarted) + val newQueryStarted = JsonProtocol.sparkEventFromJson(json) + .asInstanceOf[ContinuousQueryListener.QueryStarted] + assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo) + } + + test("QueryProgress serialization") { + val queryProcessInfo = new ContinuousQueryInfo( + "name", + Seq( + new SourceStatus("source1", Some(LongOffset(0).toString)), + new SourceStatus("source2", Some(LongOffset(1).toString))), + new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) + val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo) + val json = JsonProtocol.sparkEventToJson(queryProcess) + val newQueryProcess = JsonProtocol.sparkEventFromJson(json) + .asInstanceOf[ContinuousQueryListener.QueryProgress] + assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo) + } + + test("QueryTerminated serialization") { + val queryTerminatedInfo = new ContinuousQueryInfo( + "name", + Seq( + new SourceStatus("source1", Some(LongOffset(0).toString)), + new SourceStatus("source2", Some(LongOffset(1).toString))), + new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) + val exception = new RuntimeException("exception") + val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated( + queryTerminatedInfo, + Some(exception.getMessage), + exception.getStackTrace) + val json = + JsonProtocol.sparkEventToJson(queryQueryTerminated) + val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) + .asInstanceOf[ContinuousQueryListener.QueryTerminated] + assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo) + assert(queryQueryTerminated.exception === newQueryTerminated.exception) + } + + private def assertContinuousQueryInfoEquals( + expected: ContinuousQueryInfo, + actual: ContinuousQueryInfo): Unit = { + assert(expected.name === actual.name) + assert(expected.sourceStatuses.size === actual.sourceStatuses.size) + expected.sourceStatuses.zip(actual.sourceStatuses).foreach { + case (expectedSource, actualSource) => + assertSourceStatus(expectedSource, actualSource) + } + assertSinkStatus(expected.sinkStatus, actual.sinkStatus) + } + + private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = { + assert(expected.description === actual.description) + assert(expected.offsetDesc === actual.offsetDesc) + } + + private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = { + assert(expected.description === actual.description) + assert(expected.offsetDesc === actual.offsetDesc) + } private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = { try { @@ -164,9 +248,12 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { // to catch errors in the async listener events @volatile private var asyncTestWaiter = new Waiter - @volatile var startStatus: QueryStatus = null - @volatile var terminationStatus: QueryStatus = null - val progressStatuses = new ConcurrentLinkedQueue[QueryStatus] + @volatile var startStatus: ContinuousQueryInfo = null + @volatile var terminationStatus: ContinuousQueryInfo = null + @volatile var terminationException: Option[String] = null + @volatile var terminationStackTrace: Seq[StackTraceElement] = null + + val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo] def reset(): Unit = { startStatus = null @@ -182,35 +269,25 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { override def onQueryStarted(queryStarted: QueryStarted): Unit = { asyncTestWaiter { - startStatus = QueryStatus(queryStarted.query) + startStatus = queryStarted.queryInfo } } override def onQueryProgress(queryProgress: QueryProgress): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryProgress called before onQueryStarted") - progressStatuses.add(QueryStatus(queryProgress.query)) + progressStatuses.add(queryProgress.queryInfo) } } override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryTerminated called before onQueryStarted") - terminationStatus = QueryStatus(queryTerminated.query) + terminationStatus = queryTerminated.queryInfo + terminationException = queryTerminated.exception + terminationStackTrace = queryTerminated.stackTrace } asyncTestWaiter.dismiss() } } - - case class QueryStatus( - active: Boolean, - exception: Option[Exception], - sourceStatuses: Array[SourceStatus], - sinkStatus: SinkStatus) - - object QueryStatus { - def apply(query: ContinuousQuery): QueryStatus = { - QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus) - } - } } http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index e4ca86d..5542405 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -66,21 +66,21 @@ class ContinuousQuerySuite extends StreamTest { testStream(mapped)( AssertOnQuery(_.sourceStatuses.length === 1), AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.sourceStatuses(0).offset === None), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === None), AssertOnQuery(_.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)), + AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString), AddData(inputData, 1, 2), CheckAnswer(6, 3), - AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))), - AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)), + AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString), AddData(inputData, 1, 2), CheckAnswer(6, 3, 6, 3), - AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))), - AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)), + AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString), AddData(inputData, 0), ExpectFailure[SparkException], - AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))), - AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))) + AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)), + AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString) ) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org