spark-commits mailing list archives

From joshro...@apache.org
Subject [1/4] spark git commit: [SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators
Date Wed, 27 Jan 2016 19:15:54 GMT
Repository: spark
Updated Branches:
  refs/heads/master edd473751 -> 87abcf7df


http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index e5ca2de..57021d1 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -22,6 +22,10 @@ import java.util.Properties
 import scala.collection.Map
 
 import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonAST.{JArray, JInt, JString, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest.Assertions
+import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark._
 import org.apache.spark.executor._
@@ -32,12 +36,7 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage._
 
 class JsonProtocolSuite extends SparkFunSuite {
-
-  val jobSubmissionTime = 1421191042750L
-  val jobCompletionTime = 1421191296660L
-
-  val executorAddedTime = 1421458410000L
-  val executorRemovedTime = 1421458922000L
+  import JsonProtocolSuite._
 
   test("SparkListenerEvent") {
     val stageSubmitted =
@@ -82,9 +81,13 @@ class JsonProtocolSuite extends SparkFunSuite {
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
       new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
-    val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq(
-      (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
-        hasHadoopInput = true, hasOutput = true))))
+    val executorMetricsUpdate = {
+      // Use custom accum ID for determinism
+      val accumUpdates =
+        makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
+          .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) }
+      SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
+    }
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -142,7 +145,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       "Some exception")
     val fetchMetadataFailed = new MetadataFetchFailedException(17,
       19, "metadata Fetch failed exception").toTaskEndReason
-    val exceptionFailure = new ExceptionFailure(exception, None)
+    val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
     testTaskEndReason(fetchFailed)
@@ -166,9 +169,8 @@ class JsonProtocolSuite extends SparkFunSuite {
    |  Backward compatibility tests  |
    * ============================== */
 
-  test("ExceptionFailure backward compatibility") {
-    val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
-      None, None)
+  test("ExceptionFailure backward compatibility: full stack trace") {
+    val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None)
     val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
       .removeField({ _._1 == "Full Stack Trace" })
     assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
@@ -273,14 +275,13 @@ class JsonProtocolSuite extends SparkFunSuite {
     assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
-  test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
-    // Metrics about local shuffle bytes read and local read time were added in 1.3.1.
+  test("ShuffleReadMetrics: Local bytes read backwards compatibility") {
+    // Metrics about local shuffle bytes read were added in 1.3.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
     assert(metrics.shuffleReadMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
-      .removeField { case (field, _) => field == "Local Read Time" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
   }
@@ -371,22 +372,76 @@ class JsonProtocolSuite extends SparkFunSuite {
   }
 
   test("AccumulableInfo backward compatibility") {
-    // "Internal" property of AccumulableInfo were added after 1.5.1.
-    val accumulableInfo = makeAccumulableInfo(1)
+    // "Internal" property of AccumulableInfo was added in 1.5.1
+    val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
     val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
       .removeField({ _._1 == "Internal" })
     val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
-    assert(false === oldInfo.internal)
+    assert(!oldInfo.internal)
+    // "Count Failed Values" property of AccumulableInfo was added in 2.0.0
+    val oldJson2 = JsonProtocol.accumulableInfoToJson(accumulableInfo)
+      .removeField({ _._1 == "Count Failed Values" })
+    val oldInfo2 = JsonProtocol.accumulableInfoFromJson(oldJson2)
+    assert(!oldInfo2.countFailedValues)
+  }
+
+  test("ExceptionFailure backward compatibility: accumulator updates") {
+    // "Task Metrics" was replaced with "Accumulator Updates" in 2.0.0. For older event logs,
+    // we should still be able to fall back to constructing the accumulator updates from the
+    // "Task Metrics" field, if it exists.
+    val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
+    val tmJson = JsonProtocol.taskMetricsToJson(tm)
+    val accumUpdates = tm.accumulatorUpdates()
+    val exception = new SparkException("sentimental")
+    val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
+    val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
+    val tmFieldJson: JValue = "Task Metrics" -> tmJson
+    val oldExceptionFailureJson: JValue =
+      exceptionFailureJson.removeField { _._1 == "Accumulator Updates" }.merge(tmFieldJson)
+    val oldExceptionFailure =
+      JsonProtocol.taskEndReasonFromJson(oldExceptionFailureJson).asInstanceOf[ExceptionFailure]
+    assert(exceptionFailure.className === oldExceptionFailure.className)
+    assert(exceptionFailure.description === oldExceptionFailure.description)
+    assertSeqEquals[StackTraceElement](
+      exceptionFailure.stackTrace, oldExceptionFailure.stackTrace, assertStackTraceElementEquals)
+    assert(exceptionFailure.fullStackTrace === oldExceptionFailure.fullStackTrace)
+    assertSeqEquals[AccumulableInfo](
+      exceptionFailure.accumUpdates, oldExceptionFailure.accumUpdates, (x, y) => x == y)
   }
 
-  /** -------------------------- *
-   | Helper test running methods |
-   * --------------------------- */
+  test("AccumulableInfo value de/serialization") {
+    import InternalAccumulator._
+    val blocks = Seq[(BlockId, BlockStatus)](
+      (TestBlockId("meebo"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+      (TestBlockId("feebo"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+    val blocksJson = JArray(blocks.toList.map { case (id, status) =>
+      ("Block ID" -> id.toString) ~
+      ("Status" -> JsonProtocol.blockStatusToJson(status))
+    })
+    testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
+    testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
+    testAccumValue(Some(input.READ_METHOD), "aka", JString("aka"))
+    testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson)
+    // For anything else, we just cast the value to a string
+    testAccumValue(Some("anything"), blocks, JString(blocks.toString))
+    testAccumValue(Some("anything"), 123, JString("123"))
+  }
+
+}
+
+
+private[spark] object JsonProtocolSuite extends Assertions {
+  import InternalAccumulator._
+
+  private val jobSubmissionTime = 1421191042750L
+  private val jobCompletionTime = 1421191296660L
+  private val executorAddedTime = 1421458410000L
+  private val executorRemovedTime = 1421458922000L
 
   private def testEvent(event: SparkListenerEvent, jsonString: String) {
     val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
     val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
-    assertJsonStringEquals(jsonString, actualJsonString)
+    assertJsonStringEquals(jsonString, actualJsonString, event.getClass.getSimpleName)
     assertEquals(event, newEvent)
   }
 
@@ -440,11 +495,19 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(info, newInfo)
   }
 
+  private def testAccumValue(name: Option[String], value: Any, expectedJson: JValue): Unit = {
+    val json = JsonProtocol.accumValueToJson(name, value)
+    assert(json === expectedJson)
+    val newValue = JsonProtocol.accumValueFromJson(name, json)
+    val expectedValue = if (name.exists(_.startsWith(METRICS_PREFIX))) value else value.toString
+    assert(newValue === expectedValue)
+  }
+
   /** -------------------------------- *
    | Util methods for comparing events |
-   * --------------------------------- */
+    * --------------------------------- */
 
-  private def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
+  private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
     (event1, event2) match {
       case (e1: SparkListenerStageSubmitted, e2: SparkListenerStageSubmitted) =>
         assert(e1.properties === e2.properties)
@@ -478,14 +541,17 @@ class JsonProtocolSuite extends SparkFunSuite {
        assert(e1.executorId === e2.executorId)
       case (e1: SparkListenerExecutorMetricsUpdate, e2: SparkListenerExecutorMetricsUpdate) =>
         assert(e1.execId === e2.execId)
-        assertSeqEquals[(Long, Int, Int, TaskMetrics)](e1.taskMetrics, e2.taskMetrics, (a, b) => {
-          val (taskId1, stageId1, stageAttemptId1, metrics1) = a
-          val (taskId2, stageId2, stageAttemptId2, metrics2) = b
-          assert(taskId1 === taskId2)
-          assert(stageId1 === stageId2)
-          assert(stageAttemptId1 === stageAttemptId2)
-          assertEquals(metrics1, metrics2)
-        })
+        assertSeqEquals[(Long, Int, Int, Seq[AccumulableInfo])](
+          e1.accumUpdates,
+          e2.accumUpdates,
+          (a, b) => {
+            val (taskId1, stageId1, stageAttemptId1, updates1) = a
+            val (taskId2, stageId2, stageAttemptId2, updates2) = b
+            assert(taskId1 === taskId2)
+            assert(stageId1 === stageId2)
+            assert(stageAttemptId1 === stageAttemptId2)
+            assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+          })
       case (e1, e2) =>
         assert(e1 === e2)
       case _ => fail("Events don't match in types!")
@@ -544,7 +610,6 @@ class JsonProtocolSuite extends SparkFunSuite {
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
-    assert(metrics1.hostname === metrics2.hostname)
     assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
     assert(metrics1.resultSize === metrics2.resultSize)
     assert(metrics1.jvmGCTime === metrics2.jvmGCTime)
@@ -601,7 +666,7 @@ class JsonProtocolSuite extends SparkFunSuite {
         assert(r1.description === r2.description)
         assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
         assert(r1.fullStackTrace === r2.fullStackTrace)
-        assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
+        assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
       case (TaskResultLost, TaskResultLost) =>
       case (TaskKilled, TaskKilled) =>
       case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
@@ -637,10 +702,16 @@ class JsonProtocolSuite extends SparkFunSuite {
       assertStackTraceElementEquals)
   }
 
-  private def assertJsonStringEquals(json1: String, json2: String) {
+  private def assertJsonStringEquals(expected: String, actual: String, metadata: String) {
     val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
-    assert(formatJsonString(json1) === formatJsonString(json2),
-      s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}")
+    if (formatJsonString(expected) != formatJsonString(actual)) {
+      // scalastyle:off
+      // This prints something useful if the JSON strings don't match
+      println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n")
+      println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n")
+      // scalastyle:on
+      throw new TestFailedException(s"$metadata JSON did not equal", 1)
+    }
   }
 
   private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) {
@@ -699,7 +770,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   /** ----------------------------------- *
    | Util methods for constructing events |
-   * ------------------------------------ */
+    * ------------------------------------ */
 
   private val properties = {
     val p = new Properties
@@ -746,8 +817,12 @@ class JsonProtocolSuite extends SparkFunSuite {
     taskInfo
   }
 
-  private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo =
-    AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal)
+  private def makeAccumulableInfo(
+      id: Int,
+      internal: Boolean = false,
+      countFailedValues: Boolean = false): AccumulableInfo =
+    new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
+      internal, countFailedValues)
 
   /**
    * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
@@ -764,7 +839,6 @@ class JsonProtocolSuite extends SparkFunSuite {
       hasOutput: Boolean,
       hasRecords: Boolean = true) = {
     val t = new TaskMetrics
-    t.setHostname("localhost")
     t.setExecutorDeserializeTime(a)
     t.setExecutorRunTime(b)
     t.setResultSize(c)
@@ -774,7 +848,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
     if (hasHadoopInput) {
       val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
-      inputMetrics.incBytesRead(d + e + f)
+      inputMetrics.setBytesRead(d + e + f)
       inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
     } else {
       val sr = t.registerTempShuffleReadMetrics()
@@ -794,7 +868,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       val sw = t.registerShuffleWriteMetrics()
       sw.incBytesWritten(a + b + c)
       sw.incWriteTime(b + c + d)
-      sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+      sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
     }
     // Make at most 6 blocks
     t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
@@ -826,14 +900,16 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  },
@@ -881,14 +957,16 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  }
@@ -919,21 +997,24 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
       |        "Value": "val3",
-      |        "Internal": true
+      |        "Internal": true,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  }
@@ -962,21 +1043,24 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
       |        "Value": "val3",
-      |        "Internal": true
+      |        "Internal": true,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  }
@@ -1011,26 +1095,28 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
       |        "Value": "val3",
-      |        "Internal": true
+      |        "Internal": true,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  },
       |  "Task Metrics": {
-      |    "Host Name": "localhost",
       |    "Executor Deserialize Time": 300,
       |    "Executor Run Time": 400,
       |    "Result Size": 500,
@@ -1044,7 +1130,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |      "Fetch Wait Time": 900,
       |      "Remote Bytes Read": 1000,
       |      "Local Bytes Read": 1100,
-      |      "Total Records Read" : 10
+      |      "Total Records Read": 10
       |    },
       |    "Shuffle Write Metrics": {
       |      "Shuffle Bytes Written": 1200,
@@ -1098,26 +1184,28 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
       |        "Value": "val3",
-      |        "Internal": true
+      |        "Internal": true,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  },
       |  "Task Metrics": {
-      |    "Host Name": "localhost",
       |    "Executor Deserialize Time": 300,
       |    "Executor Run Time": 400,
       |    "Result Size": 500,
@@ -1182,26 +1270,28 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
       |        "Value": "val1",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
       |        "Value": "val2",
-      |        "Internal": false
+      |        "Internal": false,
+      |        "Count Failed Values": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
       |        "Value": "val3",
-      |        "Internal": true
+      |        "Internal": true,
+      |        "Count Failed Values": false
       |      }
       |    ]
       |  },
       |  "Task Metrics": {
-      |    "Host Name": "localhost",
       |    "Executor Deserialize Time": 300,
       |    "Executor Run Time": 400,
       |    "Result Size": 500,
@@ -1273,17 +1363,19 @@ class JsonProtocolSuite extends SparkFunSuite {
       |      "Accumulables": [
       |        {
       |          "ID": 2,
-      |          "Name": " Accumulable 2",
+      |          "Name": "Accumulable2",
       |          "Update": "delta2",
       |          "Value": "val2",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        },
       |        {
       |          "ID": 1,
-      |          "Name": " Accumulable 1",
+      |          "Name": "Accumulable1",
       |          "Update": "delta1",
       |          "Value": "val1",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        }
       |      ]
       |    },
@@ -1331,17 +1423,19 @@ class JsonProtocolSuite extends SparkFunSuite {
       |      "Accumulables": [
       |        {
       |          "ID": 2,
-      |          "Name": " Accumulable 2",
+      |          "Name": "Accumulable2",
       |          "Update": "delta2",
       |          "Value": "val2",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        },
       |        {
       |          "ID": 1,
-      |          "Name": " Accumulable 1",
+      |          "Name": "Accumulable1",
       |          "Update": "delta1",
       |          "Value": "val1",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        }
       |      ]
       |    },
@@ -1405,17 +1499,19 @@ class JsonProtocolSuite extends SparkFunSuite {
       |      "Accumulables": [
       |        {
       |          "ID": 2,
-      |          "Name": " Accumulable 2",
+      |          "Name": "Accumulable2",
       |          "Update": "delta2",
       |          "Value": "val2",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        },
       |        {
       |          "ID": 1,
-      |          "Name": " Accumulable 1",
+      |          "Name": "Accumulable1",
       |          "Update": "delta1",
       |          "Value": "val1",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        }
       |      ]
       |    },
@@ -1495,17 +1591,19 @@ class JsonProtocolSuite extends SparkFunSuite {
       |      "Accumulables": [
       |        {
       |          "ID": 2,
-      |          "Name": " Accumulable 2",
+      |          "Name": "Accumulable2",
       |          "Update": "delta2",
       |          "Value": "val2",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        },
       |        {
       |          "ID": 1,
-      |          "Name": " Accumulable 1",
+      |          "Name": "Accumulable1",
       |          "Update": "delta1",
       |          "Value": "val1",
-      |          "Internal": false
+      |          "Internal": false,
+      |          "Count Failed Values": false
       |        }
       |      ]
       |    }
@@ -1657,51 +1755,208 @@ class JsonProtocolSuite extends SparkFunSuite {
     """
 
   private val executorMetricsUpdateJsonString =
-  s"""
-     |{
-     |  "Event": "SparkListenerExecutorMetricsUpdate",
-     |  "Executor ID": "exec3",
-     |  "Metrics Updated": [
-     |  {
-     |    "Task ID": 1,
-     |    "Stage ID": 2,
-     |    "Stage Attempt ID": 3,
-     |    "Task Metrics": {
-     |    "Host Name": "localhost",
-     |    "Executor Deserialize Time": 300,
-     |    "Executor Run Time": 400,
-     |    "Result Size": 500,
-     |    "JVM GC Time": 600,
-     |    "Result Serialization Time": 700,
-     |    "Memory Bytes Spilled": 800,
-     |    "Disk Bytes Spilled": 0,
-     |    "Input Metrics": {
-     |      "Data Read Method": "Hadoop",
-     |      "Bytes Read": 2100,
-     |      "Records Read": 21
-     |    },
-     |    "Output Metrics": {
-     |      "Data Write Method": "Hadoop",
-     |      "Bytes Written": 1200,
-     |      "Records Written": 12
-     |    },
-     |    "Updated Blocks": [
-     |      {
-     |        "Block ID": "rdd_0_0",
-     |        "Status": {
-     |          "Storage Level": {
-     |            "Use Disk": true,
-     |            "Use Memory": true,
-     |            "Deserialized": false,
-     |            "Replication": 2
-     |          },
-     |          "Memory Size": 0,
-     |          "Disk Size": 0
-     |        }
-     |      }
-     |    ]
-     |  }
-     |  }]
-     |}
-   """.stripMargin
+    s"""
+      |{
+      |  "Event": "SparkListenerExecutorMetricsUpdate",
+      |  "Executor ID": "exec3",
+      |  "Metrics Updated": [
+      |    {
+      |      "Task ID": 1,
+      |      "Stage ID": 2,
+      |      "Stage Attempt ID": 3,
+      |      "Accumulator Updates": [
+      |        {
+      |          "ID": 0,
+      |          "Name": "$EXECUTOR_DESERIALIZE_TIME",
+      |          "Update": 300,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 1,
+      |          "Name": "$EXECUTOR_RUN_TIME",
+      |          "Update": 400,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 2,
+      |          "Name": "$RESULT_SIZE",
+      |          "Update": 500,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 3,
+      |          "Name": "$JVM_GC_TIME",
+      |          "Update": 600,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 4,
+      |          "Name": "$RESULT_SERIALIZATION_TIME",
+      |          "Update": 700,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 5,
+      |          "Name": "$MEMORY_BYTES_SPILLED",
+      |          "Update": 800,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 6,
+      |          "Name": "$DISK_BYTES_SPILLED",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 7,
+      |          "Name": "$PEAK_EXECUTION_MEMORY",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 8,
+      |          "Name": "$UPDATED_BLOCK_STATUSES",
+      |          "Update": [
+      |            {
+      |              "BlockID": "rdd_0_0",
+      |              "Status": {
+      |                "StorageLevel": {
+      |                  "UseDisk": true,
+      |                  "UseMemory": true,
+      |                  "Deserialized": false,
+      |                  "Replication": 2
+      |                },
+      |                "MemorySize": 0,
+      |                "DiskSize": 0
+      |              }
+      |            }
+      |          ],
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 9,
+      |          "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 10,
+      |          "Name": "${shuffleRead.LOCAL_BLOCKS_FETCHED}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 11,
+      |          "Name": "${shuffleRead.REMOTE_BYTES_READ}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 12,
+      |          "Name": "${shuffleRead.LOCAL_BYTES_READ}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 13,
+      |          "Name": "${shuffleRead.FETCH_WAIT_TIME}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 14,
+      |          "Name": "${shuffleRead.RECORDS_READ}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 15,
+      |          "Name": "${shuffleWrite.BYTES_WRITTEN}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 16,
+      |          "Name": "${shuffleWrite.RECORDS_WRITTEN}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 17,
+      |          "Name": "${shuffleWrite.WRITE_TIME}",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 18,
+      |          "Name": "${input.READ_METHOD}",
+      |          "Update": "Hadoop",
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 19,
+      |          "Name": "${input.BYTES_READ}",
+      |          "Update": 2100,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 20,
+      |          "Name": "${input.RECORDS_READ}",
+      |          "Update": 21,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 21,
+      |          "Name": "${output.WRITE_METHOD}",
+      |          "Update": "Hadoop",
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 22,
+      |          "Name": "${output.BYTES_WRITTEN}",
+      |          "Update": 1200,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 23,
+      |          "Name": "${output.RECORDS_WRITTEN}",
+      |          "Update": 12,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        },
+      |        {
+      |          "ID": 24,
+      |          "Name": "$TEST_ACCUM",
+      |          "Update": 0,
+      |          "Internal": true,
+      |          "Count Failed Values": true
+      |        }
+      |      ]
+      |    }
+      |  ]
+      |}
+    """.stripMargin
 }
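
A note on the backward-compatibility pattern the suite above leans on: each
test renders the current JSON, strips a field that older Spark releases never
wrote, and asserts that the reader falls back to a sensible default. A
minimal, self-contained json4s sketch of that round trip (internalFrom below
is a hypothetical stand-in for JsonProtocol.accumulableInfoFromJson):

    import org.json4s.JsonAST._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object BackCompatRoundTrip {
      // Hypothetical reader: defaults a missing "Internal" field to false,
      // the way JsonProtocol treats pre-1.5.1 event logs.
      def internalFrom(json: JValue): Boolean = (json \ "Internal") match {
        case JBool(b) => b
        case _ => false // field absent in old logs
      }

      def main(args: Array[String]): Unit = {
        val newJson: JValue = ("ID" -> 1) ~ ("Internal" -> true)
        // Simulate an old log by dropping the field the old writer never emitted.
        val oldJson = newJson.removeField { case (name, _) => name == "Internal" }
        assert(internalFrom(newJson))
        assert(!internalFrom(oldJson))
        println(compact(render(oldJson))) // {"ID":1}
      }
    }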

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fc7dc21..968a290 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -176,6 +176,15 @@ object MimaExcludes {
         // SPARK-12510 Refactor ActorReceiver to support Java
         ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
       ) ++ Seq(
+        // SPARK-12895 Implement TaskMetrics using accumulators
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.internalMetricsToAccumulators"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.collectInternalAccumulators"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.collectAccumulators")
+      ) ++ Seq(
+        // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this")
+      ) ++ Seq(
         // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log__="),

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 73dc8cb..75cb6d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -79,17 +79,17 @@ case class Sort(
         sorter.setTestSpillFrequency(testSpillFrequency)
       }
 
+      val metrics = TaskContext.get().taskMetrics()
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
-      val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
+      val spillSizeBefore = metrics.memoryBytesSpilled
 
       val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 
       dataSize += sorter.getPeakMemoryUsage
-      spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
+      spillSize += metrics.memoryBytesSpilled - spillSizeBefore
+      metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
 
-      TaskContext.get().internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
       sortedIterator
     }
   }
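
The Sort hunk above, like the TungstenAggregationIterator, broadcast-join,
and ReferenceSort hunks below, replaces the internalMetricsToAccumulators
lookup with direct TaskMetrics calls and hoists the repeated
TaskContext.get().taskMetrics() into a local. A self-contained sketch of the
spill-delta accounting involved (MetricsLike is a hypothetical stand-in for
the slice of TaskMetrics these operators touch):

    object SpillAccountingSketch {
      trait MetricsLike {
        def memoryBytesSpilled: Long
        def incPeakExecutionMemory(amount: Long): Unit
      }

      // Snapshot spill before the work runs, attribute only the delta to this
      // operator, and fold the operator's peak memory into the task metrics.
      def withSpillAccounting[T](metrics: MetricsLike, peakMemory: => Long)(body: => T): (T, Long) = {
        val spillBefore = metrics.memoryBytesSpilled
        val result = body
        metrics.incPeakExecutionMemory(peakMemory)
        (result, metrics.memoryBytesSpilled - spillBefore)
      }
    }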

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 41799c5..001e9c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -418,10 +418,10 @@ class TungstenAggregationIterator(
         val mapMemory = hashMap.getPeakMemoryUsedBytes
         val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
         val peakMemory = Math.max(mapMemory, sorterMemory)
+        val metrics = TaskContext.get().taskMetrics()
         dataSize += peakMemory
-        spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
-        TaskContext.get().internalMetricsToAccumulators(
-          InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
+        spillSize += metrics.memoryBytesSpilled - spillSizeBefore
+        metrics.incPeakExecutionMemory(peakMemory)
       }
       numOutputRows += 1
       res

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index 8222b84..edd87c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -136,14 +136,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
-        split.serializableHadoopSplit.value match {
-          case _: FileSplit | _: CombineFileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-          case _ => None
+      val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
+        case _: FileSplit | _: CombineFileSplit =>
+          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+        case _ => None
+      }
+
+      def updateBytesRead(): Unit = {
+        getBytesReadCallback.foreach { getBytesRead =>
+          inputMetrics.setBytesRead(getBytesRead())
         }
       }
-      inputMetrics.setBytesReadCallback(bytesReadCallback)
 
       val format = inputFormatClass.newInstance
       format match {
@@ -208,6 +211,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         if (!finished) {
           inputMetrics.incRecordsRead(1)
         }
+        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+          updateBytesRead()
+        }
         reader.getCurrentValue
       }
 
@@ -228,8 +234,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
           } finally {
             reader = null
           }
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
+          if (getBytesReadCallback.isDefined) {
+            updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
             split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
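
The SqlNewHadoopRDD hunk swaps a callback stored on InputMetrics for a local
Option plus an updateBytesRead() that polls the filesystem statistics every
fixed number of records. A self-contained sketch of that sampling pattern
(Interval stands in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS,
and the callback is simulated):

    object SampledBytesReadSketch {
      val Interval = 1000 // stand-in for UPDATE_INPUT_METRICS_INTERVAL_RECORDS

      var bytesRead = 0L
      var recordsRead = 0L
      // Simulated per-thread filesystem statistics; None when the split type
      // exposes no byte counts, as in the hunk's match on the split class.
      val getBytesReadCallback: Option[() => Long] = Some(() => recordsRead * 8L)

      def updateBytesRead(): Unit =
        getBytesReadCallback.foreach(cb => bytesRead = cb())

      def onRecordRead(): Unit = {
        recordsRead += 1
        // Poll the (possibly costly) callback only every Interval records.
        if (recordsRead % Interval == 0) updateBytesRead()
      }

      def main(args: Array[String]): Unit = {
        (1 to 2500).foreach(_ => onRecordRead())
        updateBytesRead() // final flush, as the hunk does on close()
        println(s"records=$recordsRead bytes=$bytesRead")
      }
    }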

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index c9ea579..0464071 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -111,8 +111,7 @@ case class BroadcastHashJoin(
       val hashedRelation = broadcastRelation.value
       hashedRelation match {
         case unsafe: UnsafeHashedRelation =>
-          TaskContext.get().internalMetricsToAccumulators(
-            InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+          TaskContext.get().taskMetrics().incPeakExecutionMemory(unsafe.getUnsafeSize)
         case _ =>
       }
       hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 6c7fa2e..db8edd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -119,8 +119,7 @@ case class BroadcastHashOuterJoin(
 
       hashTable match {
         case unsafe: UnsafeHashedRelation =>
-          TaskContext.get().internalMetricsToAccumulators(
-            InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+          TaskContext.get().taskMetrics().incPeakExecutionMemory(unsafe.getUnsafeSize)
         case _ =>
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 004407b..8929dc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -66,8 +66,7 @@ case class BroadcastLeftSemiJoinHash(
         val hashedRelation = broadcastedRelation.value
         hashedRelation match {
           case unsafe: UnsafeHashedRelation =>
-            TaskContext.get().internalMetricsToAccumulators(
-              InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+            TaskContext.get().taskMetrics().incPeakExecutionMemory(unsafe.getUnsafeSize)
           case _ =>
         }
         hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 52735c9..950dc78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.metric
 
-import org.apache.spark.{Accumulable, AccumulableParam, SparkContext}
+import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
 import org.apache.spark.util.Utils
 
 /**
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
  */
 private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T](
     name: String, val param: SQLMetricParam[R, T])
-  extends Accumulable[R, T](param.zero, param, Some(name), true) {
+  extends Accumulable[R, T](param.zero, param, Some(name), internal = true) {
 
   def reset(): Unit = {
     this.value = param.zero
@@ -131,6 +131,8 @@ private[sql] object SQLMetrics {
       name: String,
       param: LongSQLMetricParam): LongSQLMetric = {
     val acc = new LongSQLMetric(name, param)
+    // This is an internal accumulator so we need to register it explicitly.
+    Accumulators.register(acc)
     sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 83c64f7..544606f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -139,9 +139,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
 
   override def onExecutorMetricsUpdate(
       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
-    for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
-        finishTask = false)
+    for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
+      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false)
     }
   }
 
@@ -177,7 +176,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
       taskId: Long,
       stageId: Int,
       stageAttemptID: Int,
-      accumulatorUpdates: Map[Long, Any],
+      accumulatorUpdates: Seq[AccumulableInfo],
       finishTask: Boolean): Unit = {
 
     _stageIdToStageMetrics.get(stageId) match {
@@ -289,8 +288,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
           for (stageId <- executionUIData.stages;
                stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
                taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
-               accumulatorUpdate <- taskMetrics.accumulatorUpdates.toSeq) yield {
-            accumulatorUpdate
+               accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
+            assert(accumulatorUpdate.update.isDefined, s"accumulator update from " +
+              s"task did not have a partial value: ${accumulatorUpdate.name}")
+            (accumulatorUpdate.id, accumulatorUpdate.update.get)
           }
         }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
         mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
@@ -328,9 +329,10 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
       taskEnd.taskInfo.taskId,
       taskEnd.stageId,
       taskEnd.stageAttemptId,
-      taskEnd.taskInfo.accumulables.map { acc =>
-        (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
-      }.toMap,
+      taskEnd.taskInfo.accumulables.map { a =>
+        val newValue = new LongSQLMetricValue(a.update.map(_.asInstanceOf[Long]).getOrElse(0L))
+        a.copy(update = Some(newValue))
+      },
       finishTask = true)
   }
 
@@ -406,4 +408,4 @@ private[ui] class SQLStageMetrics(
 private[ui] class SQLTaskMetrics(
     val attemptId: Long, // TODO not used yet
     var finished: Boolean,
-    var accumulatorUpdates: Map[Long, Any])
+    var accumulatorUpdates: Seq[AccumulableInfo])
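
The SQLListener changes above move the listener from Map[Long, Any] to
Seq[AccumulableInfo], asserting that every heartbeat update carries a partial
value before projecting it to (id, value) pairs. A minimal sketch of that
projection (AccumInfo is a hypothetical stand-in for Spark's AccumulableInfo):

    object AccumUpdateProjectionSketch {
      // Hypothetical stand-in for org.apache.spark.scheduler.AccumulableInfo.
      case class AccumInfo(id: Long, name: Option[String], update: Option[Any], value: Option[Any])

      def toIdUpdatePairs(updates: Seq[AccumInfo]): Seq[(Long, Any)] =
        updates.map { u =>
          // Mirrors the listener's invariant: executor heartbeats always ship
          // a partial value for each accumulator.
          assert(u.update.isDefined,
            s"accumulator update from task did not have a partial value: ${u.name}")
          (u.id, u.update.get)
        }

      def main(args: Array[String]): Unit = {
        val updates = Seq(AccumInfo(1L, Some("peakExecutionMemory"), Some(512L), value = None))
        println(toIdUpdatePairs(updates)) // List((1,512))
      }
    }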

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4730896..10ccd4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1648,7 +1648,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
   test("external sorting updates peak execution memory") {
     AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
-      sortTest()
+      sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index 9575d26..273937f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -49,8 +49,7 @@ case class ReferenceSort(
       val context = TaskContext.get()
       context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
       context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
-      context.internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
       CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
     }, preservesPartitioning = true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 9c258cb..c7df8b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -71,8 +71,7 @@ class UnsafeFixedWidthAggregationMapSuite
         taskAttemptId = Random.nextInt(10000),
         attemptNumber = 0,
         taskMemoryManager = taskMemoryManager,
-        metricsSystem = null,
-        internalAccumulators = Seq.empty))
+        metricsSystem = null))
 
       try {
         f

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 8a95359..e03bd6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -117,8 +117,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       taskAttemptId = 98456,
       attemptNumber = 0,
       taskMemoryManager = taskMemMgr,
-      metricsSystem = null,
-      internalAccumulators = Seq.empty))
+      metricsSystem = null))
 
     val sorter = new UnsafeKVExternalSorter(
       keySchema, valueSchema, SparkEnv.get.blockManager, pageSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 647a7e9..86c2c25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -17,12 +17,19 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 
-class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
+
+class PartitionBatchPruningSuite
+  extends SparkFunSuite
+  with BeforeAndAfterEach
+  with SharedSQLContext {
+
   import testImplicits._
 
   private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize
@@ -32,30 +39,41 @@ class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
     super.beforeAll()
     // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
     sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
-
-    val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
-      val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
-      TestData(key, string)
-    }, 5).toDF()
-    pruningData.registerTempTable("pruningData")
-
     // Enable in-memory partition pruning
     sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Enable in-memory table scan accumulators
     sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
-    sqlContext.cacheTable("pruningData")
   }
 
   override protected def afterAll(): Unit = {
     try {
       sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
       sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-      sqlContext.uncacheTable("pruningData")
     } finally {
       super.afterAll()
     }
   }
 
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    // This creates accumulators, which get cleaned up after every single test,
+    // so we need to do this before every test.
+    val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
+      val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
+      TestData(key, string)
+    }, 5).toDF()
+    pruningData.registerTempTable("pruningData")
+    sqlContext.cacheTable("pruningData")
+  }
+
+  override protected def afterEach(): Unit = {
+    try {
+      sqlContext.uncacheTable("pruningData")
+    } finally {
+      super.afterEach()
+    }
+  }
+
   // Comparisons
   checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1))
   checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1))

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 81a159d..2c408c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
+import org.mockito.Mockito.{mock, when}
+
 import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
@@ -67,9 +69,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   )
 
   private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = {
-    val metrics = new TaskMetrics
-    metrics.setAccumulatorsUpdater(() => accumulatorUpdates.mapValues(new LongSQLMetricValue(_)))
-    metrics.updateAccumulators()
+    val metrics = mock(classOf[TaskMetrics])
+    when(metrics.accumulatorUpdates()).thenReturn(accumulatorUpdates.map { case (id, update) =>
+      new AccumulableInfo(id, Some(""), Some(new LongSQLMetricValue(update)),
+        value = None, internal = true, countFailedValues = true)
+    }.toSeq)
     metrics
   }
 
@@ -114,17 +118,17 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(listener.getExecutionMetrics(0).isEmpty)
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, metrics)
-      (0L, 0, 0, createTaskMetrics(accumulatorUpdates)),
-      (1L, 0, 0, createTaskMetrics(accumulatorUpdates))
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates())
     )))
 
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, metrics)
-      (0L, 0, 0, createTaskMetrics(accumulatorUpdates)),
-      (1L, 0, 0, createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)))
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+      (1L, 0, 0, createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulatorUpdates())
     )))
 
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
@@ -133,9 +137,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, metrics)
-      (0L, 0, 1, createTaskMetrics(accumulatorUpdates)),
-      (1L, 0, 1, createTaskMetrics(accumulatorUpdates))
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulatorUpdates())
     )))
 
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
@@ -173,9 +177,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, metrics)
-      (0L, 1, 0, createTaskMetrics(accumulatorUpdates)),
-      (1L, 1, 0, createTaskMetrics(accumulatorUpdates))
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates())
     )))
 
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))

http://git-wip-us.apache.org/repos/asf/spark/blob/87abcf7d/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b46b0d2..9a24a24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -140,7 +140,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
         .filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
 
       assert(peakMemoryAccumulator.size == 1)
-      peakMemoryAccumulator.head._2.value.toLong
+      peakMemoryAccumulator.head._2.value.get.asInstanceOf[Long]
     }
 
     assert(sparkListener.getCompletedStageInfos.length == 2)

