spark-issues mailing list archives

From "Stavros Kontopoulos (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-16480) Streaming checkpointing does not work well with SIGTERM
Date Mon, 11 Jul 2016 13:02:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370644#comment-15370644 ]

Stavros Kontopoulos edited comment on SPARK-16480 at 7/11/16 1:01 PM:
----------------------------------------------------------------------

[~srowen] OK, format fixed.
SIGINT causes the same issue, and the code is valid: it does not make any nested RDD calls.
SIGTERM should run the shutdown hooks.
What is the proper, documented way to stop a streaming job?
Zero time is 1468234787000 ms, which is Mon, 11 Jul 2016 10:59:47 GMT.
https://github.com/apache/spark/blob/v1.6.1/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L324
I suspect recovery fails when the time of the checkpointed data equals zeroTime.
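The timestamps above can be checked with a small stdlib-only Scala sketch. The helper names (ZeroTimeCheck, isTimeValid, walkBack) are hypothetical and are not Spark API. The sketch converts the zero time to UTC. It also models the validity rule implied by the "Time ... is invalid as zeroTime is ..." log line: a batch time counts as valid only if it is strictly after zeroTime and a whole number of slide durations away from it. Plain Long milliseconds stand in for Spark's Time and Duration. The walk-back loop only imitates the order of the log lines and does not reproduce how Spark recurses through the DStream graph.

```scala
import java.time.Instant

object ZeroTimeCheck {
  // Values copied from the second-run log.
  val zeroTimeMs: Long = 1468234787000L
  val slideMs: Long = 1000L

  // Simplified model (assumption, not Spark code): valid only if strictly
  // after zeroTime and on the slide grid anchored at zeroTime.
  def isTimeValid(timeMs: Long, zeroMs: Long, slide: Long): Boolean =
    timeMs > zeroMs && (timeMs - zeroMs) % slide == 0

  // Hypothetical helper: step back one slide at a time from fromMs and
  // return the valid times visited plus the first invalid time reached.
  def walkBack(fromMs: Long, zeroMs: Long, slide: Long): (List[Long], Long) = {
    @annotation.tailrec
    def loop(t: Long, acc: List[Long]): (List[Long], Long) =
      if (isTimeValid(t, zeroMs, slide)) loop(t - slide, t :: acc)
      else (acc.reverse, t)
    loop(fromMs, Nil)
  }

  def main(args: Array[String]): Unit = {
    // Human-readable UTC form of the zero time.
    println(Instant.ofEpochMilli(zeroTimeMs))
    val (valid, firstInvalid) = walkBack(1468234791000L, zeroTimeMs, slideMs)
    println(s"valid: ${valid.mkString(", ")}")
    println(s"first invalid: $firstInvalid (difference ${firstInvalid - zeroTimeMs} ms)")
  }
}
```

Running main prints 2016-07-11T10:59:47Z, then the valid times 1468234791000 down to 1468234788000, then 1468234787000 as the first invalid time with a difference of 0 ms. This is the same sequence the InternalMapWithStateDStream lines show.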



> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
>                 Key: SPARK-16480
>                 URL: https://issues.apache.org/jira/browse/SPARK-16480
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Stavros Kontopoulos
>
> A customer gets the following exception when trying to gracefully stop a streaming job with SIGTERM:
> {quote}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> {quote}
> This suggests that the checkpoint data is left in an invalid state. The issue can be reproduced easily with the following code skeleton (a Kafka direct stream is used):
> {quote}
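> val dataStream = myKafkaDirectStream
>   .mapWithState(stateSpec)
>   .stateSnapshots()
> dataStream.foreachRDD { data =>
>   // back up the state
>   data.cache()
>   // print the current state snapshot on the driver
>   data.collect().foreach(println)
>   // write each (key, state) pair as a "key,state" text line
>   data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
>   ..
> }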
> {quote}
> inputPath above is used both as the source of the initial state RDD (initialRdd) and as the checkpoint directory (using a different path does not change the outcome).
> The StreamingContext is created correctly with getOrCreate, and all transformations and actions are defined inside the creating function.
> To reproduce, run the job (1st run), stop it with kill -15 ..., and then restart it (2nd run).
> You can use an empty local folder as "inputPath" (the checkpoint path) to reproduce the issue.
> Checkpoint-related log output from the first run:
> {quote}
> 16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
> 4 checkpoint files
> 1468234791000 ms -> [Lscala.Tuple4;@60a55c45
> 1468234790000 ms -> [Lscala.Tuple4;@58e5be3
> 1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
> 1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
> ]
> 16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files
> ]
> 16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files
> ]
> 16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files
> ]
> {quote}
> In the first run, the graceful stop is triggered from the shutdown hook as expected:
> {quote}
> 16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
> {quote}
> Here is the log output from the second run, when recovery from the checkpoint data occurs:
> {quote}
> 16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
> java.io.EOFException
> 	at java.io.DataInputStream.readInt(DataInputStream.java:392)
> 	at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
> 	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
> 16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches):
> 16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
> 16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
> 16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
> 16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> 	at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
> {quote}
> The first exception in the 2nd run (the EOFException) may be related to the following thread and may be what triggers the issue (I am not sure what is happening there):
> https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=uPFgfXg@mail.gmail.com%3E
> The issue has also been reported when using HDFS.
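The 12 batches in the second run's "Batches during down time" and "Batches to reschedule" lines form a 1000 ms grid. The grid starts at the last checkpointed time, 1468234791000 ms, taken from the first run's DEBUG output. The following is a minimal stdlib-only Scala sketch of that arithmetic. It assumes the range includes the checkpoint time and excludes the restart time. It also assumes a restart time of 1468234803000 ms, which is inferred from the log (the next batch boundary after the last listed batch) and is not printed there. The object and method names are hypothetical and are not Spark's implementation.

```scala
object DownTimeBatches {
  // Last checkpointed batch time, from the first run's DEBUG output.
  val checkpointTimeMs: Long = 1468234791000L
  // Assumption: restart boundary inferred from the log, not printed by Spark.
  val restartTimeMs: Long = 1468234803000L
  val batchMs: Long = 1000L

  // Batch times from fromMs (inclusive) up to untilMs (exclusive), one per batch.
  def downTimes(fromMs: Long, untilMs: Long, step: Long): Seq[Long] =
    (fromMs until untilMs by step).toList

  def main(args: Array[String]): Unit = {
    val times = downTimes(checkpointTimeMs, restartTimeMs, batchMs)
    // Formatted like the JobGenerator INFO line in the second-run log.
    println(s"Batches during down time (${times.size} batches): " +
      times.map(t => s"$t ms").mkString(", "))
  }
}
```

Under these assumptions the printed line matches the logged list. All 12 batches fall between the checkpoint and the restart, and replaying them makes the state DStream walk back to zeroTime.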



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
