spark-commits mailing list archives

From pwend...@apache.org
Subject [1/8] git commit: Track task value serialisation time in TaskMetrics.
Date Fri, 20 Dec 2013 02:12:45 GMT
Updated Branches:
  refs/heads/master 7990c5637 -> eca68d442


Track task value serialisation time in TaskMetrics.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8a397a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8a397a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8a397a95

Branch: refs/heads/master
Commit: 8a397a959bf0b68f7d10fa57665225e0c2b5d03a
Parents: a51f340
Author: Tor Myklebust <tmyklebu@csclub.uwaterloo.ca>
Authored: Mon Dec 16 12:07:39 2013 -0500
Committer: Tor Myklebust <tmyklebu@csclub.uwaterloo.ca>
Committed: Mon Dec 16 12:07:39 2013 -0500

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 12 ++++++++----
 .../org/apache/spark/executor/TaskMetrics.scala |  5 +++++
 .../org/apache/spark/scheduler/TaskResult.scala | 20 ++++++++++----------
 .../cluster/ClusterTaskSetManagerSuite.scala    |  4 +++-
 4 files changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
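
The gist of the change: the executor now serializes the task's return value itself, brackets that call with wall-clock timestamps, and records the elapsed time in a new TaskMetrics field. A minimal, self-contained sketch of the timing pattern (illustrative only; hypothetical names, and the real code below uses the serializer obtained from SparkEnv rather than plain Java serialization):

    // Sketch of the measurement pattern used in Executor.scala (stand-in serializer).
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    def serializeWithTiming(value: AnyRef): (ByteBuffer, Long) = {
      val before = System.currentTimeMillis()
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(value)                                // in Spark: objectSer.serialize(value)
      oos.close()
      val after = System.currentTimeMillis()
      (ByteBuffer.wrap(bos.toByteArray), after - before)    // elapsed ms -> metrics.serializationTime
    }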


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8a397a95/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b0a60e..02ad64d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,18 +222,22 @@ private[spark] class Executor(
           return
         }
 
+        val objectSer = SparkEnv.get.serializer.newInstance()
+        val beforeSerialization = System.currentTimeMillis()
+        val valueBytes = objectSer.serialize(value)
+        val afterSerialization = System.currentTimeMillis()
+
         for (m <- task.metrics) {
           m.hostname = Utils.localHostName()
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
+          m.serializationTime = (afterSerialization - beforeSerialization).toInt
         }
-        // TODO I'd also like to track the time it takes to serialize the task results, but that is
-        // huge headache, b/c we need to serialize the task metrics first.  If TaskMetrics had a
-        // custom serialized format, we could just change the relevants bytes in the byte buffer
+
         val accumUpdates = Accumulators.values
 
-        val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8a397a95/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c0ce46e..c036866 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,6 +44,11 @@ class TaskMetrics extends Serializable {
   var jvmGCTime: Long = _
 
   /**
+   * Amount of time spent serializing the result of the task
+   */
+  var serializationTime: Long = _
+
+  /**
   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
   var shuffleReadMetrics: Option[ShuffleReadMetrics] = None

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8a397a95/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 7e468d0..4e00bc8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
 
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {
 
-  def this() = this(null.asInstanceOf[T], null, null)
+  def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
   override def writeExternal(out: ObjectOutput) {
 
-    val objectSer = SparkEnv.get.serializer.newInstance()
-    val bb = objectSer.serialize(value)
-
-    out.writeInt(bb.remaining())
-    Utils.writeByteBuffer(bb, out)
+    out.writeInt(valueBytes.remaining);
+    Utils.writeByteBuffer(valueBytes, out)
 
     out.writeInt(accumUpdates.size)
     for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
 
   override def readExternal(in: ObjectInput) {
 
-    val objectSer = SparkEnv.get.serializer.newInstance()
-
     val blen = in.readInt()
     val byteVal = new Array[Byte](blen)
     in.readFully(byteVal)
-    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+    valueBytes = ByteBuffer.wrap(byteVal)
 
     val numUpdates = in.readInt
     if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
     }
     metrics = in.readObject().asInstanceOf[TaskMetrics]
   }
+
+  def value(): T = {
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    return objectSer.deserialize(valueBytes)
+  }
 }
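
With this change DirectTaskResult carries the value already serialized as a ByteBuffer and only deserializes it when value() is called, which is what lets the executor time the serialization up front. A hedged usage sketch, assuming a running SparkEnv so a serializer instance is available (mirroring the test change below):

    // Sketch: construct a DirectTaskResult from pre-serialized bytes and read it back.
    import scala.collection.mutable
    val objectSer = SparkEnv.get.serializer.newInstance()
    val result = new DirectTaskResult[Int](objectSer.serialize(42), mutable.Map.empty, new TaskMetrics)
    val answer: Int = result.value()   // deserialization happens here, on access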

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8a397a95/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index b97f2b1..788cbb8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -313,6 +313,8 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
-    new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty,
+        new TaskMetrics)
   }
 }

