spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Riccardo Vincelli (JIRA)" <>
Subject [jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
Date Thu, 18 Jan 2018 14:20:00 GMT


Riccardo Vincelli commented on SPARK-19280:

What is the exception you are getting? For me it is:

This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations;
for example, => rdd2.values.count() * x) is invalid because the values transformation
and count action cannot be performed inside of the transformation. For more information,
see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference
to an RDD not defined by the streaming job is used in DStream operations. For more information,
See SPARK-13758.

Not sure yet if 100% related.


> Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming
> ----------------------------------------------------------------------------------------------
>                 Key: SPARK-19280
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Nan Zhu
>            Priority: Critical
> In one of our applications, we found the following issue, the application recovering
from a checkpoint file named "checkpoint-***166700000" but with the timestamp ***166500000
will recover from the very beginning of the stream and because our application relies on the
external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just
> We identified a potential issue in Spark Streaming checkpoint and will describe it with
the following example. We will propose a fix in the end of this JIRA.
> 1. The application properties: Batch Duration: 20000, Functionality: Single Stream calling
ReduceByKeyAndWindow and print, Window Size: 60000, SlideDuration, 20000
> 2. RDD at 166500000 is generated and the corresponding job is submitted to the execution
ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator
> 3. Job at 166500000 is finished and JobCompleted message is sent to JobScheduler's queue,
and meanwhile, Job at 166520000 is submitted to the execution ThreadPool and similarly, a
DoCheckpoint is sent to the queue of JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it)
is not scheduled by the operating system for a long time, and during this period, Jobs generated
from 166520000 - 166700000 are generated and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed
all DoCheckpoint messages for jobs ranging from 166520000 - 166700000 and checkpoint files
are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 166700000.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging
from 166520000 - 166700000. CRITICAL: a ClearMetadata message is pushed to JobGenerator's
message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled
to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window.
In our case, ReduceyKeyAndWindow will set rememberDuration to 100000 (rememberDuration of
ReducedWindowDStream (40000) + windowSize) resulting that only RDDs <- (166600000, 166700000]
are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 16650000, VERY CRITICAL:
at this step, RDD no later than 166600000 has been removed, and checkpoint data is updated
> 9. After 8, a checkpoint named /path/checkpoint-166700000 is created but with the timestamp
166500000. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-166700000 and try to get RDD with validTime
166500000. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow,
it needs to recursively slice RDDs until to the start of the stream. When the stream depends
on the external data, it will not successfully recover. In the case of Kafka, the recovered
RDDs would not be the same as the original one, as the currentOffsets has been updated to
the value at the moment of 166700000
> The proposed fix:
> 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead
of using the timestamp of Checkpoint instance (any side-effect?)
> 1. ClearMetadata shall be ClearMedataAndCheckpoint 
> 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary
to have two threads here

This message was sent by Atlassian JIRA

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message