spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From harishreedharan <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...
Date Tue, 17 Nov 2015 18:38:02 GMT
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9707#discussion_r45099938
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -187,16 +187,27 @@ class CheckpointWriter(
       private var stopped = false
       private var fs_ : FileSystem = _
     
    +  @volatile private var latestCheckpointTime: Time = null
    +
       class CheckpointWriteHandler(
           checkpointTime: Time,
           bytes: Array[Byte],
           clearCheckpointDataLater: Boolean) extends Runnable {
         def run() {
    +      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
    +        latestCheckpointTime = checkpointTime
    +      }
           var attempts = 0
           val startTime = System.currentTimeMillis()
           val tempFile = new Path(checkpointDir, "temp")
    -      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
    -      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
    +      // We will do checkpoint when generating a batch and completing a batch. When the
processing
    +      // time of a batch is greater than the batch interval, checkpointing for completing
an old
    +      // batch may run after checkpointing of a new batch. If this happens, checkpoint
of an old
    +      // batch actually has the latest information, so we want to recovery from it. Therefore,
we
    +      // also use the latest checkpoint time as the file name, so that we can recovery
from the
    +      // latest checkpoint file.
    +      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
    --- End diff --
    
    I don't think you get what I am saying. I am saying that two threads could run at the
same time writing out data to the exact same files. 
    
    If I am not mistaken, there is a bug here that could lead to 2 checkpoints running at
the same time, writing to the same files.
    -- Checkpoint 1: Completion of Batch Time t
    -- Checkpoint 2: Start of Batch Time t+1
    
    Checkpoint 2 starts -> `latestCheckpoint = t + 1`
    Checkpoint 1 starts -> since `latestCheckpoint != null` and `latestCheckpoint >
checkpointTime`, we would not reset `latestCheckpoint`, so both checkpoints would use the
same file name to write their checkpoints out.
    
    Because of this, depending on which thread reaches the tempFile creation first, that would
win - which is non-deterministic. The other thread would end up hitting an exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message