spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject spark git commit: [SPARK-11206] Support SQL UI on the history server
Date Wed, 25 Nov 2015 23:13:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master 21e560641 -> cc243a079


[SPARK-11206] Support SQL UI on the history server

On the live web UI, there is a SQL tab which provides valuable information for the SQL query.
But once the workload is finished, we won't see the SQL tab on the history server. It will
be helpful if we support SQL UI on the history server so we can analyze it even after its
execution.

To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related
events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are
defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4.  A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed
events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`.

Author: Carson Wang <carson.wang@intel.com>

Closes #9297 from carsonwang/SqlHistoryUI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc243a07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc243a07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc243a07

Branch: refs/heads/master
Commit: cc243a079b1c039d6e7f0b410d1654d94a090e14
Parents: 21e5606
Author: Carson Wang <carson.wang@intel.com>
Authored: Wed Nov 25 15:13:13 2015 -0800
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Wed Nov 25 15:13:13 2015 -0800

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 .../org/apache/spark/JavaSparkListener.java     |   3 +
 .../org/apache/spark/SparkFirehoseListener.java |   4 +
 .../spark/scheduler/EventLoggingListener.scala  |   4 +
 .../apache/spark/scheduler/SparkListener.scala  |  24 +++-
 .../spark/scheduler/SparkListenerBus.scala      |   1 +
 .../scala/org/apache/spark/ui/SparkUI.scala     |  16 ++-
 .../org/apache/spark/util/JsonProtocol.scala    |  11 +-
 ....spark.scheduler.SparkHistoryListenerFactory |   1 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  18 ++-
 .../spark/sql/execution/SQLExecution.scala      |  24 +---
 .../spark/sql/execution/SparkPlanInfo.scala     |  46 ++++++
 .../sql/execution/metric/SQLMetricInfo.scala    |  30 ++++
 .../spark/sql/execution/metric/SQLMetrics.scala |  56 +++++---
 .../spark/sql/execution/ui/ExecutionPage.scala  |   4 +-
 .../spark/sql/execution/ui/SQLListener.scala    | 139 +++++++++++++------
 .../apache/spark/sql/execution/ui/SQLTab.scala  |  12 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  20 +--
 .../sql/execution/metric/SQLMetricsSuite.scala  |   4 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  43 +++---
 .../spark/sql/test/SharedSQLContext.scala       |   1 +
 21 files changed, 327 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 08fba6d..7262c96 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -82,4 +82,5 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/java/org/apache/spark/JavaSparkListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index fa9acf0..23bc9a2 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener {
   @Override
   public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
 
+  @Override
+  public void onOtherEvent(SparkListenerEvent event) { }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 1214d05..e6b24af 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener {
         onEvent(blockUpdated);
     }
 
+    @Override
+    public void onOtherEvent(SparkListenerEvent event) {
+        onEvent(event);
+    }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 000a021..eaa07ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit =
{ }
 
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   /**
    * Stop logging events. The event log file will be renamed so that it loses the
    * ".inprogress" suffix.

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 896f174..075a7f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,15 +22,19 @@ import java.util.Properties
 import scala.collection.Map
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, TaskEndReason}
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+import org.apache.spark.{Logging, SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.ui.SparkUI
 
 @DeveloperApi
-sealed trait SparkListenerEvent
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property =
"Event")
+trait SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -131,6 +135,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
 /**
+ * Interface for creating history listeners defined in other modules like SQL, which are
used to
+ * rebuild the history UI.
+ */
+private[spark] trait SparkHistoryListenerFactory {
+  /**
+   * Create listeners used to rebuild the history UI.
+   */
+  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
+}
+
+/**
  * :: DeveloperApi ::
  * Interface for listening to events from the Spark scheduler. Note that this is an internal
  * interface which might change in different Spark releases. Java clients should extend
@@ -223,6 +238,11 @@ trait SparkListener {
    * Called when the driver receives a block update info.
    */
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+
+  /**
+   * Called when other events like SQL-specific events are posted.
+   */
+  def onOtherEvent(event: SparkListenerEvent) { }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 04afde3..95722a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener,
SparkLi
       case blockUpdated: SparkListenerBlockUpdated =>
         listener.onBlockUpdated(blockUpdated)
       case logStart: SparkListenerLogStart => // ignore event log metadata
+      case _ => listener.onOtherEvent(event)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 4608bce..8da6884 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.ui
 
-import java.util.Date
+import java.util.{Date, ServiceLoader}
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
   UIRoot}
+import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
       appName: String,
       basePath: String,
       startTime: Long): SparkUI = {
-    create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+    val sparkUI = create(
+      None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+
+    val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
+      Utils.getContextOrSparkClassLoader).asScala
+    listenerFactories.foreach { listenerFactory =>
+      val listeners = listenerFactory.createListeners(conf, sparkUI)
+      listeners.foreach(listenerBus.addListener)
+    }
+    sparkUI
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c9beeb2..7f5d713 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util
 
 import java.util.{Properties, UUID}
 
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
 
 /**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
 
   private implicit val format = DefaultFormats
 
+  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdated: SparkListenerBlockUpdated =>
         throw new MatchError(blockUpdated)  // TODO(ekl) implement this
+      case _ => parse(mapper.writeValueAsString(event))
     }
   }
 
@@ -511,6 +516,8 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+        .asInstanceOf[SparkListenerEvent]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
new file mode 100644
index 0000000..507100b
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 46bf544..1c2ac5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1263,6 +1263,8 @@ object SQLContext {
    */
   @transient private val instantiatedContext = new AtomicReference[SQLContext]()
 
+  @transient private val sqlListener = new AtomicReference[SQLListener]()
+
   /**
    * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
    *
@@ -1307,6 +1309,10 @@ object SQLContext {
     Option(instantiatedContext.get())
   }
 
+  private[sql] def clearSqlListener(): Unit = {
+    sqlListener.set(null)
+  }
+
   /**
    * Changes the SQLContext that will be returned in this thread and its children when
    * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1355,9 +1361,13 @@ object SQLContext {
    * Create a SQLListener then add it into SparkContext, and create an SQLTab if there is
SparkUI.
    */
   private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
-    val listener = new SQLListener(sc.conf)
-    sc.addSparkListener(listener)
-    sc.ui.foreach(new SQLTab(listener, _))
-    listener
+    if (sqlListener.get() == null) {
+      val listener = new SQLListener(sc.conf)
+      if (sqlListener.compareAndSet(null, listener)) {
+        sc.addSparkListener(listener)
+        sc.ui.foreach(new SQLTab(listener, _))
+      }
+    }
+    sqlListener.get()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1422e15..3497198 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
+  SparkListenerSQLExecutionEnd}
 import org.apache.spark.util.Utils
 
 private[sql] object SQLExecution {
@@ -45,25 +46,14 @@ private[sql] object SQLExecution {
       sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
       val r = try {
         val callSite = Utils.getCallSite()
-        sqlContext.listener.onExecutionStart(
-          executionId,
-          callSite.shortForm,
-          callSite.longForm,
-          queryExecution.toString,
-          SparkPlanGraph(queryExecution.executedPlan),
-          System.currentTimeMillis())
+        sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+          executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
+          SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
         try {
           body
         } finally {
-          // Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
-          // However, onJobStart and onJobEnd run in the listener thread. Because we cannot
add new
-          // SQL event types to SparkListener since it's a public API, we cannot guarantee
that.
-          //
-          // SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
-          //
-          // The worst case is onExecutionEnd may happen before onJobStart when the listener
thread
-          // is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
-          sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
+          sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+            executionId, System.currentTimeMillis()))
         }
       } finally {
         sc.setLocalProperty(EXECUTION_ID_KEY, null)

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
new file mode 100644
index 0000000..486ce34
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.execution.metric.SQLMetricInfo
+import org.apache.spark.util.Utils
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a SQL SparkPlan.
+ */
+@DeveloperApi
+class SparkPlanInfo(
+    val nodeName: String,
+    val simpleString: String,
+    val children: Seq[SparkPlanInfo],
+    val metrics: Seq[SQLMetricInfo])
+
+private[sql] object SparkPlanInfo {
+
+  def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
+      new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
+        Utils.getFormattedClassName(metric.param))
+    }
+    val children = plan.children.map(fromSparkPlan)
+
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
new file mode 100644
index 0000000..2708219
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a SQL Metric.
+ */
+@DeveloperApi
+class SQLMetricInfo(
+    val name: String,
+    val accumulatorId: Long,
+    val metricParam: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1c253e3..6c0f6f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -104,21 +104,39 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String,
initialVa
   override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
 }
 
+private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
+
+private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
+  (values: Seq[Long]) => {
+    // This is a workaround for SPARK-11013.
+    // We use -1 as initial value of the accumulator, if the accumulator is valid, we will
update
+    // it at the end of task and the value will be at least 0.
+    val validValues = values.filter(_ >= 0)
+    val Seq(sum, min, med, max) = {
+      val metric = if (validValues.length == 0) {
+        Seq.fill(4)(0L)
+      } else {
+        val sorted = validValues.sorted
+        Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length
- 1))
+      }
+      metric.map(Utils.bytesToString)
+    }
+    s"\n$sum ($min, $med, $max)"
+  }, -1L)
+
 private[sql] object SQLMetrics {
 
   private def createLongMetric(
       sc: SparkContext,
       name: String,
-      stringValue: Seq[Long] => String,
-      initialValue: Long): LongSQLMetric = {
-    val param = new LongSQLMetricParam(stringValue, initialValue)
+      param: LongSQLMetricParam): LongSQLMetric = {
     val acc = new LongSQLMetric(name, param)
     sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
   }
 
   def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
-    createLongMetric(sc, name, _.sum.toString, 0L)
+    createLongMetric(sc, name, LongSQLMetricParam)
   }
 
   /**
@@ -126,31 +144,25 @@ private[sql] object SQLMetrics {
    * spill size, etc.
    */
   def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = {
-    val stringValue = (values: Seq[Long]) => {
-      // This is a workaround for SPARK-11013.
-      // We use -1 as initial value of the accumulator, if the accumulator is valid, we will
update
-      // it at the end of task and the value will be at least 0.
-      val validValues = values.filter(_ >= 0)
-      val Seq(sum, min, med, max) = {
-        val metric = if (validValues.length == 0) {
-          Seq.fill(4)(0L)
-        } else {
-          val sorted = validValues.sorted
-          Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length
- 1))
-        }
-        metric.map(Utils.bytesToString)
-      }
-      s"\n$sum ($min, $med, $max)"
-    }
     // The final result of this metric in physical operator UI may looks like:
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
-    createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
+    createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
+  }
+
+  def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] =
{
+    val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
+    val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
+    val metricParam = metricParamName match {
+      case `longSQLMetricParam` => LongSQLMetricParam
+      case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
+    }
+    metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
   }
 
   /**
    * A metric that its value will be ignored. Use this one when we need a metric parameter
but don't
    * care about the value.
    */
-  val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L))
+  val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index e74d6fb..c74ad40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, Unparsed}
-
-import org.apache.commons.lang3.StringEscapeUtils
+import scala.xml.Node
 
 import org.apache.spark.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5a072de..e19a1e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,11 +19,34 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricValue, SQLMetricParam}
 import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
+import org.apache.spark.ui.SparkUI
+
+@DeveloperApi
+case class SparkListenerSQLExecutionStart(
+    executionId: Long,
+    description: String,
+    details: String,
+    physicalPlanDescription: String,
+    sparkPlanInfo: SparkPlanInfo,
+    time: Long)
+  extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
+  extends SparkListenerEvent
+
+private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
+
+  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
+    List(new SQLHistoryListener(conf, sparkUI))
+  }
+}
 
 private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
@@ -118,7 +141,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
   override def onExecutorMetricsUpdate(
       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
     for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics)
{
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask =
false)
+      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
+        finishTask = false)
     }
   }
 
@@ -140,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
       taskEnd.taskInfo.taskId,
       taskEnd.stageId,
       taskEnd.stageAttemptId,
-      taskEnd.taskMetrics,
+      taskEnd.taskMetrics.accumulatorUpdates(),
       finishTask = true)
   }
 
@@ -148,15 +172,12 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
    * Update the accumulator values of a task with the latest metrics for this task. This
is called
    * every time we receive an executor heartbeat or when a task finishes.
    */
-  private def updateTaskAccumulatorValues(
+  protected def updateTaskAccumulatorValues(
       taskId: Long,
       stageId: Int,
       stageAttemptID: Int,
-      metrics: TaskMetrics,
+      accumulatorUpdates: Map[Long, Any],
       finishTask: Boolean): Unit = {
-    if (metrics == null) {
-      return
-    }
 
     _stageIdToStageMetrics.get(stageId) match {
       case Some(stageMetrics) =>
@@ -174,9 +195,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
             case Some(taskMetrics) =>
               if (finishTask) {
                 taskMetrics.finished = true
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+                taskMetrics.accumulatorUpdates = accumulatorUpdates
               } else if (!taskMetrics.finished) {
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+                taskMetrics.accumulatorUpdates = accumulatorUpdates
               } else {
                 // If a task is finished, we should not override with accumulator updates
from
                 // heartbeat reports
@@ -185,7 +206,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
               // TODO Now just set attemptId to 0. Should fix here when we can get the attempt
               // id from SparkListenerExecutorMetricsUpdate
               stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
-                  attemptId = 0, finished = finishTask, metrics.accumulatorUpdates())
+                  attemptId = 0, finished = finishTask, accumulatorUpdates)
           }
         }
       case None =>
@@ -193,38 +214,40 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
     }
   }
 
-  def onExecutionStart(
-      executionId: Long,
-      description: String,
-      details: String,
-      physicalPlanDescription: String,
-      physicalPlanGraph: SparkPlanGraph,
-      time: Long): Unit = {
-    val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
-      node.metrics.map(metric => metric.accumulatorId -> metric)
-    }
-
-    val executionUIData = new SQLExecutionUIData(executionId, description, details,
-      physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
-    synchronized {
-      activeExecutions(executionId) = executionUIData
-      _executionIdToData(executionId) = executionUIData
-    }
-  }
-
-  def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
-    _executionIdToData.get(executionId).foreach { executionUIData =>
-      executionUIData.completionTime = Some(time)
-      if (!executionUIData.hasRunningJobs) {
-        // onExecutionEnd happens after all "onJobEnd"s
-        // So we should update the execution lists.
-        markExecutionFinished(executionId)
-      } else {
-        // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
-        // Then we don't if the execution is successful, so let the last onJobEnd updates
the
-        // execution lists.
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case SparkListenerSQLExecutionStart(executionId, description, details,
+      physicalPlanDescription, sparkPlanInfo, time) =>
+      val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
+      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+        node.metrics.map(metric => metric.accumulatorId -> metric)
+      }
+      val executionUIData = new SQLExecutionUIData(
+        executionId,
+        description,
+        details,
+        physicalPlanDescription,
+        physicalPlanGraph,
+        sqlPlanMetrics.toMap,
+        time)
+      synchronized {
+        activeExecutions(executionId) = executionUIData
+        _executionIdToData(executionId) = executionUIData
+      }
+    case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
+      _executionIdToData.get(executionId).foreach { executionUIData =>
+        executionUIData.completionTime = Some(time)
+        if (!executionUIData.hasRunningJobs) {
+          // onExecutionEnd happens after all "onJobEnd"s
+          // So we should update the execution lists.
+          markExecutionFinished(executionId)
+        } else {
+          // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
+          // Then we don't if the execution is successful, so let the last onJobEnd updates
the
+          // execution lists.
+        }
       }
     }
+    case _ => // Ignore
   }
 
   private def markExecutionFinished(executionId: Long): Unit = {
@@ -289,6 +312,38 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener
with Loggi
 
 }
 
+private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
+  extends SQLListener(conf) {
+
+  private var sqlTabAttached = false
+
+  override def onExecutorMetricsUpdate(
+      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
+    // Do nothing
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    updateTaskAccumulatorValues(
+      taskEnd.taskInfo.taskId,
+      taskEnd.stageId,
+      taskEnd.stageAttemptId,
+      taskEnd.taskInfo.accumulables.map { acc =>
+        (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
+      }.toMap,
+      finishTask = true)
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case _: SparkListenerSQLExecutionStart =>
+      if (!sqlTabAttached) {
+        new SQLTab(this, sparkUI)
+        sqlTabAttached = true
+      }
+      super.onOtherEvent(event)
+    case _ => super.onOtherEvent(event)
+  }
+}
+
 /**
  * Represent all necessary data for an execution that will be used in Web UI.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 9c27944..4f50b2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.execution.ui
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.spark.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
+  extends SparkUITab(sparkUI, "SQL") with Logging {
 
   val parent = sparkUI
 
@@ -35,13 +33,5 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
 }
 
 private[sql] object SQLTab {
-
   private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
-
-  private val nextTabId = new AtomicInteger(0)
-
-  private def nextTabName: String = {
-    val nextId = nextTabId.getAndIncrement()
-    if (nextId == 0) "SQL" else s"SQL$nextId"
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index f1fce54..7af0ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * A graph used for storing information of an executionPlan of DataFrame.
@@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph {
   /**
    * Build a SparkPlanGraph from the root of a SparkPlan tree.
    */
-  def apply(plan: SparkPlan): SparkPlanGraph = {
+  def apply(planInfo: SparkPlanInfo): SparkPlanGraph = {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
     new SparkPlanGraph(nodes, edges)
   }
 
   private def buildSparkPlanGraphNode(
-      plan: SparkPlan,
+      planInfo: SparkPlanInfo,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
       edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
-      SQLPlanMetric(metric.name.getOrElse(key), metric.id,
-        metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
+    val metrics = planInfo.metrics.map { metric =>
+      SQLPlanMetric(metric.name, metric.accumulatorId,
+        SQLMetrics.getMetricParam(metric.metricParam))
     }
     val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
+      nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
     nodes += node
-    val childrenNodes = plan.children.map(
+    val childrenNodes = planInfo.children.map(
       child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
     for (child <- childrenNodes) {
       edges += SparkPlanGraphEdge(child.id, node.id)

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 5e2b415..ebfa1ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.xbean.asm5.Opcodes._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -82,7 +83,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
       val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
-      val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node
=>
+      val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
+        df.queryExecution.executedPlan)).nodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index c15aac7..f93d081 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -21,10 +21,10 @@ import java.util.Properties
 
 import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.LongSQLMetricValue
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -82,7 +82,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
-      SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId))
+      SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
+        .nodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
     val accumulatorUpdates = accumulatorIds.map { id =>
@@ -90,13 +91,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       (id, accumulatorValue)
     }.toMap
 
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
 
     val executionUIData = listener.executionIdToData(0)
 
@@ -206,7 +207,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       time = System.currentTimeMillis(),
       JobSucceeded
     ))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
 
     assert(executionUIData.runningJobs.isEmpty)
     assert(executionUIData.succeededJobs === Seq(0))
@@ -219,19 +221,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Nil,
       createProperties(executionId)))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -248,13 +251,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -271,7 +274,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       time = System.currentTimeMillis(),
       stageInfos = Nil,
       createProperties(executionId)))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 1,
       time = System.currentTimeMillis(),
@@ -288,19 +292,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Seq.empty,
       createProperties(executionId)))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 963d10e..e7b3765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -42,6 +42,7 @@ trait SharedSQLContext extends SQLTestUtils {
    * Initialize the [[TestSQLContext]].
    */
   protected override def beforeAll(): Unit = {
+    SQLContext.clearSqlListener()
     if (_ctx == null) {
       _ctx = new TestSQLContext
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message