spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-17096][SQL][STREAMING] Improve exception string reported through the StreamingQueryListener
Date Wed, 17 Aug 2016 20:31:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master cc97ea188 -> d60af8f6a


[SPARK-17096][SQL][STREAMING] Improve exception string reported through the StreamingQueryListener

## What changes were proposed in this pull request?

Currently, the stack trace (as `Array[StackTraceElement]`) reported through `StreamingQueryListener.onQueryTerminated`
is useless, as it is the stack trace of where StreamingQueryException is defined, not the
stack trace of the underlying exception. For example, if a streaming query fails because of a
divide-by-zero (`/ by zero`) exception in a task, `QueryTerminated.stackTrace` will have
```
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:211)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)
```
This is useless, as it is the location where the StreamingQueryException was defined, not where
the underlying failure occurred. What we want is the stack trace of the actual cause.

Here is the right way to reason about what should be posted through `StreamingQueryListener.onQueryTerminated`:
- The actual exception could be either a SparkException or an arbitrary exception.
  - A SparkException reports the relevant executor stack trace of a failed task as a string
in the exception message. The `Array[StackTraceElement]` returned by `SparkException.getStackTrace()`
is mostly irrelevant.
  - For an arbitrary exception, the `Array[StackTraceElement]` returned by `exception.getStackTrace()`
may be relevant.
- When there is an error in a streaming query, it is hard to tell whether the `Array[StackTraceElement]`
is useful or not. In fact, it is not clear whether it is even useful to report the stack trace
as this array of Java objects. It may be sufficient to report the stack trace as a string,
along with the message. This is how Spark reports executor stack traces in task failure messages.
- Hence, this PR simplifies the API by removing the array `stackTrace` from `QueryTerminated`.
Instead, `exception` returns a string containing both the message and the stack trace of the
actual underlying exception that failed the streaming query (i.e. not that of the StreamingQueryException).
Anyone interested in the actual stack trace as an array can always access it through
`streamingQuery.exception`, which returns the exception object (see the usage sketch after this list).
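
As an illustration, a minimal sketch of a listener consuming the simplified event is shown below.
The class name and the logging are illustrative, not part of this patch; the method signatures
follow the listener API as of this commit.
```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener that logs the failure string posted by the new API.
class TerminationLogger extends StreamingQueryListener {
  override def onQueryStarted(queryStarted: QueryStarted): Unit = ()
  override def onQueryProgress(queryProgress: QueryProgress): Unit = ()
  override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
    // `exception` is now an Option[String] holding the message and stack trace
    // of the underlying cause, or None if the query stopped without error.
    queryTerminated.exception.foreach { e =>
      println(s"Streaming query terminated with error:\n$e")
    }
  }
}

// A listener is registered through the session's query manager, e.g.:
//   spark.streams.addListener(new TerminationLogger)
```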

With this change, if a streaming query fails because of a divide-by-zero (`/ by zero`) exception
in a task, `QueryTerminated.exception` will be
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.ArithmeticException: / by zero
	at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcII$sp(StreamingQueryListenerSuite.scala:153)
	at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153)
	at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:226)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1429)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1416)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1416)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
...
```
It contains the relevant executor stack trace. In the case of a non-SparkException, for example
if the streaming source MemoryStream throws an exception, the exception message will have the
relevant stack trace.
```
java.lang.RuntimeException: this is the exception message
	at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:103)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:316)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:313)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:313)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)
```
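
For anyone who still needs the structured stack trace, below is a minimal sketch of retrieving it
through the query handle. It assumes a running query `query`; `StreamingQuery.exception` returns
`Option[StreamingQueryException]`, whose `cause` field holds the underlying error.
```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: recover the underlying cause's stack trace as an
// array, now that QueryTerminated no longer carries it.
def underlyingStackTrace(query: StreamingQuery): Array[StackTraceElement] =
  query.exception match {
    case Some(sqe) => sqe.cause.getStackTrace  // the actual failing exception
    case None      => Array.empty              // query stopped without error
  }
```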

Note that this change in the public `QueryTerminated` class is okay as the APIs are still
experimental.

## How was this patch tested?
Unit tests that check whether the right information is present in the exception message reported
through the `QueryTerminated` object.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14675 from tdas/SPARK-17096.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60af8f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60af8f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60af8f6

Branch: refs/heads/master
Commit: d60af8f6aa53373de1333cc642cf2a9d7b39d912
Parents: cc97ea1
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Aug 17 13:31:34 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Aug 17 13:31:34 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/StreamExecution.scala      |  5 +----
 .../spark/sql/streaming/StreamingQueryException.scala  |  3 ++-
 .../spark/sql/streaming/StreamingQueryListener.scala   |  3 +--
 .../sql/streaming/StreamingQueryListenerSuite.scala    | 13 ++++++-------
 4 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d60af8f6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 66fb5a4..4d05af0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -217,10 +217,7 @@ class StreamExecution(
     } finally {
       state = TERMINATED
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-      postEvent(new QueryTerminated(
-        this.toInfo,
-        exception.map(_.getMessage),
-        exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
+      postEvent(new QueryTerminated(this.toInfo, exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d60af8f6/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 90f95ca..bd3e5a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -22,7 +22,8 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
 
 /**
  * :: Experimental ::
- * Exception that stopped a [[StreamingQuery]].
+ * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
+ * that caused the failure.
  * @param query      Query that caused the exception
  * @param message     Message of this exception
  * @param cause       Internal cause of this exception

http://git-wip-us.apache.org/repos/asf/spark/blob/d60af8f6/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 3b3cead..db606ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -108,6 +108,5 @@ object StreamingQueryListener {
   @Experimental
   class QueryTerminated private[sql](
       val queryInfo: StreamingQueryInfo,
-      val exception: Option[String],
-      val stackTrace: Seq[StackTraceElement]) extends Event
+      val exception: Option[String]) extends Event
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d60af8f6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 7f4d28c..77602e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -94,7 +94,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
             assert(status.id === query.id)
             assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
             assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
-            assert(listener.terminationStackTrace.isEmpty)
             assert(listener.terminationException === None)
           }
           listener.checkAsyncErrors()
@@ -147,7 +146,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  test("exception should be reported in QueryTerminated") {
+  testQuietly("exception should be reported in QueryTerminated") {
     val listener = new QueryStatusCollector
     withListenerAdded(listener) {
       val input = MemoryStream[Int]
@@ -159,8 +158,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           spark.sparkContext.listenerBus.waitUntilEmpty(10000)
           assert(listener.terminationStatus !== null)
           assert(listener.terminationException.isDefined)
+          // Make sure that the exception message reported through listener
+          // contains the actual exception and relevant stack trace
+          assert(!listener.terminationException.get.contains("StreamingQueryException"))
           assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationStackTrace.nonEmpty)
+          assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
         }
       )
     }
@@ -205,8 +207,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     val exception = new RuntimeException("exception")
     val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
       queryTerminatedInfo,
-      Some(exception.getMessage),
-      exception.getStackTrace)
+      Some(exception.getMessage))
     val json =
       JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
@@ -262,7 +263,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     @volatile var startStatus: StreamingQueryInfo = null
     @volatile var terminationStatus: StreamingQueryInfo = null
     @volatile var terminationException: Option[String] = null
-    @volatile var terminationStackTrace: Seq[StackTraceElement] = null
 
     val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]
 
@@ -296,7 +296,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
         terminationStatus = queryTerminated.queryInfo
         terminationException = queryTerminated.exception
-        terminationStackTrace = queryTerminated.stackTrace
       }
       asyncTestWaiter.dismiss()
     }


