spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5233) Error replay of WAL when recovered from driver failure
Date Wed, 14 Jan 2015 05:45:34 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276513#comment-14276513 ]

Apache Spark commented on SPARK-5233:
-------------------------------------

User 'jerryshao' has created a pull request for this issue:
https://github.com/apache/spark/pull/4032

> Error replay of WAL when recovered from driver failure
> ------------------------------------------------------
>
>                 Key: SPARK-5233
>                 URL: https://issues.apache.org/jira/browse/SPARK-5233
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>
> Spark Streaming writes every event into the WAL for driver recovery; the sequence in the WAL may look like this:
> {code}
> BlockAdditionEvent ---> BlockAdditionEvent ---> BlockAdditionEvent ---> BatchAllocationEvent ---> BatchCleanupEvent ---> BlockAdditionEvent ---> BlockAdditionEvent ---> 'Driver Down Time' ---> BlockAdditionEvent ---> BlockAdditionEvent ---> BatchAllocationEvent
> {code}
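The effect of replaying a sequence like the one above can be illustrated with a minimal Python sketch of the received-block queue. The class and names here (`ReceivedBlockTracker`, `add_block`, `allocate_batch`) are illustrative stand-ins, not Spark's actual Scala implementation:

```python
from collections import deque

# Hypothetical sketch of the queue behavior described in this report;
# names do not match Spark's real classes.
class ReceivedBlockTracker:
    def __init__(self):
        self.queue = deque()   # blocks waiting for allocation
        self.allocated = {}    # batch_time -> list of block ids

    def add_block(self, block_id):
        # BlockAdditionEvent: enqueue a received block.
        self.queue.append(block_id)

    def allocate_batch(self, batch_time):
        # BatchAllocationEvent: drain EVERYTHING in the queue into this
        # batch, regardless of which interval produced each block.
        self.allocated[batch_time] = list(self.queue)
        self.queue.clear()

tracker = ReceivedBlockTracker()

# Replay of the WAL: two BlockAdditionEvents were logged just before
# the driver went down, with no matching BatchAllocationEvent.
tracker.add_block("old-block-1")
tracker.add_block("old-block-2")

# After restart, a new block arrives and a new batch is allocated.
tracker.add_block("new-block-1")
tracker.allocate_batch(1421140750000)

# The old, pre-failure blocks are mixed into the new batch.
print(tracker.allocated[1421140750000])
# -> ['old-block-1', 'old-block-2', 'new-block-1']
```

Because allocation drains the whole queue, the replayed pre-failure blocks end up attributed to the first batch allocated after recovery.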
> When the driver recovers from a failure, it replays all the existing metadata WAL entries to restore the correct state. In this situation, the two BlockAdditionEvents logged before the failure are put into the received-block queue. After the driver restarts, newly arriving blocks are also put into this queue, and a follow-up BatchAllocationEvent drains the whole queue into one allocatedBlocks. As a result, old data that does not belong to this batch is mixed into it. Here is a partial log:
> {code}
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>block store result for batch 1421140750000 ms
> ....
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,46704,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47188,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47672,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48156,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48640,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,49124,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,0,44184)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,44188,58536)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,102728,60168)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,162900,64584)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,227488,51240)
> {code}
> The old log "/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-14211406" is obviously far older than the current batch interval, yet its data will be fetched again and added for processing.
> This issue is subtle, because the previous code never deleted the old received-data WAL files; as far as I can tell, this leads to unwanted results.
> Fundamentally, this happens because we miss some BatchAllocationEvents when recovering from failure. I think we need to replay and insert all the events correctly.
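One way to picture the missing-event problem described above: during replay, any BlockAdditionEvents that never got a matching BatchAllocationEvent before the crash should not be allowed to leak into the next live batch. The sketch below is a hypothetical illustration of that idea, not the actual change in the pull request:

```python
from collections import deque

# Hypothetical recovery routine: after replaying the WAL, flush any
# still-unallocated blocks into their own recovery batch instead of
# letting them mix into the next live batch. Illustration only.
def recover(replayed_events):
    queue = deque()
    allocated = {}
    for kind, payload in replayed_events:
        if kind == "BlockAddition":
            queue.append(payload)
        elif kind == "BatchAllocation":
            allocated[payload] = list(queue)
            queue.clear()
    # Leftover blocks had no BatchAllocationEvent before the crash;
    # allocate them explicitly so they cannot contaminate new batches.
    if queue:
        allocated["recovered"] = list(queue)
        queue.clear()
    return allocated

events = [
    ("BlockAddition", "b1"),
    ("BatchAllocation", 1421140740000),
    ("BlockAddition", "b2"),  # driver went down after these two
    ("BlockAddition", "b3"),
]
print(recover(events))
# -> {1421140740000: ['b1'], 'recovered': ['b2', 'b3']}
```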



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

