spark-reviews mailing list archives

From steveloughran <...@git.apache.org>
Subject [GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Date Tue, 06 Feb 2018 21:31:19 GMT
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166448459
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    +        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
    +        if (commitAuthorized) {
    +          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
    +          dataWriter.commit()
    +
    +        } else {
    +          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
    +          logInfo(message)
    +          // throwing CommitDeniedException will trigger the catch block for abort
    +          throw new CommitDeniedException(message, stageId, partId, attemptId)
    +        }
    +
    +      } else {
    +        logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    +        dataWriter.commit()
    +      }
    +
    +      logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.")
    --- End diff --
    
    It's implicitly recorded in the logs anyway (via the timestamps on the before/after messages), but I've found it useful to track the duration of these operations explicitly.
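
    A minimal sketch of what that could look like: a small timing helper wrapped around the commit call. Names like `timed` and the log wording are illustrative assumptions, not part of this PR or the Spark API.
    
    ```scala
    // Hypothetical helper for timing an operation such as dataWriter.commit().
    // In the PR's context the println would be a logInfo on DataWritingSparkTask.
    object CommitTiming {
      def timed[T](label: String)(block: => T): (T, Long) = {
        val start = System.nanoTime()
        val result = block                                // run the wrapped operation
        val durationMs = (System.nanoTime() - start) / 1000000L
        println(s"$label took $durationMs ms")            // stand-in for logInfo
        (result, durationMs)
      }
    }
    ```
    
    Usage at the commit site might then be `val (msg, _) = CommitTiming.timed(s"Commit for stage $stageId, task $partId.$attemptId")(dataWriter.commit())`.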


---


