spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joshro...@apache.org
Subject spark git commit: [SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator
Date Wed, 16 Sep 2015 00:11:32 GMT
Repository: spark
Updated Branches:
  refs/heads/master 99ecfa594 -> 38700ea40


[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator

When speculative execution is enabled, consider a scenario where the authorized committer
of a particular output partition fails during the OutputCommitter.commitTask() call. In this
case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on
committing once that task fails. However, due to a unit mismatch (we used task attempt number
in one place and task attempt id in another) the lock will not be released, causing Spark
to go into an infinite retry loop.

This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end
tests (the current tests use many mocks). Another contributing factor is that we have many
similarly-named identifiers that have different semantics but the same data types (e.g.
attemptNumber and taskAttemptId, with inconsistent variable naming that makes them difficult
to distinguish).

This patch adds a regression test and fixes this bug by always using task attempt numbers
throughout this code.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8544 from JoshRosen/SPARK-10381.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38700ea4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38700ea4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38700ea4

Branch: refs/heads/master
Commit: 38700ea40cb1dd0805cc926a9e629f93c99527ad
Parents: 99ecfa5
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Tue Sep 15 17:11:21 2015 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Tue Sep 15 17:11:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |  3 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  7 +-
 .../spark/executor/CommitDeniedException.scala  |  4 +-
 .../spark/mapred/SparkHadoopMapRedUtil.scala    | 20 ++----
 .../apache/spark/scheduler/DAGScheduler.scala   |  7 +-
 .../scheduler/OutputCommitCoordinator.scala     | 48 +++++++-------
 .../org/apache/spark/scheduler/TaskInfo.scala   |  7 +-
 .../spark/status/api/v1/AllStagesResource.scala |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  4 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 ...utputCommitCoordinatorIntegrationSuite.scala | 68 ++++++++++++++++++++
 .../OutputCommitCoordinatorSuite.scala          | 24 ++++---
 .../apache/spark/util/JsonProtocolSuite.scala   |  2 +-
 project/MimaExcludes.scala                      | 36 ++++++++++-
 .../execution/datasources/WriterContainer.scala |  3 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  4 +-
 .../spark/sql/hive/hiveWriterContainers.scala   |  2 +-
 17 files changed, 174 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ae5926d..ac6eaab 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(jobConf: JobConf)
   }
 
   def commit() {
-    SparkHadoopMapRedUtil.commitTask(
-      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
   }
 
   def commitJob() {

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 2ae878b..7137246 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -193,9 +193,12 @@ case object TaskKilled extends TaskFailedReason {
  * Task requested the driver to commit, but was denied.
  */
 @DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
+case class TaskCommitDenied(
+    jobID: Int,
+    partitionID: Int,
+    attemptNumber: Int) extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
-    s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
   /**
    * If a task failed because its attempt to commit was denied, do not count this failure
    * towards failing the stage. This is intended to prevent spurious stage failures in cases

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f47d7ef..7d84889 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
     msg: String,
     jobID: Int,
     splitID: Int,
-    attemptID: Int)
+    attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
+  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f405b73..f7298e8 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
       jobId: Int,
-      splitId: Int,
-      attemptId: Int): Unit = {
+      splitId: Int): Unit = {
 
     val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {
 
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+        val taskAttemptNumber = TaskContext.get().attemptNumber()
+        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -132,7 +132,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if
necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, attemptId)
+          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
     }
   }
-
-  def commitTask(
-      committer: MapReduceOutputCommitter,
-      mrTaskContext: MapReduceTaskAttemptContext,
-      sparkTaskContext: TaskContext): Unit = {
-    commitTask(
-      committer,
-      mrTaskContext,
-      sparkTaskContext.stageId(),
-      sparkTaskContext.partitionId(),
-      sparkTaskContext.attemptNumber())
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b4f90e8..3c9a66e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1128,8 +1128,11 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
 
-    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
-      event.taskInfo.attempt, event.reason)
+    outputCommitCoordinator.taskCompleted(
+      stageId,
+      task.partitionId,
+      event.taskInfo.attemptNumber, // this is a task attempt number
+      event.reason)
 
     // The success case is dealt with separately below, since we need to compute accumulator
     // updates before posting.

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 5d92637..add0ded 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
 
 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer
wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
   var coordinatorRef: Option[RpcEndpointRef] = None
 
   private type StageId = Int
-  private type PartitionId = Long
-  private type TaskAttemptId = Long
+  private type PartitionId = Int
+  private type TaskAttemptNumber = Int
 
   /**
    * Map from active stages's id => partition id => task attempt with exclusive lock
on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator
instance.
    */
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+  private type CommittersByStageMap =
+    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
    *
    * @param stage the stage number
    * @param partition the partition number
-   * @param attempt a unique identifier for this task attempt
+   * @param attemptNumber how many times this task has been attempted
+   *                      (see [[TaskContext.attemptNumber()]])
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+      attemptNumber: TaskAttemptNumber): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
         endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
 
   // Called by DAGScheduler
   private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
   }
 
   // Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
   private[scheduler] def taskCompleted(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId,
+      attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
     val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
       case Success =>
       // The task output has been committed successfully
       case denied: TaskCommitDenied =>
-        logInfo(
-          s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+        logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+          s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
-          logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
-            s" clearing lock")
+        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
           authorizedCommitters.remove(partition)
         }
     }
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver:
Boolean)
   private[scheduler] def handleAskPermissionToCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = synchronized {
+      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
         authorizedCommitters.get(partition) match {
           case Some(existingCommitter) =>
-            logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
-              s"existingCommitter = $existingCommitter")
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
             false
           case None =>
-            logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
-            authorizedCommitters(partition) = attempt
+            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition")
+            authorizedCommitters(partition) = attemptNumber
             true
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
+          s"partition $partition to commit")
         false
     }
   }
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
         context.reply(
-          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
+          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ce..f113c2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
-    val attempt: Int,
+    val attemptNumber: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
     }
   }
 
-  def id: String = s"$index.$attempt"
+  @deprecated("Use attemptNumber", "1.6.0")
+  def attempt: Int = attemptNumber
+
+  def id: String = s"$index.$attemptNumber"
 
   def duration: Long = {
     if (!finished) {

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 390c136..24a0b52 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
     new TaskData(
       taskId = uiData.taskInfo.taskId,
       index = uiData.taskInfo.index,
-      attempt = uiData.taskInfo.attempt,
+      attempt = uiData.taskInfo.attemptNumber,
       launchTime = new Date(uiData.taskInfo.launchTime),
       executorId = uiData.taskInfo.executorId,
       host = uiData.taskInfo.host,

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2b71f55..712782d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage")
{
           serializationTimeProportionPos + serializationTimeProportion
 
         val index = taskInfo.index
-        val attempt = taskInfo.attempt
+        val attempt = taskInfo.attemptNumber
 
         val svgTag =
           if (totalExecutionTime == 0) {
@@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
     new TaskTableRowData(
       info.index,
       info.taskId,
-      info.attempt,
+      info.attemptNumber,
       info.speculative,
       info.status,
       info.taskLocality.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 24f7874..99614a7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
-    ("Attempt" -> taskInfo.attempt) ~
+    ("Attempt" -> taskInfo.attemptNumber) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
new file mode 100644
index 0000000..1ae5b03
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.{Span, Seconds}
+
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.util.Utils
+
+/**
+ * Integration tests for the OutputCommitCoordinator.
+ *
+ * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
+ */
+class OutputCommitCoordinatorIntegrationSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with Timeouts {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val conf = new SparkConf()
+      .set("master", "local[2,4]")
+      .set("spark.speculation", "true")
+      .set("spark.hadoop.mapred.output.committer.class",
+        classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
+    sc = new SparkContext("local[2, 4]", "test", conf)
+  }
+
+  test("exception thrown in OutputCommitter.commitTask()") {
+    // Regression test for SPARK-10381
+    failAfter(Span(60, Seconds)) {
+      val tempDir = Utils.createTempDir()
+      try {
+        sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
+}
+
+private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    val ctx = TaskContext.get()
+    if (ctx.attemptNumber < 1) {
+      throw new java.io.FileNotFoundException("Intentional exception")
+    }
+    super.commitTask(context)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e5ecd4b..6d08d7c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
  * was not in SparkHadoopWriter, the tests would still pass because only one of the
  * increments would be captured even though the commit in both tasks was executed
  * erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
+ * not use mocks.
  */
 class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter
{
 
   test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)")
{
     val stage: Int = 1
-    val partition: Long = 2
-    val authorizedCommitter: Long = 3
-    val nonAuthorizedCommitter: Long = 100
+    val partition: Int = 2
+    val authorizedCommitter: Int = 3
+    val nonAuthorizedCommitter: Int = 100
     outputCommitCoordinator.stageStart(stage)
-    assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+
+    assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
     // The non-authorized committer fails
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
     // New tasks should still not be able to commit because the authorized committer has
not failed
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
     // A new task should now be allowed to become the authorized committer
     assert(
-      outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+      outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
     // There can only be one authorized committer
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 47e548e..143c1b9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -499,7 +499,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
-    assert(info1.attempt === info2.attempt)
+    assert(info1.attemptNumber === info2.attemptNumber)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 46026c1..1c96b09 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,7 @@ object MimaExcludes {
         excludePackage("org.apache.spark.sql.execution")
       ) ++
       MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
-      MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++ 
+      MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
       Seq(
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.ml.classification.LogisticCostFun.this"),
@@ -53,6 +53,23 @@ object MimaExcludes {
           "org.apache.spark.ml.classification.LogisticAggregator.add"),
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.ml.classification.LogisticAggregator.count")
+      ) ++ Seq(
+        // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+        // This class is marked as `private` but MiMa still seems to be confused by the change.
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
       )
     case v if v.startsWith("1.5") =>
       Seq(
@@ -213,6 +230,23 @@ object MimaExcludes {
         // SPARK-9704 Made ProbabilisticClassifier, Identifiable, VectorUDT public APIs
         ProblemFilters.exclude[IncompatibleResultTypeProblem](
           "org.apache.spark.mllib.linalg.VectorUDT.serialize")
+      ) ++ Seq(
+        // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+        // This class is marked as `private` but MiMa still seems to be confused by the change.
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
       )
 
     case v if v.startsWith("1.4") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f8ef674..cfd64c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -198,8 +198,7 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(
-      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+    SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId)
   }
 
   def abortTask(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2bbb41c..7a46c69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -54,9 +54,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     details = ""
   )
 
-  private def createTaskInfo(taskId: Int, attempt: Int): TaskInfo = new TaskInfo(
+  private def createTaskInfo(taskId: Int, attemptNumber: Int): TaskInfo = new TaskInfo(
     taskId = taskId,
-    attempt = attempt,
+    attemptNumber = attemptNumber,
     // The following fields are not used in tests
     index = 0,
     launchTime = 0,

http://git-wip-us.apache.org/repos/asf/spark/blob/38700ea4/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 4ca8042..c8d6b71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -121,7 +121,7 @@ private[hive] class SparkHiveWriterContainer(
   }
 
   protected def commit() {
-    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
   }
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message