spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-15317][CORE] Don't store accumulators for every task in listeners
Date Thu, 19 May 2016 19:05:40 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4f8639f9d -> 62e5158f1


[SPARK-15317][CORE] Don't store accumulators for every task in listeners

## What changes were proposed in this pull request?

In general, the Web UI doesn't need to store the Accumulator/AccumulableInfo for every task.
It only needs the Accumulator values.

This PR introduces new UIData classes that hold only the necessary fields and makes
`JobProgressListener` store these new classes instead, so that `JobProgressListener` no longer
retains Accumulator/AccumulableInfo and its memory footprint stays small. I also eliminate
`AccumulableInfo` from `SQLListener` so that we don't keep references to unused `AccumulableInfo`s.
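
The core of the change is a snapshot pattern: instead of keeping a reference to the mutable,
accumulator-carrying `TaskMetrics`, the listener copies only the plain values the UI renders into
an immutable class (`TaskMetricsUIData` in the diff below). A minimal, illustrative sketch of that
pattern, using hypothetical names rather than the actual classes:

```
import org.apache.spark.executor.TaskMetrics

// Hypothetical, simplified analogue of TaskMetricsUIData: copy only the scalar
// values the UI needs, so no accumulator objects stay reachable from the listener.
case class TaskMetricsSnapshot(
    executorRunTime: Long,
    resultSize: Long,
    jvmGCTime: Long)

def snapshot(m: TaskMetrics): TaskMetricsSnapshot =
  TaskMetricsSnapshot(m.executorRunTime, m.resultSize, m.jvmGCTime)
```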

## How was this patch tested?

I ran the two tests reported in the JIRA ticket locally:

The first one is:
```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```
The retained size of `JobProgressListener` decreases from 60.7 MB to 6.9 MB.

The second one is:
```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```

This test no longer causes an OOM after applying this patch.
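
"Retained size" above is the heap-profiler notion of memory kept alive by the listener (the figures
were presumably taken from driver heap dumps). As a rough, hedged alternative for spot-checking the
footprint of an object graph from Scala code, Spark's `SizeEstimator` developer API can be used; the
helper below is purely illustrative and `approxSizeMB` is a hypothetical name:

```
import org.apache.spark.util.SizeEstimator

// Illustrative only: SizeEstimator walks an object graph and returns an
// approximate deep size in bytes; this converts it to megabytes.
def approxSizeMB(obj: AnyRef): Double =
  SizeEstimator.estimate(obj) / (1024.0 * 1024.0)
```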

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13153 from zsxwing/memory.

(cherry picked from commit 4e3cb7a5d965fd490390398ecfe35f1fc05e8511)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62e5158f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62e5158f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62e5158f

Branch: refs/heads/branch-2.0
Commit: 62e5158f167425b9565a2b01fc494e593c57cae3
Parents: 4f8639f
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Thu May 19 12:05:17 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu May 19 12:05:30 2016 -0700

----------------------------------------------------------------------
 .../spark/status/api/v1/AllStagesResource.scala |   2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  12 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   2 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala | 136 ++++++++++++++++++-
 .../org/apache/spark/util/AccumulatorV2.scala   |   3 +
 .../status/api/v1/AllStagesResourceSuite.scala  |   4 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  29 +++-
 .../spark/sql/execution/metric/SQLMetrics.scala |   8 +-
 .../spark/sql/execution/ui/SQLListener.scala    |  13 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  |   7 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |   2 +-
 11 files changed, 186 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index eddc36e..7d63a8f 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -20,10 +20,10 @@ import java.util.{Arrays, Date, List => JList}
 import javax.ws.rs.{GET, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
-import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
+import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics, OutputMetricsUIData => InternalOutputMetrics, ShuffleReadMetricsUIData => InternalShuffleReadMetrics, ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics, TaskMetricsUIData => InternalTaskMetrics}
 import org.apache.spark.util.Distribution
 
 @Produces(Array(MediaType.APPLICATION_JSON))

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 945830c..842f42b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -332,7 +332,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         new StageUIData
       })
       stageData.numActiveTasks += 1
-      stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics)))
+      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
     }
     for (
       activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -395,9 +395,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
       }
 
-      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
-      taskData.taskInfo = info
-      taskData.metrics = taskMetrics
+      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
+      taskData.updateTaskInfo(info)
+      taskData.updateTaskMetrics(taskMetrics)
       taskData.errorMessage = errorMessage
 
       for (
@@ -425,7 +425,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       stageData: StageUIData,
       execId: String,
       taskMetrics: TaskMetrics,
-      oldMetrics: Option[TaskMetrics]) {
+      oldMetrics: Option[TaskMetricsUIData]) {
     val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
 
     val shuffleWriteDelta =
@@ -503,7 +503,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         if (!t.taskInfo.finished) {
           updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
           // Overwrite task metrics
-          t.metrics = Some(metrics)
+          t.updateTaskMetrics(Some(metrics))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6ddabfd..d986a55 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -768,7 +768,7 @@ private[ui] object StagePage {
   }
 
   private[ui] def getSchedulerDelay(
-      info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
+      info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
     if (info.finished) {
       val totalExecutionTime = info.finishTime - info.launchTime
       val executorOverhead = (metrics.executorDeserializeTime +

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index b454ef1..d76a0e6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -21,8 +21,10 @@ import scala.collection.mutable
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
+import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
 
 private[spark] object UIData {
@@ -105,13 +107,137 @@ private[spark] object UIData {
   /**
    * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
    */
-  class TaskUIData(
-      var taskInfo: TaskInfo,
-      var metrics: Option[TaskMetrics] = None,
-      var errorMessage: Option[String] = None)
+  class TaskUIData private(
+      private var _taskInfo: TaskInfo,
+      private var _metrics: Option[TaskMetricsUIData]) {
+
+    var errorMessage: Option[String] = None
+
+    def taskInfo: TaskInfo = _taskInfo
+
+    def metrics: Option[TaskMetricsUIData] = _metrics
+
+    def updateTaskInfo(taskInfo: TaskInfo): Unit = {
+      _taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
+    }
+
+    def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
+      _metrics = TaskUIData.toTaskMetricsUIData(metrics)
+    }
+  }
+
+  object TaskUIData {
+    def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
+      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
+    }
+
+    private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
+      metrics.map { m =>
+        TaskMetricsUIData(
+          executorDeserializeTime = m.executorDeserializeTime,
+          executorRunTime = m.executorRunTime,
+          resultSize = m.resultSize,
+          jvmGCTime = m.jvmGCTime,
+          resultSerializationTime = m.resultSerializationTime,
+          memoryBytesSpilled = m.memoryBytesSpilled,
+          diskBytesSpilled = m.diskBytesSpilled,
+          peakExecutionMemory = m.peakExecutionMemory,
+          updatedBlockStatuses = m.updatedBlockStatuses.toList,
+          inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
+          outputMetrics =
+            OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
+          shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
+          shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
+      }
+    }
+
+    /**
+     * We don't need to store internal or SQL accumulables as their values will be shown in other
+     * places, so drop them to reduce the memory usage.
+     */
+    private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = {
+      val newTaskInfo = new TaskInfo(
+        taskId = taskInfo.taskId,
+        index = taskInfo.index,
+        attemptNumber = taskInfo.attemptNumber,
+        launchTime = taskInfo.launchTime,
+        executorId = taskInfo.executorId,
+        host = taskInfo.host,
+        taskLocality = taskInfo.taskLocality,
+        speculative = taskInfo.speculative
+      )
+      newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
+      newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
+        accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
+      }
+      newTaskInfo.finishTime = taskInfo.finishTime
+      newTaskInfo.failed = taskInfo.failed
+      newTaskInfo
+    }
+  }
 
   class ExecutorUIData(
       val startTime: Long,
       var finishTime: Option[Long] = None,
       var finishReason: Option[String] = None)
+
+  case class TaskMetricsUIData(
+      executorDeserializeTime: Long,
+      executorRunTime: Long,
+      resultSize: Long,
+      jvmGCTime: Long,
+      resultSerializationTime: Long,
+      memoryBytesSpilled: Long,
+      diskBytesSpilled: Long,
+      peakExecutionMemory: Long,
+      updatedBlockStatuses: Seq[(BlockId, BlockStatus)],
+      inputMetrics: InputMetricsUIData,
+      outputMetrics: OutputMetricsUIData,
+      shuffleReadMetrics: ShuffleReadMetricsUIData,
+      shuffleWriteMetrics: ShuffleWriteMetricsUIData)
+
+  case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
+
+  case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
+
+  case class ShuffleReadMetricsUIData(
+      remoteBlocksFetched: Long,
+      localBlocksFetched: Long,
+      remoteBytesRead: Long,
+      localBytesRead: Long,
+      fetchWaitTime: Long,
+      recordsRead: Long,
+      totalBytesRead: Long,
+      totalBlocksFetched: Long)
+
+  object ShuffleReadMetricsUIData {
+    def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
+      new ShuffleReadMetricsUIData(
+        remoteBlocksFetched = metrics.remoteBlocksFetched,
+        localBlocksFetched = metrics.localBlocksFetched,
+        remoteBytesRead = metrics.remoteBytesRead,
+        localBytesRead = metrics.localBytesRead,
+        fetchWaitTime = metrics.fetchWaitTime,
+        recordsRead = metrics.recordsRead,
+        totalBytesRead = metrics.totalBytesRead,
+        totalBlocksFetched = metrics.totalBlocksFetched
+      )
+    }
+  }
+
+  case class ShuffleWriteMetricsUIData(
+      bytesWritten: Long,
+      recordsWritten: Long,
+      writeTime: Long)
+
+  object ShuffleWriteMetricsUIData {
+    def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
+      new ShuffleWriteMetricsUIData(
+        bytesWritten = metrics.bytesWritten,
+        recordsWritten = metrics.recordsWritten,
+        writeTime = metrics.writeTime
+      )
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0cf9df0..13cb6a2 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -255,6 +255,9 @@ private[spark] object AccumulatorContext {
   def clear(): Unit = {
     originals.clear()
   }
+
+  // Identifier for distinguishing SQL metrics from other accumulators
+  private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
index d223af1..f684e16 100644
--- a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
@@ -30,8 +30,8 @@ class AllStagesResourceSuite extends SparkFunSuite {
   def getFirstTaskLaunchTime(taskLaunchTimes: Seq[Long]): Option[Date] = {
     val tasks = new HashMap[Long, TaskUIData]
     taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
-      tasks(idx.toLong) = new TaskUIData(
-        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None, None)
+      tasks(idx.toLong) = TaskUIData(
+        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
     }
 
     val stageUiData = new StageUIData()

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index ce7d51d..6f7c9f2 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -25,7 +25,8 @@ import org.apache.spark._
 import org.apache.spark.{LocalSparkContext, SparkConf, Success}
 import org.apache.spark.executor._
 import org.apache.spark.scheduler._
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.jobs.UIData.TaskUIData
+import org.apache.spark.util.{AccumulatorContext, Utils}
 
 class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
 
@@ -359,4 +360,30 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     assert(
       stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
   }
+
+  test("drop internal and sql accumulators") {
+    val taskInfo = new TaskInfo(0, 0, 0, 0, "", "", TaskLocality.ANY, false)
+    val internalAccum =
+      AccumulableInfo(id = 1, name = Some("internal"), None, None, internal = true, false)
+    val sqlAccum = AccumulableInfo(
+      id = 2,
+      name = Some("sql"),
+      None,
+      None,
+      internal = false,
+      countFailedValues = false,
+      metadata = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+    val userAccum = AccumulableInfo(
+      id = 3,
+      name = Some("user"),
+      None,
+      None,
+      internal = false,
+      countFailedValues = false,
+      metadata = None)
+    taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
+
+    val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
+    assert(newTaskInfo.accumulables === Seq(userAccum))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index d6de154..e63c7c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -21,7 +21,7 @@ import java.text.NumberFormat
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.util.{AccumulatorV2, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
 
 
 class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
@@ -56,15 +56,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    new AccumulableInfo(id, name, update, value, true, true, Some(SQLMetrics.ACCUM_IDENTIFIER))
+    new AccumulableInfo(
+      id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
   }
 }
 
 
 private[sql] object SQLMetrics {
-  // Identifier for distinguishing SQL metrics from other accumulators
-  private[sql] val ACCUM_IDENTIFIER = "sql"
-
   private[sql] val SUM_METRIC = "sum"
   private[sql] val SIZE_METRIC = "size"
   private[sql] val TIMING_METRIC = "timing"

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 510a2ee..03b5326 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.AccumulatorContext
 
 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
@@ -177,8 +178,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
       taskId: Long,
       stageId: Int,
       stageAttemptID: Int,
-      accumulatorUpdates: Seq[AccumulableInfo],
+      _accumulatorUpdates: Seq[AccumulableInfo],
       finishTask: Boolean): Unit = {
+    val accumulatorUpdates =
+      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
 
     _stageIdToStageMetrics.get(stageId) match {
       case Some(stageMetrics) =>
@@ -290,9 +293,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
                stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
                taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
                accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
-            assert(accumulatorUpdate.update.isDefined, s"accumulator update from " +
-              s"task did not have a partial value: ${accumulatorUpdate.name}")
-            (accumulatorUpdate.id, accumulatorUpdate.update.get)
+            (accumulatorUpdate._1, accumulatorUpdate._2)
           }
         }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
         mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
@@ -336,7 +337,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
       taskEnd.taskInfo.accumulables.flatMap { a =>
         // Filter out accumulators that are not SQL metrics
         // For now we assume all SQL metrics are Long's that have been JSON serialized as String's
-        if (a.metadata == Some(SQLMetrics.ACCUM_IDENTIFIER)) {
+        if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) {
           val newValue = a.update.map(_.toString.toLong).getOrElse(0L)
           Some(a.copy(update = Some(newValue)))
         } else {
@@ -418,4 +419,4 @@ private[ui] class SQLStageMetrics(
 private[ui] class SQLTaskMetrics(
     val attemptId: Long, // TODO not used yet
     var finished: Boolean,
-    var accumulatorUpdates: Seq[AccumulableInfo])
+    var accumulatorUpdates: Seq[(Long, Any)])

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 08f596f..7a89b48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -29,9 +29,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
 
 
 class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
@@ -308,7 +307,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       case Some(v) => fail(s"metric value was not a Long: ${v.getClass.getName}")
       case _ => fail("metric update is missing")
     }
-    assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+    assert(metricInfo.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
     // After serializing to JSON, the original value type is lost, but we can still
     // identify that it's a SQL metric from the metadata
     val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
@@ -318,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
       case _ => fail("deserialized metric update is missing")
     }
-    assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+    assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/62e5158f/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 9eff42a..1c46713 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -383,7 +383,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     }
     // Listener tracks only SQL metrics, not other accumulators
     assert(trackedAccums.size === 1)
-    assert(trackedAccums.head === sqlMetricInfo)
+    assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
   }
 
 }

