spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From steveloughran <>
Subject [GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Date Tue, 06 Feb 2018 21:28:06 GMT
Github user steveloughran commented on a diff in the pull request:
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    bq. We never guarantee that for an RDD partition, only one task can commit successfully
    There's at-least once though, right? And then the Job Commit (which is implicitly at-most-once)
is expected to handle the situation wherein 1+ task may have committed, and should resolve
it so that the output of only one task is added.
    One thing which I think would be good is for the spark docs to somewhere (scaladoc? markdown)
to precisely write down its requirements of a committer. For the WiP paper on the new S3A
committers, [I've tried to do this across MR & Spark](
    1. Complete: you get the output of all committed tasks
    2. Exclusive: you only get the output of committed tasks
    3. (Consistent: produces right output even if store is inconsistent)
    4. Concurrent: >1 task may commit simultaneously
    5. Abortable: if you abort a task, no output is visible
    6. Continuity of correctness: after a job is committed,  no partitioned task may suddenly
add its work to the output.
    Not required: if there's a partition and a 2nd task attempt is committed, the output of
either one of those attempts must be committed, but the specifics of which one is left open.
    * Hadoop MR v1 meets 1-6 on HDFS, fails on 3 against raw S3
    * The Direct Parquet committer fails to meet requirements (2, 5 & probably 6)
    * The Hadoop MR v2 committer fails on 2, because if a task attempt commit fails partway
through, some of its output may be in the dest dir. Both Spark and MR assume that this situation
never occurs. Really, committers should be able to say "Doesn't support retry on task commit
failure", or better. 
    Regarding this patch,
    1. how often do you actually expect people to be doing their own commit co-ordinator?

    1. What's the likelihood that they will get it right?
    As we can see, the number of people who can correctly implement a committer is <<
than those who have shipped one; I don't see a commit coordinator being any different. It's
good to offer the flexibility, but important to have the default being the one which everyone
else uses and which is generally trusted.


To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message