spark-commits mailing list archives

From r...@apache.org
Subject [2/3] spark git commit: [SPARK-14654][CORE] New accumulator API
Date Thu, 28 Apr 2016 07:26:45 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9ab7d96..945830c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -375,26 +375,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       execSummary.taskTime += info.duration
       stageData.numActiveTasks -= 1
 
-      val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
+      val errorMessage: Option[String] =
         taskEnd.reason match {
           case org.apache.spark.Success =>
             stageData.completedIndices.add(info.index)
             stageData.numCompleteTasks += 1
-            (None, taskEnd.taskMetrics.accumulatorUpdates())
+            None
           case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), e.accumUpdates)
+            Some(e.toErrorString)
           case e: TaskFailedReason => // All other failure cases
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), Seq.empty[AccumulableInfo])
+            Some(e.toErrorString)
         }
 
-      val taskMetrics =
-        if (accums.nonEmpty) {
-          Some(TaskMetrics.fromAccumulatorUpdates(accums))
-        } else {
-          None
-        }
+      val taskMetrics = Option(taskEnd.taskMetrics)
       taskMetrics.foreach { m =>
         val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
@@ -503,7 +498,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         new StageUIData
       })
       val taskData = stageData.taskData.get(taskId)
-      val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
+      val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
       taskData.foreach { t =>
         if (!t.taskInfo.finished) {
           updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
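
For reference, a minimal sketch (not part of the patch) of the two listener-side paths this hunk leaves in place, assuming the Spark-internal signatures exactly as they appear above: task-end events now carry a TaskMetrics object directly, while executor heartbeats still deliver AccumulableInfo and go through the renamed TaskMetrics.fromAccumulatorInfos helper.

    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.scheduler.{AccumulableInfo, SparkListenerTaskEnd}

    object ListenerMetricsSketch {
      // Task end: the event itself carries TaskMetrics; it can be null for some
      // failure reasons, hence the Option(...) wrapper used in the hunk above.
      def metricsFromTaskEnd(taskEnd: SparkListenerTaskEnd): Option[TaskMetrics] =
        Option(taskEnd.taskMetrics)

      // Executor heartbeat: updates still arrive as AccumulableInfo, so the
      // reconstruction helper survives under the more precise name.
      def metricsFromHeartbeat(accumUpdates: Seq[AccumulableInfo]): TaskMetrics =
        TaskMetrics.fromAccumulatorInfos(accumUpdates)
    }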

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a613fbc..aeab71d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -840,7 +840,9 @@ private[spark] object JsonProtocol {
         // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
         val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
-          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates())
+          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
+            acc.toInfo(Some(acc.localValue), None)
+          }))
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 6063476..5f97e58 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -28,17 +28,17 @@ import scala.util.control.NonFatal
 import org.scalatest.Matchers
 import org.scalatest.exceptions.TestFailedException
 
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.AccumulatorParam.{ListAccumulatorParam, StringAccumulatorParam}
 import org.apache.spark.scheduler._
 import org.apache.spark.serializer.JavaSerializer
 
 
 class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
-  import AccumulatorParam._
+  import AccumulatorSuite.createLongAccum
 
   override def afterEach(): Unit = {
     try {
-      Accumulators.clear()
+      AccumulatorContext.clear()
     } finally {
       super.afterEach()
     }
@@ -59,9 +59,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
       }
     }
 
+  test("accumulator serialization") {
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    val acc = createLongAccum("x")
+    acc.add(5)
+    assert(acc.value == 5)
+    assert(acc.isAtDriverSide)
+
+    // serialize and de-serialize it, to simulate sending accumulator to executor.
+    val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc))
+    // value is reset on the executors
+    assert(acc2.localValue == 0)
+    assert(!acc2.isAtDriverSide)
+
+    acc2.add(10)
+    // serialize and de-serialize it again, to simulate sending accumulator back to driver.
+    val acc3 = ser.deserialize[LongAccumulator](ser.serialize(acc2))
+    // value is not reset on the driver
+    assert(acc3.value == 10)
+    assert(acc3.isAtDriverSide)
+  }
+
   test ("basic accumulation") {
     sc = new SparkContext("local", "test")
-    val acc : Accumulator[Int] = sc.accumulator(0)
+    val acc: Accumulator[Int] = sc.accumulator(0)
 
     val d = sc.parallelize(1 to 20)
     d.foreach{x => acc += x}
@@ -75,7 +96,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
 
   test("value not assignable from tasks") {
     sc = new SparkContext("local", "test")
-    val acc : Accumulator[Int] = sc.accumulator(0)
+    val acc: Accumulator[Int] = sc.accumulator(0)
 
     val d = sc.parallelize(1 to 20)
     an [Exception] should be thrownBy {d.foreach{x => acc.value = x}}
@@ -169,14 +190,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)
 
-    Accumulators.remove(accId)
-    assert(!Accumulators.originals.get(accId).isDefined)
+    AccumulatorContext.remove(accId)
+    assert(!AccumulatorContext.originals.containsKey(accId))
   }
 
   test("get accum") {
-    sc = new SparkContext("local", "test")
     // Don't register with SparkContext for cleanup
-    var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
+    var acc = createLongAccum("a")
     val accId = acc.id
     val ref = WeakReference(acc)
     assert(ref.get.isDefined)
@@ -188,44 +208,16 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
 
     // Getting a garbage collected accum should throw error
     intercept[IllegalAccessError] {
-      Accumulators.get(accId)
+      AccumulatorContext.get(accId)
     }
 
     // Getting a normal accumulator. Note: this has to be separate because referencing an
     // accumulator above in an `assert` would keep it from being garbage collected.
-    val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true)
-    assert(Accumulators.get(acc2.id) === Some(acc2))
+    val acc2 = createLongAccum("b")
+    assert(AccumulatorContext.get(acc2.id) === Some(acc2))
 
     // Getting an accumulator that does not exist should return None
-    assert(Accumulators.get(100000).isEmpty)
-  }
-
-  test("copy") {
-    val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false)
-    val acc2 = acc1.copy()
-    assert(acc1.id === acc2.id)
-    assert(acc1.value === acc2.value)
-    assert(acc1.name === acc2.name)
-    assert(acc1.countFailedValues === acc2.countFailedValues)
-    assert(acc1 !== acc2)
-    // Modifying one does not affect the other
-    acc1.add(44L)
-    assert(acc1.value === 500L)
-    assert(acc2.value === 456L)
-    acc2.add(144L)
-    assert(acc1.value === 500L)
-    assert(acc2.value === 600L)
-  }
-
-  test("register multiple accums with same ID") {
-    val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
-    // `copy` will create a new Accumulable and register it.
-    val acc2 = acc1.copy()
-    assert(acc1 !== acc2)
-    assert(acc1.id === acc2.id)
-    // The second one does not override the first one
-    assert(Accumulators.originals.size === 1)
-    assert(Accumulators.get(acc1.id) === Some(acc1))
+    assert(AccumulatorContext.get(100000).isEmpty)
   }
 
   test("string accumulator param") {
@@ -257,38 +249,33 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.setValue(Seq(9, 10))
     assert(acc.value === Seq(9, 10))
   }
-
-  test("value is reset on the executors") {
-    val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"))
-    val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"))
-    val externalAccums = Seq(acc1, acc2)
-    val taskMetrics = new TaskMetrics
-    // Set some values; these should not be observed later on the "executors"
-    acc1.setValue(10)
-    acc2.setValue(20L)
-    taskMetrics.testAccum.get.setValue(30L)
-    // Simulate the task being serialized and sent to the executors.
-    val dummyTask = new DummyTask(taskMetrics, externalAccums)
-    val serInstance = new JavaSerializer(new SparkConf).newInstance()
-    val taskSer = Task.serializeWithDependencies(
-      dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
-    // Now we're on the executors.
-    // Deserialize the task and assert that its accumulators are zero'ed out.
-    val (_, _, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
-    val taskDeser = serInstance.deserialize[DummyTask](
-      taskBytes, Thread.currentThread.getContextClassLoader)
-    // Assert that executors see only zeros
-    taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
-    taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) }
-  }
-
 }
 
 private[spark] object AccumulatorSuite {
-
   import InternalAccumulator._
 
   /**
+   * Create a long accumulator and register it to [[AccumulatorContext]].
+   */
+  def createLongAccum(
+      name: String,
+      countFailedValues: Boolean = false,
+      initValue: Long = 0,
+      id: Long = AccumulatorContext.newId()): LongAccumulator = {
+    val acc = new LongAccumulator
+    acc.setValue(initValue)
+    acc.metadata = AccumulatorMetadata(id, Some(name), countFailedValues)
+    AccumulatorContext.register(acc)
+    acc
+  }
+
+  /**
+   * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
+   * info as an accumulator update.
+   */
+  def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+
+  /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
    * accumulator is updated afterwards.
    */
@@ -340,7 +327,6 @@ private class SaveInfoListener extends SparkListener {
     if (jobCompletionCallback != null) {
       jobCompletionSem.acquire()
       if (exception != null) {
-        exception = null
         throw exception
       }
     }
@@ -377,13 +363,3 @@ private class SaveInfoListener extends SparkListener {
       (taskEnd.stageId, taskEnd.stageAttemptId), new ArrayBuffer[TaskInfo]) += taskEnd.taskInfo
   }
 }
-
-
-/**
- * A dummy [[Task]] that contains internal and external [[Accumulator]]s.
- */
-private[spark] class DummyTask(
-    metrics: TaskMetrics,
-    val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) {
-  override def runTask(c: TaskContext): Int = 1
-}
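
Since most of the suites below repeat the same calls, here is a hedged sketch (not part of the patch) consolidating the new driver-side API as it is used in this file, inlining what the createLongAccum helper above does; class locations and member visibility are assumed from the unqualified references in these test sources.

    import scala.collection.JavaConverters._
    import org.apache.spark.{AccumulatorContext, AccumulatorMetadata, LongAccumulator}

    object AccumulatorContextSketch {
      def demo(): Unit = {
        // Build, name, and register a LongAccumulator (what createLongAccum does).
        val acc = new LongAccumulator
        acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some("records"), false)
        AccumulatorContext.register(acc)
        acc.add(5)

        // The old Accumulators registry is now AccumulatorContext; originals is a Java
        // map, hence containsKey and keySet().asScala in the hunks below.
        assert(AccumulatorContext.get(acc.id).isDefined)
        assert(AccumulatorContext.originals.containsKey(acc.id))
        val registeredIds = AccumulatorContext.originals.keySet().asScala

        // Turning an accumulator into an AccumulableInfo update (the makeInfo helper above).
        val info = acc.toInfo(Some(acc.localValue), None)

        AccumulatorContext.remove(acc.id)
        AccumulatorContext.clear()
      }
    }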

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 4d2b3e7..1adc90a 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -211,10 +211,10 @@ class HeartbeatReceiverSuite
   private def triggerHeartbeat(
       executorId: String,
       executorShouldReregister: Boolean): Unit = {
-    val metrics = new TaskMetrics
+    val metrics = TaskMetrics.empty
     val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
     val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
-      Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId))
+      Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
     if (executorShouldReregister) {
       assert(response.reregisterBlockManager)
     } else {
@@ -222,7 +222,7 @@ class HeartbeatReceiverSuite
       // Additionally verify that the scheduler callback is called with the correct parameters
       verify(scheduler).executorHeartbeatReceived(
         Matchers.eq(executorId),
-        Matchers.eq(Array(1L -> metrics.accumulatorUpdates())),
+        Matchers.eq(Array(1L -> metrics.accumulators())),
         Matchers.eq(blockManagerId))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index b074b95..e4474bb 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.executor.TaskMetrics
@@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
 
   override def afterEach(): Unit = {
     try {
-      Accumulators.clear()
+      AccumulatorContext.clear()
     } finally {
       super.afterEach()
     }
@@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
 
   test("internal accumulators in TaskContext") {
     val taskContext = TaskContext.empty()
-    val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
+    val accumUpdates = taskContext.taskMetrics.accumulators()
     assert(accumUpdates.size > 0)
-    assert(accumUpdates.forall(_.internal))
     val testAccum = taskContext.taskMetrics.testAccum.get
     assert(accumUpdates.exists(_.id == testAccum.id))
   }
@@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
     sc.addSparkListener(listener)
     // Have each task add 1 to the internal accumulator
     val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
-      TaskContext.get().taskMetrics().testAccum.get += 1
+      TaskContext.get().taskMetrics().testAccum.get.add(1)
       iter
     }
     // Register asserts in job completion callback to avoid flakiness
@@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
     val rdd = sc.parallelize(1 to 100, numPartitions)
       .map { i => (i, i) }
       .mapPartitions { iter =>
-        TaskContext.get().taskMetrics().testAccum.get += 1
+        TaskContext.get().taskMetrics().testAccum.get.add(1)
         iter
       }
       .reduceByKey { case (x, y) => x + y }
       .mapPartitions { iter =>
-        TaskContext.get().taskMetrics().testAccum.get += 10
+        TaskContext.get().taskMetrics().testAccum.get.add(10)
         iter
       }
       .repartition(numPartitions * 2)
       .mapPartitions { iter =>
-        TaskContext.get().taskMetrics().testAccum.get += 100
+        TaskContext.get().taskMetrics().testAccum.get.add(100)
         iter
       }
     // Register asserts in job completion callback to avoid flakiness
@@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
     // This should retry both stages in the scheduler. Note that we only want to fail the
     // first stage attempt because we want the stage to eventually succeed.
     val x = sc.parallelize(1 to 100, numPartitions)
-      .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter }
+      .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter }
       .groupBy(identity)
     val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
     val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
       private val myCleaner = new SaveAccumContextCleaner(this)
       override def cleaner: Option[ContextCleaner] = Some(myCleaner)
     }
-    assert(Accumulators.originals.isEmpty)
+    assert(AccumulatorContext.originals.isEmpty)
     sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
     val numInternalAccums = TaskMetrics.empty.internalAccums.length
     // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
-    assert(Accumulators.originals.size === numInternalAccums * 2)
+    assert(AccumulatorContext.originals.size === numInternalAccums * 2)
     val accumsRegistered = sc.cleaner match {
       case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
       case _ => Seq.empty[Long]
     }
     // Make sure the same set of accumulators is registered for cleanup
     assert(accumsRegistered.size === numInternalAccums * 2)
-    assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
+    assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
   }
 
   /**
@@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
     private val accumsRegistered = new ArrayBuffer[Long]
 
-    override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+    override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
       accumsRegistered += a.id
       super.registerAccumulatorForCleanup(a)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3228752..4aae2c9 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -34,7 +34,7 @@ private[spark] abstract class SparkFunSuite
   protected override def afterAll(): Unit = {
     try {
       // Avoid leaking map entries in tests that use accumulators without SparkContext
-      Accumulators.clear()
+      AccumulatorContext.clear()
     } finally {
       super.afterAll()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index ee70419..94f6e1a 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -20,14 +20,11 @@ package org.apache.spark.executor
 import org.scalatest.Assertions
 
 import org.apache.spark._
-import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}
+import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId}
 
 
 class TaskMetricsSuite extends SparkFunSuite {
-  import AccumulatorParam._
   import StorageLevel._
-  import TaskMetricsSuite._
 
   test("mutating values") {
     val tm = new TaskMetrics
@@ -59,8 +56,8 @@ class TaskMetricsSuite extends SparkFunSuite {
     tm.incPeakExecutionMemory(8L)
     val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L))
     val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L))
-    tm.incUpdatedBlockStatuses(Seq(block1))
-    tm.incUpdatedBlockStatuses(Seq(block2))
+    tm.incUpdatedBlockStatuses(block1)
+    tm.incUpdatedBlockStatuses(block2)
     // assert new values exist
     assert(tm.executorDeserializeTime == 1L)
     assert(tm.executorRunTime == 2L)
@@ -194,18 +191,19 @@ class TaskMetricsSuite extends SparkFunSuite {
   }
 
   test("additional accumulables") {
-    val tm = new TaskMetrics
-    val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
-    val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
-    val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
-    val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true)
+    val tm = TaskMetrics.empty
+    val acc1 = AccumulatorSuite.createLongAccum("a")
+    val acc2 = AccumulatorSuite.createLongAccum("b")
+    val acc3 = AccumulatorSuite.createLongAccum("c")
+    val acc4 = AccumulatorSuite.createLongAccum("d", true)
     tm.registerAccumulator(acc1)
     tm.registerAccumulator(acc2)
     tm.registerAccumulator(acc3)
     tm.registerAccumulator(acc4)
-    acc1 += 1
-    acc2 += 2
-    val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap
+    acc1.add(1)
+    acc2.add(2)
+    val newUpdates = tm.accumulators()
+      .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap
     assert(newUpdates.contains(acc1.id))
     assert(newUpdates.contains(acc2.id))
     assert(newUpdates.contains(acc3.id))
@@ -214,46 +212,14 @@ class TaskMetricsSuite extends SparkFunSuite {
     assert(newUpdates(acc2.id).name === Some("b"))
     assert(newUpdates(acc3.id).name === Some("c"))
     assert(newUpdates(acc4.id).name === Some("d"))
-    assert(newUpdates(acc1.id).update === Some(1))
-    assert(newUpdates(acc2.id).update === Some(2))
-    assert(newUpdates(acc3.id).update === Some(0))
-    assert(newUpdates(acc4.id).update === Some(0))
+    assert(newUpdates(acc1.id).value === 1)
+    assert(newUpdates(acc2.id).value === 2)
+    assert(newUpdates(acc3.id).value === 0)
+    assert(newUpdates(acc4.id).value === 0)
     assert(!newUpdates(acc3.id).countFailedValues)
     assert(newUpdates(acc4.id).countFailedValues)
-    assert(newUpdates.values.map(_.update).forall(_.isDefined))
-    assert(newUpdates.values.map(_.value).forall(_.isEmpty))
     assert(newUpdates.size === tm.internalAccums.size + 4)
   }
-
-  test("from accumulator updates") {
-    val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
-      AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues)
-    }
-    val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
-    assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
-    // Test this with additional accumulators to ensure that we do not crash when handling
-    // updates from unregistered accumulators. In practice, all accumulators created
-    // on the driver, internal or not, should be registered with `Accumulators` at some point.
-    val param = IntAccumulatorParam
-    val registeredAccums = Seq(
-      new Accumulator(0, param, Some("a"), countFailedValues = true),
-      new Accumulator(0, param, Some("b"), countFailedValues = false))
-    val unregisteredAccums = Seq(
-      new Accumulator(0, param, Some("c"), countFailedValues = true),
-      new Accumulator(0, param, Some("d"), countFailedValues = false))
-    registeredAccums.foreach(Accumulators.register)
-    registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id)))
-    unregisteredAccums.foreach(a => Accumulators.remove(a.id))
-    unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id)))
-    // set some values in these accums
-    registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
-    unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
-    val registeredAccumInfos = registeredAccums.map(makeInfo)
-    val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
-    val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
-    // Simply checking that this does not crash:
-    TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
-  }
 }
 
 
@@ -264,21 +230,14 @@ private[spark] object TaskMetricsSuite extends Assertions {
    * Note: this does NOT check accumulator ID equality.
    */
   def assertUpdatesEquals(
-      updates1: Seq[AccumulableInfo],
-      updates2: Seq[AccumulableInfo]): Unit = {
+      updates1: Seq[NewAccumulator[_, _]],
+      updates2: Seq[NewAccumulator[_, _]]): Unit = {
     assert(updates1.size === updates2.size)
-    updates1.zip(updates2).foreach { case (info1, info2) =>
+    updates1.zip(updates2).foreach { case (acc1, acc2) =>
       // do not assert ID equals here
-      assert(info1.name === info2.name)
-      assert(info1.update === info2.update)
-      assert(info1.value === info2.value)
-      assert(info1.countFailedValues === info2.countFailedValues)
+      assert(acc1.name === acc2.name)
+      assert(acc1.countFailedValues === acc2.countFailedValues)
+      assert(acc1.value == acc2.value)
     }
   }
-
-  /**
-   * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
-   * info as an accumulator update.
-   */
-  def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
 }
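
A hedged sketch (not part of the patch) of the pattern the rewritten "additional accumulables" test exercises: TaskMetrics now hands back the registered accumulator objects themselves rather than AccumulableInfo snapshots; createLongAccum is the test helper added earlier in this patch.

    import org.apache.spark.{AccumulatorSuite, NewAccumulator}
    import org.apache.spark.executor.TaskMetrics

    object TaskMetricsAccumSketch {
      def demo(): Unit = {
        val tm = TaskMetrics.empty
        val acc = AccumulatorSuite.createLongAccum("a")
        tm.registerAccumulator(acc)
        acc.add(1)
        // accumulators() now returns the accumulators, internal ones included.
        val updates: Seq[NewAccumulator[_, _]] = tm.accumulators()
        assert(updates.exists(a => a.id == acc.id && a.value == 1))
      }
    }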

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b76c0a4..9912d1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def stop() = {}
     override def executorHeartbeatReceived(
         execId: String,
-        accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+        accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
         blockManagerId: BlockManagerId): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
       // normally done by TaskSetManager
@@ -277,8 +277,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
           taskSet.tasks(i),
           result._1,
           result._2,
-          Seq(new AccumulableInfo(
-            accumId, Some(""), Some(1), None, internal = false, countFailedValues = false))))
+          Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId))))
       }
     }
   }
@@ -484,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       override def defaultParallelism(): Int = 2
       override def executorHeartbeatReceived(
           execId: String,
-          accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+          accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
           blockManagerId: BlockManagerId): Boolean = true
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
       override def applicationAttemptId(): Option[String] = None
@@ -997,10 +996,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // complete two tasks
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(0), Success, 42,
-      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
+      Seq.empty, createFakeTaskInfoWithId(0)))
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(1), Success, 42,
-      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
+      Seq.empty, createFakeTaskInfoWithId(1)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
@@ -1009,10 +1008,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // finish other 2 tasks
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(2), Success, 42,
-      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
+      Seq.empty, createFakeTaskInfoWithId(2)))
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
-      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
+      Seq.empty, createFakeTaskInfoWithId(3)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 4)
 
@@ -1023,14 +1022,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // with a speculative task and make sure the event is sent out
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
-      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+      Seq.empty, createFakeTaskInfoWithId(5)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 5)
 
     // make sure non successful tasks also send out event
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), UnknownReason, 42,
-      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+      Seq.empty, createFakeTaskInfoWithId(6)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 6)
   }
@@ -1613,37 +1612,43 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
   test("accumulator not calculated for resubmitted result stage") {
     // just for register
-    val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
+    val accum = AccumulatorSuite.createLongAccum("a")
     val finalRdd = new MyRDD(sc, 1, Nil)
     submit(finalRdd, Array(0))
     completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
     completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
 
-    val accVal = Accumulators.originals(accum.id).get.get.value
-
-    assert(accVal === 1)
-
+    assert(accum.value === 1)
     assertDataStructuresEmpty()
   }
 
   test("accumulators are updated on exception failures") {
-    val acc1 = sc.accumulator(0L, "ingenieur")
-    val acc2 = sc.accumulator(0L, "boulanger")
-    val acc3 = sc.accumulator(0L, "agriculteur")
-    assert(Accumulators.get(acc1.id).isDefined)
-    assert(Accumulators.get(acc2.id).isDefined)
-    assert(Accumulators.get(acc3.id).isDefined)
-    val accInfo1 = acc1.toInfo(Some(15L), None)
-    val accInfo2 = acc2.toInfo(Some(13L), None)
-    val accInfo3 = acc3.toInfo(Some(18L), None)
-    val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
-    val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
+    val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
+    val acc2 = AccumulatorSuite.createLongAccum("boulanger")
+    val acc3 = AccumulatorSuite.createLongAccum("agriculteur")
+    assert(AccumulatorContext.get(acc1.id).isDefined)
+    assert(AccumulatorContext.get(acc2.id).isDefined)
+    assert(AccumulatorContext.get(acc3.id).isDefined)
+    val accUpdate1 = new LongAccumulator
+    accUpdate1.metadata = acc1.metadata
+    accUpdate1.setValue(15)
+    val accUpdate2 = new LongAccumulator
+    accUpdate2.metadata = acc2.metadata
+    accUpdate2.setValue(13)
+    val accUpdate3 = new LongAccumulator
+    accUpdate3.metadata = acc3.metadata
+    accUpdate3.setValue(18)
+    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
+    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+    val exceptionFailure = new ExceptionFailure(
+      new SparkException("fondue?"),
+      accumInfo).copy(accums = accumUpdates)
     submit(new MyRDD(sc, 1, Nil), Array(0))
     runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
-    assert(Accumulators.get(acc1.id).get.value === 15L)
-    assert(Accumulators.get(acc2.id).get.value === 13L)
-    assert(Accumulators.get(acc3.id).get.value === 18L)
+    assert(AccumulatorContext.get(acc1.id).get.value === 15L)
+    assert(AccumulatorContext.get(acc2.id).get.value === 13L)
+    assert(AccumulatorContext.get(acc3.id).get.value === 18L)
   }
 
   test("reduce tasks should be placed locally with map output") {
@@ -2007,12 +2012,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
+      extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty,
       taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
     val accumUpdates = reason match {
-      case Success => task.metrics.accumulatorUpdates()
-      case ef: ExceptionFailure => ef.accumUpdates
-      case _ => Seq.empty[AccumulableInfo]
+      case Success => task.metrics.accumulators()
+      case ef: ExceptionFailure => ef.accums
+      case _ => Seq.empty
     }
     CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
   }
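
A hedged sketch (not part of the patch) of the failure path the "accumulators are updated on exception failures" test above drives: ExceptionFailure keeps its AccumulableInfo list for display and event logging, while the accumulator objects themselves travel in the new accums field (set via copy here, on the assumption that it defaults to empty); createLongAccum and makeInfo are the test helpers added earlier in this patch.

    import org.apache.spark.{AccumulatorSuite, ExceptionFailure, NewAccumulator, SparkException}

    object ExceptionFailureAccumSketch {
      def demo(): Unit = {
        val acc = AccumulatorSuite.createLongAccum("boulanger")
        acc.setValue(13)
        val accums: Seq[NewAccumulator[_, _]] = Seq(acc)
        val infos = accums.map(AccumulatorSuite.makeInfo)
        val failure = new ExceptionFailure(new SparkException("fondue?"), infos)
          .copy(accums = accums)
        // The scheduler reads failure.accums when building the completion event,
        // as makeCompletionEvent above does, and merges them into the driver originals.
        assert(failure.accums.head.value == 13)
      }
    }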

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 9971d48..16027d9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
-class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
-{
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
   test("launch of backend and scheduler") {
     val conf = new SparkConf().setMaster("myclusterManager").
         setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
@@ -68,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def applicationAttemptId(): Option[String] = None
   def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
       blockManagerId: BlockManagerId): Boolean = true
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index d55f6f6..9aca4db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -162,18 +162,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     }.count()
     // The one that counts failed values should be 4x the one that didn't,
     // since we ran each task 4 times
-    assert(Accumulators.get(acc1.id).get.value === 40L)
-    assert(Accumulators.get(acc2.id).get.value === 10L)
+    assert(AccumulatorContext.get(acc1.id).get.value === 40L)
+    assert(AccumulatorContext.get(acc2.id).get.value === 10L)
   }
 
   test("failed tasks collect only accumulators whose values count during failures") {
     sc = new SparkContext("local", "test")
-    val param = AccumulatorParam.LongAccumulatorParam
-    val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
-    val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", true)
+    val acc2 = AccumulatorSuite.createLongAccum("y", false)
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
-    val taskMetrics = new TaskMetrics
+    val taskMetrics = TaskMetrics.empty
     val task = new Task[Int](0, 0, 0) {
       context = new TaskContextImpl(0, 0, 0L, 0,
         new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
@@ -186,12 +185,11 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     }
     // First, simulate task success. This should give us all the accumulators.
     val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
-    val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2))
-      .map(TaskMetricsSuite.makeInfo)
+    val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
     TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
     // Now, simulate task failures. This should give us only the accums that count failed values.
     val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
-    val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+    val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
     TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index b5385c1..9e472f9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -241,8 +241,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     assert(resultGetter.taskResults.size === 1)
     val resBefore = resultGetter.taskResults.head
     val resAfter = captor.getValue
-    val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
-    val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+    val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value)
+    val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value)
     assert(resSizeBefore.exists(_ == 0L))
     assert(resSizeAfter.exists(_.toString.toLong > 0L))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ecf4b76..339fc42 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      accumUpdates: Seq[AccumulableInfo],
+      accumUpdates: Seq[NewAccumulator[_, _]],
       taskInfo: TaskInfo) {
     taskScheduler.endedTasks(taskInfo.index) = reason
   }
@@ -166,8 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
-    val accumUpdates =
-      taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
+    val accumUpdates = taskSet.tasks.head.metrics.internalAccums
 
     // Offer a host with NO_PREF as the constraint,
     // we should get a nopref task immediately since that's what we only have
@@ -185,8 +184,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
-    val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
-      task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
+    val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
     }
 
     // First three offers should all find tasks
@@ -792,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 
   private def createTaskResult(
       id: Int,
-      accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = {
+      accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 2211248..ce7d51d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -183,7 +183,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   test("test executor id to summary") {
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
-    val taskMetrics = new TaskMetrics()
+    val taskMetrics = TaskMetrics.empty
     val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
     assert(listener.stageIdToData.size === 0)
 
@@ -230,7 +230,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   test("test task success vs failure counting for different task end reasons") {
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
-    val metrics = new TaskMetrics()
+    val metrics = TaskMetrics.empty
     val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     val task = new ShuffleMapTask(0)
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val execId = "exe-1"
 
     def makeTaskMetrics(base: Int): TaskMetrics = {
-      val taskMetrics = new TaskMetrics
+      val taskMetrics = TaskMetrics.empty
       val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
       val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
       val inputMetrics = taskMetrics.inputMetrics
@@ -300,9 +300,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()),
-      (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()),
-      (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates()))))
+      (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)),
+      (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)),
+      (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo)))))
 
     var stage0Data = listener.stageIdToData.get((0, 0)).get
     var stage1Data = listener.stageIdToData.get((1, 0)).get

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d3b6cdf..6fda737 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -85,7 +85,8 @@ class JsonProtocolSuite extends SparkFunSuite {
       // Use custom accum ID for determinism
       val accumUpdates =
         makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
-          .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) }
+          .accumulators().map(AccumulatorSuite.makeInfo)
+          .zipWithIndex.map { case (a, i) => a.copy(id = i) }
       SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
     }
 
@@ -385,7 +386,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     // "Task Metrics" field, if it exists.
     val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
     val tmJson = JsonProtocol.taskMetricsToJson(tm)
-    val accumUpdates = tm.accumulatorUpdates()
+    val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo)
     val exception = new SparkException("sentimental")
     val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
     val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
@@ -813,7 +814,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       hasHadoopInput: Boolean,
       hasOutput: Boolean,
       hasRecords: Boolean = true) = {
-    val t = new TaskMetrics
+    val t = TaskMetrics.empty
     t.setExecutorDeserializeTime(a)
     t.setExecutorRunTime(b)
     t.setResultSize(c)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0f8648f..6fc49a0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -688,6 +688,18 @@ object MimaExcludes {
       ) ++ Seq(
         // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
         ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
+      ) ++ Seq(
+        // SPARK-14654: New accumulator API
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ExceptionFailure$"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.apply"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.metrics"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.copy"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.this"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.ShuffleReadMetrics.remoteBlocksFetched"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.ShuffleReadMetrics.totalBlocksFetched"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.ShuffleReadMetrics.localBlocksFetched"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.remoteBlocksFetched"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.localBlocksFetched")
       )
     case v if v.startsWith("1.6") =>
       Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 520ceaa..d6516f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,7 @@ private[sql] case class RDDScanExec(
     override val nodeName: String) extends LeafExecNode {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
@@ -147,7 +147,7 @@ private[sql] case class RowDataSourceScanExec(
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   val outputUnsafeRows = relation match {
     case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -216,7 +216,7 @@ private[sql] case class BatchedDataSourceScanExec(
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index 7c47566..c201822 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -40,7 +40,7 @@ case class ExpandExec(
   extends UnaryExecNode with CodegenSupport {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   // The GroupExpressions can output data with arbitrary partitioning, so set it
   // as UNKNOWN partitioning

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 10cfec3..934bc38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -56,7 +56,7 @@ case class GenerateExec(
   extends UnaryExecNode {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 4ab447a..c5e78b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -31,7 +31,7 @@ private[sql] case class LocalTableScanExec(
     rows: Seq[InternalRow]) extends LeafExecNode {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   private val unsafeRows: Array[InternalRow] = {
     val proj = UnsafeProjection.create(output, output)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 861ff3c..0bbe970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.ThreadUtils
 
@@ -77,7 +77,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Return all metrics containing metrics of this SparkPlan.
    */
-  private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
+  private[sql] def metrics: Map[String, SQLMetric] = Map.empty
 
   /**
    * Reset all the metrics.
@@ -89,8 +89,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Return a LongSQLMetric according to the name.
    */
-  private[sql] def longMetric(name: String): LongSQLMetric =
-    metrics(name).asInstanceOf[LongSQLMetric]
+  private[sql] def longMetric(name: String): SQLMetric = metrics(name)
 
   // TODO: Move to `DistributedPlan`
   /** Specifies how data is partitioned across different nodes in the cluster. */
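
On the SQL side, a hedged sketch (not part of the patch) of the simplified metric surface the hunks above converge on: SQLMetric replaces the typed LongSQLMetric/SQLMetric[_, _] family, metrics are created by kind (createMetric, createTimingMetric) and mutated with add(), and longMetric(name) on a SparkPlan returns the SQLMetric directly. The sketch assumes it sits somewhere with access to the private[sql] metric classes.

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    object SQLMetricSketch {
      // What an operator's `metrics` map now looks like (see the operators above).
      def declareMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
        "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),
        "scanTime" -> SQLMetrics.createTimingMetric(sc, "scan time"))

      // Inside doExecute(), a metric is bumped directly with add(); there is no
      // separate localValue indirection any more (see the UnsafeRowSerializer hunk below).
      def recordRows(metrics: Map[String, SQLMetric], rows: Long): Unit =
        metrics("numOutputRows").add(rows)
    }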

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index cb4b1cf..f84070a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -55,8 +55,7 @@ private[sql] object SparkPlanInfo {
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
-      new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
-        Utils.getFormattedClassName(metric.param))
+      new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
     new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 362d0d7..4849234 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -26,7 +26,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.unsafe.Platform
 
 /**
@@ -42,7 +42,7 @@ import org.apache.spark.unsafe.Platform
  */
 private[sql] class UnsafeRowSerializer(
     numFields: Int,
-    dataSize: LongSQLMetric = null) extends Serializer with Serializable {
+    dataSize: SQLMetric = null) extends Serializer with Serializable {
   override def newInstance(): SerializerInstance =
     new UnsafeRowSerializerInstance(numFields, dataSize)
   override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
@@ -50,7 +50,7 @@ private[sql] class UnsafeRowSerializer(
 
 private class UnsafeRowSerializerInstance(
     numFields: Int,
-    dataSize: LongSQLMetric) extends SerializerInstance {
+    dataSize: SQLMetric) extends SerializerInstance {
   /**
    * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
    * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
@@ -60,13 +60,10 @@ private class UnsafeRowSerializerInstance(
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))
 
-    // LongSQLMetricParam.add() is faster than LongSQLMetric.+=
-    val localDataSize = if (dataSize != null) dataSize.localValue else null
-
     override def writeValue[T: ClassTag](value: T): SerializationStream = {
       val row = value.asInstanceOf[UnsafeRow]
-      if (localDataSize != null) {
-        localDataSize.add(row.getSizeInBytes)
+      if (dataSize != null) {
+        dataSize.add(row.getSizeInBytes)
       }
       dOut.writeInt(row.getSizeInBytes)
       row.writeToStream(dOut, writeBuffer)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 6a03bd0..15b4abe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -52,11 +52,7 @@ trait CodegenSupport extends SparkPlan {
    * @return name of the variable representing the metric
    */
   def metricTerm(ctx: CodegenContext, name: String): String = {
-    val metric = ctx.addReferenceObj(name, longMetric(name))
-    val value = ctx.freshName("metricValue")
-    val cls = classOf[LongSQLMetricValue].getName
-    ctx.addMutableState(cls, value, s"$value = ($cls) $metric.localValue();")
-    value
+    ctx.addReferenceObj(name, longMetric(name))
   }
 
   /**

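Side note (not part of the diff): metricTerm now returns a generated-code reference to
the SQLMetric itself, so generated consumers bump it with a single add() call instead of
going through a cached LongSQLMetricValue. A hedged sketch of how a CodegenSupport
operator could use it (the doConsume signature and surrounding code are illustrative
assumptions):

    // Illustrative only: inside a CodegenSupport operator.
    override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
      val numOutput = metricTerm(ctx, "numOutputRows")   // reference to the SQLMetric object
      s"""
         |$numOutput.add(1);
         |${consume(ctx, input)}
       """.stripMargin
    }
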
http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
index 3169e0a..2e74d59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
@@ -46,7 +46,7 @@ case class SortBasedAggregateExec(
       AttributeSet(aggregateBufferAttributes)
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index c35d781..f392b13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 
 /**
  * An iterator used to evaluate [[AggregateFunction]]. It assumes the input rows have been
@@ -35,7 +35,7 @@ class SortBasedAggregationIterator(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
-    numOutputRows: LongSQLMetric)
+    numOutputRows: SQLMetric)
   extends AggregationIterator(
     groupingExpressions,
     valueAttributes,

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 16362f7..d0ba37e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 
@@ -51,7 +51,7 @@ case class TungstenAggregate(
       aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
@@ -309,8 +309,8 @@ case class TungstenAggregate(
   def finishAggregate(
       hashMap: UnsafeFixedWidthAggregationMap,
       sorter: UnsafeKVExternalSorter,
-      peakMemory: LongSQLMetricValue,
-      spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {
+      peakMemory: SQLMetric,
+      spillSize: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes

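Side note (not part of the diff): finishAggregate now receives the SQLMetrics themselves
rather than their LongSQLMetricValue payloads, so the updates become direct add() calls.
A hedged continuation of the context shown above (the sorter accessor and the spill
bookkeeping are illustrative assumptions):

    // Illustrative only: metric updates inside finishAggregate after this change.
    val mapMemory = hashMap.getPeakMemoryUsedBytes
    val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
    peakMemory.add(Math.max(mapMemory, sorterMemory))
    // spillSize would be bumped the same way once the sorter reports how much it spilled
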
http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 9db5087..243aa15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
 import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnsafeKVExternalSorter}
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.KVIterator
 
@@ -86,9 +86,9 @@ class TungstenAggregationIterator(
     originalInputAttributes: Seq[Attribute],
     inputIter: Iterator[InternalRow],
     testFallbackStartsAt: Option[(Int, Int)],
-    numOutputRows: LongSQLMetric,
-    peakMemory: LongSQLMetric,
-    spillSize: LongSQLMetric)
+    numOutputRows: SQLMetric,
+    peakMemory: SQLMetric,
+    spillSize: SQLMetric)
   extends AggregationIterator(
     groupingExpressions,
     originalInputAttributes,

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 83f527f..77be613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -103,7 +103,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   }
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
@@ -229,7 +229,7 @@ case class SampleExec(
   override def output: Seq[Attribute] = child.output
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
     if (withReplacement) {
@@ -322,7 +322,7 @@ case class RangeExec(
   extends LeafExecNode with CodegenSupport {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   // output attributes should not affect the results
   override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index cb957b9..577c34b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Accumulable, Accumulator, Accumulators}
+import org.apache.spark.{Accumulable, Accumulator, AccumulatorContext}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -204,7 +204,7 @@ private[sql] case class InMemoryRelation(
     Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
 
   private[sql] def uncache(blocking: Boolean): Unit = {
-    Accumulators.remove(batchStats.id)
+    AccumulatorContext.remove(batchStats.id)
     cachedColumnBuffers.unpersist(blocking)
     _cachedColumnBuffers = null
   }
@@ -217,7 +217,7 @@ private[sql] case class InMemoryTableScanExec(
   extends LeafExecNode {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = attributes
 

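Side note (not part of the diff): the Accumulators singleton gives way to
AccumulatorContext here. A hedged sketch of the registry shape that remove(id) implies;
the weak-reference map and the register helper are assumptions for illustration, not the
committed code:

    import java.lang.ref.WeakReference
    import java.util.concurrent.ConcurrentHashMap

    // Illustrative registry: accumulators keyed by id, so uncache() only has to drop
    // the bookkeeping entry for batchStats.
    object AccumulatorContextSketch {
      private val originals = new ConcurrentHashMap[Long, WeakReference[AnyRef]]()
      def register(id: Long, acc: AnyRef): Unit = originals.put(id, new WeakReference[AnyRef](acc))
      def remove(id: Long): Unit = originals.remove(id)
    }
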
http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 573ca19..b6ecd3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -38,10 +38,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends Exchange {
 
   override private[sql] lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
-    "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
-    "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
-    "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))
+    "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
+    "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"),
+    "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"),
+    "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)"))
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 

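Side note (not part of the diff): the four broadcast metrics become plain createMetric
counters, with the units kept in the display names. A hedged sketch of how the operator
would drive them, assuming the usual collect-build-broadcast flow (the surrounding code
and the dataSize computation are illustrative, timings in ms):

    // Illustrative only: inside the broadcast preparation of this operator.
    val beforeCollect = System.nanoTime()
    val input: Array[InternalRow] = child.executeCollect()
    longMetric("collectTime").add((System.nanoTime() - beforeCollect) / 1000000)
    longMetric("dataSize").add(
      input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum)

    val beforeBuild = System.nanoTime()
    val relation = mode.transform(input)        // build the value to broadcast
    longMetric("buildTime").add((System.nanoTime() - beforeBuild) / 1000000)

    val beforeBroadcast = System.nanoTime()
    val broadcasted = sparkContext.broadcast(relation)
    longMetric("broadcastTime").add((System.nanoTime() - beforeBroadcast) / 1000000)
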
http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index b0a6b8f..587c603 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -46,7 +46,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 51afa00..a659bf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -35,7 +35,7 @@ case class BroadcastNestedLoopJoinExec(
     condition: Option[Expression]) extends BinaryExecNode {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   /** BuildRight means the right relation <=> the broadcast relation. */
   private val (streamed, broadcast) = buildSide match {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 67f5919..8d7ecc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -86,7 +86,7 @@ case class CartesianProductExec(
   override def output: Seq[Attribute] = left.output ++ right.output
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index d6feedc..9c173d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{RowIterator, SparkPlan}
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{IntegralType, LongType}
 
 trait HashJoin {
@@ -201,7 +201,7 @@ trait HashJoin {
   protected def join(
       streamedIter: Iterator[InternalRow],
       hashed: HashedRelation,
-      numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
     val joinedIter = joinType match {
       case Inner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index a242a07..3ef2fec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -40,7 +40,7 @@ case class ShuffledHashJoinExec(
   extends BinaryExecNode with HashJoin {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
     "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index a4c5491..775f8ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, RowIterator, SparkPlan}
-import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -41,7 +41,7 @@ case class SortMergeJoinExec(
     right: SparkPlan) extends BinaryExecNode with CodegenSupport {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = {
     joinType match {
@@ -734,7 +734,7 @@ private class LeftOuterIterator(
     rightNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
     resultProj: InternalRow => InternalRow,
-    numOutputRows: LongSQLMetric)
+    numOutputRows: SQLMetric)
   extends OneSideOuterIterator(
     smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows) {
 
@@ -750,7 +750,7 @@ private class RightOuterIterator(
     leftNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
     resultProj: InternalRow => InternalRow,
-    numOutputRows: LongSQLMetric)
+    numOutputRows: SQLMetric)
   extends OneSideOuterIterator(smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows) {
 
   protected override def setStreamSideOutput(row: InternalRow): Unit = joinedRow.withRight(row)
@@ -778,7 +778,7 @@ private abstract class OneSideOuterIterator(
     bufferedSideNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
     resultProj: InternalRow => InternalRow,
-    numOutputRows: LongSQLMetric) extends RowIterator {
+    numOutputRows: SQLMetric) extends RowIterator {
 
   // A row to store the joined result, reused many times
   protected[this] val joinedRow: JoinedRow = new JoinedRow()
@@ -1016,7 +1016,7 @@ private class SortMergeFullOuterJoinScanner(
 private class FullOuterIterator(
     smjScanner: SortMergeFullOuterJoinScanner,
     resultProj: InternalRow => InternalRow,
-    numRows: LongSQLMetric) extends RowIterator {
+    numRows: SQLMetric) extends RowIterator {
   private[this] val joinedRow: JoinedRow = smjScanner.getJoinedRow()
 
   override def advanceNext(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
index 2708219..adb8151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
@@ -27,4 +27,4 @@ import org.apache.spark.annotation.DeveloperApi
 class SQLMetricInfo(
     val name: String,
     val accumulatorId: Long,
-    val metricParam: String)
+    val metricType: String)

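Side note (not part of the diff): with the accumulator param gone, downstream consumers
such as the SQL UI only see a metricType string and the raw long values. A hedged sketch
of a formatter keyed on that tag; the "size"/"timing" strings are assumptions for
illustration:

    import org.apache.spark.util.Utils

    // Illustrative only: pick a display format from the metricType tag.
    def stringValue(metricType: String, values: Seq[Long]): String = metricType match {
      case "size"   => Utils.bytesToString(values.sum)   // e.g. "4.0 MB"
      case "timing" => s"${values.sum} ms"
      case _        => values.sum.toString               // "sum" and anything else
    }
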


