spark-commits mailing list archives

From r...@apache.org
Subject git commit: [SPARK-2317] Improve task logging.
Date Wed, 16 Jul 2014 18:50:53 GMT
Repository: spark
Updated Branches:
  refs/heads/master caa163f08 -> 7c8d12322


[SPARK-2317] Improve task logging.

We use the TID to identify tasks in log messages. However, the TID alone does not capture the
stage or retries, making it harder to correlate log lines with the application itself. This pull
request changes all task logging messages to include the TID together with the stage id, stage
attempt, task id, and task attempt. I've consulted various people but unfortunately this is a
really hard task.
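
As a rough illustration of the naming scheme described above, the sketch below composes a task label from its parts. The `TaskLabel` case class, its field names, and `TaskLabelDemo` are hypothetical and are not part of Spark; only the rendered format follows the log samples in this message.

```scala
// Hypothetical helper, not part of Spark: models the identifiers that the
// new log messages combine.
final case class TaskLabel(
    taskIndex: Int,     // index of the task within its stage
    taskAttempt: Int,   // attempt number of this task
    stageId: Int,
    stageAttempt: Int,
    tid: Long) {        // the task's TID

  // "<task index>.<task attempt>"
  def taskId: String = s"$taskIndex.$taskAttempt"

  // "<stage id>.<stage attempt>"
  def stageLabel: String = s"$stageId.$stageAttempt"

  // Mirrors the "task X.Y in stage A.B (TID n)" shape shown in the logs.
  def render: String = s"task $taskId in stage $stageLabel (TID $tid)"
}

object TaskLabelDemo {
  def main(args: Array[String]): Unit = {
    // A second attempt of task 3 in the first attempt of stage 1.
    val label = TaskLabel(taskIndex = 3, taskAttempt = 1, stageId = 1, stageAttempt = 0, tid = 12L)
    println(label.render)
  }
}
```

With a label like this, a retried task stays distinguishable from its first attempt even though both share the same task index.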

Driver log looks like:

```
14/06/28 18:53:29 INFO DAGScheduler: Submitting 10 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:13)
14/06/28 18:53:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
14/06/28 18:53:29 INFO TaskSetManager: Re-computing pending task lists.
14/07/15 19:44:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 6, localhost, PROCESS_LOCAL, 1855 bytes)
...
14/07/15 19:44:40 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 64 ms on localhost (4/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 4) in 63 ms on localhost (5/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 2) in 63 ms on localhost (6/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 7) in 62 ms on localhost (7/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 6) in 63 ms on localhost (8/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 9) in 8 ms on localhost (9/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (TID 8) in 9 ms on localhost (10/10)
```

Executor log looks like:

```
14/07/15 19:44:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
14/07/15 19:44:40 INFO Executor: Running task 3.0 in stage 1.0 (TID 3)
14/07/15 19:44:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 1)
14/07/15 19:44:40 INFO Executor: Running task 4.0 in stage 1.0 (TID 4)
14/07/15 19:44:40 INFO Executor: Running task 2.0 in stage 1.0 (TID 2)
14/07/15 19:44:40 INFO Executor: Running task 5.0 in stage 1.0 (TID 5)
14/07/15 19:44:40 INFO Executor: Running task 6.0 in stage 1.0 (TID 6)
14/07/15 19:44:40 INFO Executor: Running task 7.0 in stage 1.0 (TID 7)
14/07/15 19:44:40 INFO Executor: Finished task 3.0 in stage 1.0 (TID 3). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 2.0 in stage 1.0 (TID 2). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 1.0 in stage 1.0 (TID 1). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 5.0 in stage 1.0 (TID 5). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 4.0 in stage 1.0 (TID 4). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 6.0 in stage 1.0 (TID 6). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 7.0 in stage 1.0 (TID 7). 847 bytes result sent to driver
```
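
Because driver and executor messages now share the same `task X.Y in stage A.B (TID n` prefix, the two logs can be correlated mechanically. The following is a minimal sketch of such a parser, assuming the message shape shown in the samples above; `TaskLogParser`, `TaskRef`, and the regular expression are illustrative names and are not part of Spark.

```scala
import scala.util.matching.Regex

object TaskLogParser {
  // Hypothetical record type for the fields embedded in a task log message.
  final case class TaskRef(
      taskIndex: Int, taskAttempt: Int, stageId: Int, stageAttempt: Int, tid: Long)

  // Matches "task 3.0 in stage 1.0 (TID 3" followed by ")" (executor style)
  // or "," (driver "Starting task" style, where host and locality follow).
  private val TaskPattern: Regex =
    """task (\d+)\.(\d+) in stage (\d+)\.(\d+) \(TID (\d+)[,)]""".r

  // Returns the first task reference found in the line, if any.
  def parse(line: String): Option[TaskRef] =
    TaskPattern.findFirstMatchIn(line).map { m =>
      TaskRef(
        m.group(1).toInt, m.group(2).toInt,
        m.group(3).toInt, m.group(4).toInt,
        m.group(5).toLong)
    }

  def main(args: Array[String]): Unit = {
    val executorLine = "14/07/15 19:44:40 INFO Executor: Running task 3.0 in stage 1.0 (TID 3)"
    println(parse(executorLine))
  }
}
```

Lines that do not mention a task, such as the `Adding task set` message, yield `None` and can simply be skipped.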

Author: Reynold Xin <rxin@apache.org>

Closes #1259 from rxin/betterTaskLogging and squashes the following commits:

c28ada1 [Reynold Xin] Fix unit test failure.
987d043 [Reynold Xin] Updated log messages.
c6cfd46 [Reynold Xin] Merge branch 'master' into betterTaskLogging
b7b1bcc [Reynold Xin] Fixed a typo.
f9aba3c [Reynold Xin] Made it compile.
f8a5c06 [Reynold Xin] Merge branch 'master' into betterTaskLogging
07264e6 [Reynold Xin] Defensive check against unknown TaskEndReason.
76bbd18 [Reynold Xin] FailureSuite not serializable reporting.
4659b20 [Reynold Xin] Remove unused variable.
53888e3 [Reynold Xin] [SPARK-2317] Improve task logging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c8d1232
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c8d1232
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c8d1232

Branch: refs/heads/master
Commit: 7c8d123225bbdcc605642099b107c2d843e87340
Parents: caa163f
Author: Reynold Xin <rxin@apache.org>
Authored: Wed Jul 16 11:50:49 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Wed Jul 16 11:50:49 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala  |  5 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  2 +-
 .../org/apache/spark/executor/Executor.scala    | 43 ++++++----
 .../spark/executor/MesosExecutorBackend.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  1 -
 .../org/apache/spark/scheduler/TaskInfo.scala   |  2 +
 .../apache/spark/scheduler/TaskSetManager.scala | 90 +++++++++-----------
 .../spark/scheduler/local/LocalBackend.scala    |  2 +-
 .../scala/org/apache/spark/FailureSuite.scala   |  5 +-
 .../scala/org/apache/spark/ShuffleSuite.scala   |  2 +-
 10 files changed, 78 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index df42d67..8d5c456 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -89,8 +89,9 @@ case class ExceptionFailure(
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
   override def toErrorString: String = {
-    val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
-    s"$className ($description}\n$stackTraceString"
+    val stackTraceString =
+      if (stackTrace == null) "null" else stackTrace.map("        " + _).mkString("\n")
+    s"$className ($description)\n$stackTraceString"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8d31bd0..b455c9f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -71,7 +71,7 @@ private[spark] class CoarseGrainedExecutorBackend(
         val ser = SparkEnv.get.closureSerializer.newInstance()
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
-        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+        executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
       }
 
     case KillTask(taskId, _, interruptThread) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4d3ba11..b16133b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -107,8 +107,9 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
-    val tr = new TaskRunner(context, taskId, serializedTask)
+  def launchTask(
+      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
+    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
     runningTasks.put(taskId, tr)
     threadPool.execute(tr)
   }
@@ -135,14 +136,15 @@ private[spark] class Executor(
     localDirs
   }
 
-  class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
+  class TaskRunner(
+      execBackend: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer)
     extends Runnable {
 
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _
 
     def kill(interruptThread: Boolean) {
-      logInfo("Executor is trying to kill task " + taskId)
+      logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
       killed = true
       if (task != null) {
         task.kill(interruptThread)
@@ -154,7 +156,7 @@ private[spark] class Executor(
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = SparkEnv.get.closureSerializer.newInstance()
-      logInfo("Running task ID " + taskId)
+      logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var attemptedTask: Option[Task[Any]] = None
       var taskStart: Long = 0
@@ -207,25 +209,30 @@ private[spark] class Executor(
 
         val accumUpdates = Accumulators.values
 
-        val directResult = new DirectTaskResult(valueBytes, accumUpdates,
-          task.metrics.getOrElse(null))
+        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
         val serializedDirectResult = ser.serialize(directResult)
-        logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
-        val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
-            logInfo("Storing result for " + taskId + " in local BlockManager")
+        val resultSize = serializedDirectResult.limit
+
+        // directSend = sending directly back to the driver
+        val (serializedResult, directSend) = {
+          if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
-            ser.serialize(new IndirectTaskResult[Any](blockId))
+            (ser.serialize(new IndirectTaskResult[Any](blockId)), false)
           } else {
-            logInfo("Sending result for " + taskId + " directly to driver")
-            serializedDirectResult
+            (serializedDirectResult, true)
           }
         }
 
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
-        logInfo("Finished task ID " + taskId)
+
+        if (directSend) {
+          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
+        } else {
+          logInfo(
+            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
+        }
       } catch {
         case ffe: FetchFailedException => {
           val reason = ffe.toTaskEndReason
@@ -233,7 +240,7 @@ private[spark] class Executor(
         }
 
         case _: TaskKilledException | _: InterruptedException if task.killed => {
-          logInfo("Executor killed task " + taskId)
+          logInfo(s"Executor killed $taskName (TID $taskId)")
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
 
@@ -241,7 +248,7 @@ private[spark] class Executor(
           // Attempt to exit cleanly by informing the driver of our failure.
           // If anything goes wrong (or this was a fatal exception), we will delegate to
           // the default uncaught exception handler, which will terminate the Executor.
-          logError("Exception in task ID " + taskId, t)
+          logError(s"Exception in $taskName (TID $taskId)", t)
 
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
@@ -249,7 +256,7 @@ private[spark] class Executor(
             m.executorRunTime = serviceTime
             m.jvmGCTime = gcTime - startGCTime
           }
-          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
+          val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 2232e62..a42c8b4 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class MesosExecutorBackend
     if (executor == null) {
       logError("Received launchTask but executor was null")
     } else {
-      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+      executor.launchTask(this, taskId, taskInfo.getName, taskInfo.getData.asReadOnlyByteBuffer)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f72bfde..ede3c7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -816,7 +816,6 @@ class DAGScheduler(
     }
     event.reason match {
       case Success =>
-        logInfo("Completed " + task)
         if (event.accumUpdates != null) {
           // TODO: fail the stage if the accumulator update fails...
           Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 29de045..ca0595f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -84,6 +84,8 @@ class TaskInfo(
     }
   }
 
+  def id: String = s"$index.$attempt"
+
   def duration: Long = {
     if (!finished) {
       throw new UnsupportedOperationException("duration() called on unfinished task")

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3bdc71d..8b5e8cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -26,8 +26,7 @@ import scala.collection.mutable.HashSet
 import scala.math.max
 import scala.math.min
 
-import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
-  SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.{Clock, SystemClock}
@@ -52,8 +51,8 @@ private[spark] class TaskSetManager(
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
     clock: Clock = SystemClock)
-  extends Schedulable with Logging
-{
+  extends Schedulable with Logging {
+
   val conf = sched.sc.conf
 
   /*
@@ -403,14 +402,11 @@ private[spark] class TaskSetManager(
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
-          // Figure out whether this should count as a preferred launch
-          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
-            taskSet.id, index, taskId, execId, host, taskLocality))
           // Do various bookkeeping
           copiesRunning(index) += 1
           val attemptNum = taskAttempts(index).size
-          val info = new TaskInfo(
-            taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
+          val info = new TaskInfo(taskId, index, attemptNum, curTime,
+            execId, host, taskLocality, speculative)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
@@ -429,11 +425,15 @@ private[spark] class TaskSetManager(
               s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
               s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
           }
-          val timeTaken = clock.getTime() - startTime
           addRunningTask(taskId)
-          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
-            taskSet.id, index, serializedTask.limit, timeTaken))
-          val taskName = "task %s:%d".format(taskSet.id, index)
+
+          // We used to log the time it takes to serialize the task, but task size is already
+          // a good proxy to task serialization time.
+          // val timeTaken = clock.getTime() - startTime
+          val taskName = s"task ${info.id} in stage ${taskSet.id}"
+          logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
+              taskName, taskId, host, taskLocality, serializedTask.limit))
+
           sched.dagScheduler.taskStarted(task, info)
           return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
         }
@@ -492,19 +492,19 @@ private[spark] class TaskSetManager(
     info.markSuccessful()
     removeRunningTask(tid)
     sched.dagScheduler.taskEnded(
-      tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+      tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
     if (!successful(index)) {
       tasksSuccessful += 1
-      logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
-        tid, info.duration, info.host, tasksSuccessful, numTasks))
+      logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
+        info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
         isZombie = true
       }
     } else {
-      logInfo("Ignorning task-finished event for TID " + tid + " because task " +
-        index + " has already completed successfully")
+      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
+        " because task " + index + " has already completed successfully")
     }
     failedExecutors.remove(index)
     maybeFinishTaskSet()
@@ -523,14 +523,13 @@ private[spark] class TaskSetManager(
     info.markFailed()
     val index = info.index
     copiesRunning(index) -= 1
-    if (!isZombie) {
-      logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
-    }
     var taskMetrics : TaskMetrics = null
-    var failureReason: String = null
+
+    val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
+      reason.asInstanceOf[TaskFailedReason].toErrorString
     reason match {
       case fetchFailed: FetchFailed =>
-        logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+        logWarning(failureReason)
         if (!successful(index)) {
           successful(index) = true
           tasksSuccessful += 1
@@ -538,23 +537,17 @@ private[spark] class TaskSetManager(
         // Not adding to failed executors for FetchFailed.
         isZombie = true
 
-      case TaskKilled =>
-        // Not adding to failed executors for TaskKilled.
-        logWarning("Task %d was killed.".format(tid))
-
       case ef: ExceptionFailure =>
-        taskMetrics = ef.metrics.getOrElse(null)
-        if (ef.className == classOf[NotSerializableException].getName()) {
+        taskMetrics = ef.metrics.orNull
+        if (ef.className == classOf[NotSerializableException].getName) {
           // If the task result wasn't serializable, there's no point in trying to re-execute it.
-          logError("Task %s:%s had a not serializable result: %s; not retrying".format(
-            taskSet.id, index, ef.description))
-          abort("Task %s:%s had a not serializable result: %s".format(
-            taskSet.id, index, ef.description))
+          logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
+            .format(info.id, taskSet.id, tid, ef.description))
+          abort("Task %s in stage %s (TID %d) had a not serializable result: %s".format(
+            info.id, taskSet.id, tid, ef.description))
           return
         }
         val key = ef.description
-        failureReason = "Exception failure in TID %s on host %s: %s\n%s".format(
-          tid, info.host, ef.description, ef.stackTrace.map("        " + _).mkString("\n"))
         val now = clock.getTime()
         val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
@@ -572,19 +565,18 @@ private[spark] class TaskSetManager(
           }
         }
         if (printFull) {
-          val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-          logWarning("Loss was due to %s\n%s\n%s".format(
-            ef.className, ef.description, locs.mkString("\n")))
+          logWarning(failureReason)
         } else {
-          logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          logInfo(
+            s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on executor ${info.host}: " +
+            s"${ef.className} (${ef.description}) [duplicate $dupCount]")
         }
 
-      case TaskResultLost =>
-        failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
+      case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
 
-      case _ =>
-        failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
+      case e: TaskEndReason =>
+        logError("Unknown TaskEndReason: " + e)
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -595,10 +587,10 @@ private[spark] class TaskSetManager(
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
-        logError("Task %s:%d failed %d times; aborting job".format(
-          taskSet.id, index, maxTaskFailures))
-        abort("Task %s:%d failed %d times, most recent failure: %s\nDriver stacktrace:".format(
-          taskSet.id, index, maxTaskFailures, failureReason))
+        logError("Task %d in stage %s failed %d times; aborting job".format(
+          index, taskSet.id, maxTaskFailures))
+        abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
+          .format(index, taskSet.id, maxTaskFailures, failureReason))
         return
       }
     }
@@ -711,8 +703,8 @@ private[spark] class TaskSetManager(
         if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
           !speculatableTasks.contains(index)) {
           logInfo(
-            "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
-              taskSet.id, index, info.host, threshold))
+            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
+              .format(index, taskSet.id, info.host, threshold))
           speculatableTasks += index
           foundTasks = true
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 9b95ccc..e9f6273 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -69,7 +69,7 @@ private[spark] class LocalActor(
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= 1
-      executor.launchTask(executorBackend, task.taskId, task.serializedTask)
+      executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index e755d2e..2229e6a 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -104,8 +104,9 @@ class FailureSuite extends FunSuite with LocalSparkContext {
       results.collect()
     }
     assert(thrown.getClass === classOf[SparkException])
-    assert(thrown.getMessage.contains("NotSerializableException") || 
-      thrown.getCause.getClass === classOf[NotSerializableException])
+    assert(thrown.getMessage.contains("serializable") ||
+      thrown.getCause.getClass === classOf[NotSerializableException],
+      "Exception does not contain \"serializable\": " + thrown.getMessage)
 
     FailureSuiteState.clear()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c8d1232/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c4f2f7e..237e644 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -240,7 +240,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     }
 
     assert(thrown.getClass === classOf[SparkException])
-    assert(thrown.getMessage.contains("NotSerializableException"))
+    assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
 }
 

