spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-14884][SQL][STREAMING][WEBUI] Fix call site for continuous queries
Date Tue, 03 May 2016 17:10:38 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 17996e7d0 -> 45bc65519


[SPARK-14884][SQL][STREAMING][WEBUI] Fix call site for continuous queries

## What changes were proposed in this pull request?

Since continuous queries are processed in separate threads, their call sites show up as
`run at <unknown>:0`. This is not wrong, but it provides very little information; in addition,
we cannot distinguish two queries by their call sites alone.

This patch fixes this.

### Before
[Jobs Tab]
![s1a](https://cloud.githubusercontent.com/assets/15843379/14766101/a47246b2-0a30-11e6-8d81-06a9a600113b.png)
[SQL Tab]
![s1b](https://cloud.githubusercontent.com/assets/15843379/14766102/a4750226-0a30-11e6-9ada-773d977d902b.png)
### After
[Jobs Tab]
![s2a](https://cloud.githubusercontent.com/assets/15843379/14766104/a89705b6-0a30-11e6-9830-0d40ec68527b.png)
[SQL Tab]
![s2b](https://cloud.githubusercontent.com/assets/15843379/14766103/a8966728-0a30-11e6-8e4d-c2e326400478.png)

## How was this patch tested?

Manually checked - see screenshots above.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12650 from lw-lin/fix-call-site.

(cherry picked from commit 5bd9a2f697dac44a4777e24321a2eb4a3d54e24b)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45bc6551
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45bc6551
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45bc6551

Branch: refs/heads/branch-2.0
Commit: 45bc65519d702147d9fb4169efd235199407fa14
Parents: 17996e7
Author: Liwei Lin <lwlin7@gmail.com>
Authored: Tue May 3 10:10:25 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Tue May 3 10:10:35 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/SQLExecution.scala   |  7 +++++--
 .../spark/sql/execution/streaming/StreamExecution.scala | 12 ++++++++++--
 2 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45bc6551/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 397d66b..31c9f1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -23,7 +23,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
   SparkListenerSQLExecutionStart}
-import org.apache.spark.util.Utils
 
 private[sql] object SQLExecution {
 
@@ -46,7 +45,11 @@ private[sql] object SQLExecution {
       val executionId = SQLExecution.nextExecutionId
       sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
       val r = try {
-        val callSite = Utils.getCallSite()
+        // sparkContext.getCallSite() would first try to pick up any call site that was previously
+        // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
+        // continuous queries would give us call site like "run at <unknown>:0"
+        val callSite = sparkSession.sparkContext.getCallSite()
+
         sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
           executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))

http://git-wip-us.apache.org/repos/asf/spark/blob/45bc6551/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3108346..3c5ced2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.ContinuousQueryListener
 import org.apache.spark.sql.util.ContinuousQueryListener._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{UninterruptibleThread, Utils}
 
 /**
  * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -101,10 +101,18 @@ class StreamExecution(
   @volatile
   private[sql] var streamDeathCause: ContinuousQueryException = null
 
+  /* Get the call site in the caller thread; will pass this into the micro batch thread */
+  private val callSite = Utils.getCallSite()
+
   /** The thread that runs the micro-batches of this stream. */
   private[sql] val microBatchThread =
     new UninterruptibleThread(s"stream execution thread for $name") {
-      override def run(): Unit = { runBatches() }
+      override def run(): Unit = {
+        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
+        // thread to this micro batch thread
+        sparkSession.sparkContext.setCallSite(callSite)
+        runBatches()
+      }
     }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message