spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Nan Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-19278) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
Date Wed, 18 Jan 2017 19:16:26 GMT
Nan Zhu created SPARK-19278:
-------------------------------

             Summary: Failed Recovery from checkpoint caused by the multi-threads issue in
Spark Streaming scheduler
                 Key: SPARK-19278
                 URL: https://issues.apache.org/jira/browse/SPARK-19278
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3
            Reporter: Nan Zhu


In one of our applications, we found the following issue, the application recovering from
a checkpoint file named "checkpoint-***166700000" but with the timestamp ***166500000 will
recover from the very beginning of the stream and because our application relies on the external
& periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed

We identified a potential issue in Spark Streaming checkpoint and will describe it with the
following example. We will propose a fix in the end of this JIRA.

1. The application properties: Batch Duration: 20000, Functionality: Single Stream calling
ReduceByKeyAndWindow and print, Window Size: 60000, SlideDuration, 20000

2. RDD at 166500000 is generated and the corresponding job is submitted to the execution ThreadPool.
Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator

3. Job at 166500000 is finished and JobCompleted message is sent to JobScheduler's queue,
and meanwhile, Job at 166520000 is submitted to the execution ThreadPool and similarly, a
DoCheckpoint is sent to the queue of JobGenerator

4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not
scheduled by the operating system for a long time, and during this period, Jobs generated
from 166520000 - 166700000 are generated and completed.

5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed
all DoCheckpoint messages for jobs ranging from 166520000 - 166700000 and checkpoint files
are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 166700000.

6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging from
166520000 - 166700000. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message
queue for EACH JobCompleted.

7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled
to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window.
In our case, ReduceyKeyAndWindow will set rememberDuration to 100000 (rememberDuration of
ReducedWindowDStream (40000) + windowSize) resulting that only RDDs <- (166600000, 166700000]
are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread

8. JG-EventLoop is scheduled again to process DoCheckpoint for 16650000, VERY CRITICAL: at
this step, RDD no later than 166700000 has been removed, and checkpoint data is updated as
 https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.

9. After 8, a checkpoint named /path/checkpoint-166700000 is created but with the timestamp
166500000. and at this moment, Application crashed

10. Application recovers from /path/checkpoint-166700000 and try to get RDD with validTime
166500000. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow,
it needs to recursively slice RDDs until to the start of the stream. When the stream depends
on the external data, it will not successfully recover. In the case of Kafka, the recovered
RDDs would not be the same as the original one, as the currentOffsets has been updated to
the value at the moment of 166700000


The proposed fix:

0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead of using
the timestamp of Checkpoint instance (any side-effect?)
1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary
to have two threads here




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message