spark-reviews mailing list archives

From: JoshRosen <...@git.apache.org>
Subject: [GitHub] spark pull request: [SPARK-4879] Use driver to coordinate Hadoop o...
Date: Wed, 11 Feb 2015 01:29:36 GMT
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4066#discussion_r24468427
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import scala.collection.mutable
    +
    +import akka.actor.{ActorRef, Actor}
    +
    +import org.apache.spark._
    +import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
    +
    +private sealed trait OutputCommitCoordinationMessage extends Serializable
    +
    +private case object StopCoordinator extends OutputCommitCoordinationMessage
    +private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
    +
    +/**
    + * Authority that decides whether tasks can commit output to HDFS.
    + *
    + * This lives on the driver, but the actor allows the tasks that commit to Hadoop to invoke it.
    + *
    + * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
    + * for an extensive design discussion.
    + */
    +private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
    +
    +  // Initialized by SparkEnv
    +  var coordinatorActor: Option[ActorRef] = None
    +  private val timeout = AkkaUtils.askTimeout(conf)
    +  private val maxAttempts = AkkaUtils.numRetries(conf)
    +  private val retryInterval = AkkaUtils.retryWaitMs(conf)
    +
    +  private type StageId = Int
    +  private type TaskId = Long
    +  private type TaskAttemptId = Long
    +  private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]]
    +
    +  // Access to this state should be guarded by synchronizing on the instance.
    +  private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
    +
    +  /**
    +   * Called by tasks to ask whether they can commit their output to HDFS.
    +   *
    +   * If a task attempt has been authorized to commit, then all other attempts to commit the same
    +   * task will be denied.  If the authorized task attempt fails (e.g. due to its executor being
    +   * lost), then a subsequent task attempt may be authorized to commit its output.
    +   *
    +   * @param stage the stage number
    +   * @param task the task number
    +   * @param attempt a unique identifier for this task attempt
    +   * @return true if this task is authorized to commit, false otherwise
    +   */
    +  def canCommit(
    +      stage: StageId,
    +      task: TaskId,
    +      attempt: TaskAttemptId): Boolean = {
    +    val msg = AskPermissionToCommitOutput(stage, task, attempt)
    +    coordinatorActor match {
    +      case Some(actor) =>
    +        AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, retryInterval, timeout)
    +      case None =>
    +        logError(
    +          "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
    +        false
    +    }
    +  }
    +
    +  // Called by DAGScheduler
    +  private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
    +    authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]()
    +  }
    +
    +  // Called by DAGScheduler
    +  private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
    +    authorizedCommittersByStage.remove(stage)
    +  }
    +
    +  // Called by DAGScheduler
    +  private[scheduler] def taskCompleted(
    +      stage: StageId,
    +      task: TaskId,
    +      attempt: TaskAttemptId,
    +      reason: TaskEndReason): Unit = synchronized {
    +    val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
    +      logDebug(s"Ignoring task completion for completed stage")
    +      return
    +    })
    +    reason match {
    +      case Success =>
    +      // The task output has been committed successfully
    +      case denied: TaskCommitDenied =>
    +        logInfo(s"Task was denied committing, stage: $stage, taskId: $task, attempt: $attempt")
    +      case otherReason =>
    +        logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed;" +
    +          s" clearing lock")
    +        authorizedCommitters.remove(task)
    +    }
    +  }
    +
    +  def stop(): Unit = synchronized {
    +    coordinatorActor.foreach(_ ! StopCoordinator)
    +    coordinatorActor = None
    +    authorizedCommittersByStage.foreach(_._2.clear)
    +    authorizedCommittersByStage.clear
    --- End diff --
    
    I don't think we need to clear each sub-map, so I'll replace this with a simple `clear()`.
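    
    For illustration only, a minimal sketch of what the simplified `stop()` could look like after that change (it reuses the `coordinatorActor`, `StopCoordinator`, and `authorizedCommittersByStage` members from the diff above; this is a sketch, not the final code):
    
    ```scala
    def stop(): Unit = synchronized {
      // Tell the coordinator actor to shut down, then drop the reference so that any
      // later canCommit() calls fail fast instead of messaging a stopped actor.
      coordinatorActor.foreach(_ ! StopCoordinator)
      coordinatorActor = None
      // Clearing the outer map drops the per-stage sub-maps along with it, so there is
      // no need to clear each sub-map individually.
      authorizedCommittersByStage.clear()
    }
    ```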


