spark-issues mailing list archives

From "Andrei Taleanu (JIRA)" <>
Subject [jira] [Commented] (SPARK-20323) Calling stop in a transform stage causes the app to hang
Date Fri, 14 Apr 2017 12:40:41 GMT


Andrei Taleanu commented on SPARK-20323:

[~srowen] I see. Let me describe the problem in more detail. Short version: I have a *streaming
job*. Even when a *batch's processing fails, the stream keeps running* if I leave the thrown
exceptions for Spark to handle on its own. That translates to data loss and the loss of
at-least-once semantics.

Detailed version: I started from an app we run on Spark 2.1.0 on top of Mesos w/
Hadoop 2.6, with checkpointing disabled (it's done "manually", as you'll see below). I tried
to narrow it down as much as possible and reproduce a similar issue in local mode, purely for
illustration (that's where the code in the issue description came from). Consider the following
steps:
1) read data from a Kafka source
2) transform the dstream:
  a) get data from an external service to avoid too many calls from executors (might fail)
  b) broadcast the data
  c) map the RDD using the broadcast value
3) cache the transformed dstream
4) foreach RDD write cached data into a db (might fail)
5) foreach RDD:
  a) write cached data in Kafka (might fail)
  b) manually commit the new Kafka offsets (because I need a human readable format)
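
The at-least-once requirement in the steps above boils down to: commit offsets (5.b) only after every earlier stage of the micro-batch has succeeded. Here is a minimal plain-Scala sketch of that bookkeeping, with no Spark involved; `OffsetTracker` and the stage closures are hypothetical stand-ins for the real pipeline:

```scala
import scala.util.Try

// Hypothetical offset bookkeeping: offsets advance only after every
// stage of the micro-batch has succeeded, which gives at-least-once
// delivery (a failed batch is re-read and re-processed).
class OffsetTracker {
  private var committed: Long = -1L
  def commit(offset: Long): Unit = { committed = offset }
  def lastCommitted: Long = committed
}

object MicroBatch {
  // Runs the stages in order; any failure aborts the batch *before*
  // the commit, so the tracker never gets ahead of the delivered data.
  def process(offset: Long,
              stages: Seq[() => Unit],
              tracker: OffsetTracker): Try[Unit] =
    Try(stages.foreach(stage => stage())).map(_ => tracker.commit(offset))
}

object Demo extends App {
  val tracker = new OffsetTracker

  // Batch 1: all stages succeed, so offset 10 is committed.
  MicroBatch.process(10L, Seq(() => (), () => ()), tracker)
  println(s"after good batch: ${tracker.lastCommitted}")

  // Batch 2: stage 2.a (the external call) fails, so the offset stays at 10.
  val failed = MicroBatch.process(20L,
    Seq(() => throw new RuntimeException("external service down")), tracker)
  println(s"batch failed: ${failed.isFailure}, offset still: ${tracker.lastCommitted}")
}
```

The point of the sketch is only the ordering: the commit lives inside the `map`, so it is unreachable when any stage throws.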

There are multiple points of failure here (e.g. 2.a), and what I need is to fail as soon as
possible: if anything before 5.b fails in a micro-batch but the offsets are committed anyway,
that is data loss. Obviously manipulating the context inside transform is wrong. Obviously
doing it in foreachRDD on the same thread is wrong as well (as [~zsxwing] pointed out in
SPARK-20321).

What's the recommended way to handle this? If I just leave the exceptions to Spark, it seems
to somehow swallow them (in the 2.a case, for example) and continue processing. Since that
means data loss, I need to avoid it: I need at-least-once guarantees.
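
The pattern suggested in SPARK-20321 is to never call the blocking stop from inside a Spark callback: the batch code only *reports* the failure, and a dedicated thread performs the stop. A minimal sketch of that shape, where `FakeContext` is a hypothetical stand-in for `StreamingContext` (no Spark dependency):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for StreamingContext.
class FakeContext {
  @volatile var stopped = false
  def stop(): Unit = { stopped = true }
}

object FailFastMonitor {
  // The batch body never calls stop() itself; it completes `outcome`
  // (Some(error) on failure, None on success) and a monitor thread
  // performs the blocking stop from outside the callback.
  def run(ctx: FakeContext)(batch: => Unit): Unit = {
    val outcome = Promise[Option[Throwable]]()
    val monitor = new Thread(() => {
      Await.result(outcome.future, Duration.Inf) match {
        case Some(e) => println(s"stopping context after: ${e.getMessage}"); ctx.stop()
        case None    => () // batch succeeded, nothing to do
      }
    })
    monitor.start()
    // Simulated foreachRDD body: report the error instead of stopping inline.
    try { batch; outcome.trySuccess(None) }
    catch { case e: Throwable => outcome.trySuccess(Some(e)) }
    monitor.join()
  }
}

object DemoStop extends App {
  val ctx = new FakeContext
  FailFastMonitor.run(ctx) { throw new RuntimeException("boom") }
  println(s"context stopped? ${ctx.stopped}")
}
```

This is only the threading shape, not the real API: in the actual job the monitor thread would call `ssc.stop(stopSparkContext = true, stopGracefully = false)` and the reporting would happen inside foreachRDD.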

Thanks again :)

> Calling stop in a transform stage causes the app to hang
> --------------------------------------------------------
>                 Key: SPARK-20323
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Andrei Taleanu
> I'm not sure if this is a bug or just the way it needs to happen but I've run in this
issue with the following code:
> {noformat}
> import scala.collection.mutable
> import scala.util.Random
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object ImmortalStreamingJob extends App {
>   val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
>   val ssc  = new StreamingContext(conf, Seconds(1))
>   val elems = (1 to 1000).grouped(10)
>     .map(seq => ssc.sparkContext.parallelize(seq))
>     .toSeq
>   val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))
>   val transformed = stream.transform { rdd =>
>     try {
>       if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
>       else println("lucky bastard")
>       rdd
>     } catch {
>       case e: Throwable =>
>         println(s"stopping streaming context: $e")
>         ssc.stop(stopSparkContext = true, stopGracefully = false)
>         throw e
>     }
>   }
>   transformed.foreachRDD { rdd =>
>     println(rdd.collect().mkString(","))
>   }
>   ssc.start()
>   ssc.awaitTermination()
> }
> {noformat}
> There are two things I can note here:
> * if the exception is thrown in the first transformation (when the first RDD is processed),
the spark context is stopped and the app dies
> * if the exception is thrown after at least one RDD has been processed, the app hangs
after printing the error message and never stops
> I think there's some sort of deadlock in the second case, is that normal? I also asked
this [here|]
but up to this point there's no answer pointing at exactly what happens, only guidelines.
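
A plausible shape for that hang (an analogy, not a claim about Spark internals): the transform closure runs on the streaming scheduler's own thread, so a blocking stop issued from inside it ends up waiting for that very thread to finish. The same self-wait can be shown with a plain executor that a task tries to shut down from the inside:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustration only: a task that waits for the termination of the very
// pool it runs on can never see that termination happen, because
// termination requires this task to return first.
object SelfStopDemo extends App {
  val pool = Executors.newSingleThreadExecutor()
  val task = pool.submit(new Runnable {
    def run(): Unit = {
      pool.shutdown()
      // Times out and returns false: the pool cannot terminate while
      // this task (running on its only thread) is still blocked here.
      val terminated = pool.awaitTermination(1, TimeUnit.SECONDS)
      println(s"terminated from inside the pool? $terminated")
    }
  })
  task.get()
  // Once the task has returned, termination completes normally.
  println(s"terminated after the task returned? ${pool.awaitTermination(1, TimeUnit.SECONDS)}")
}
```

With a bounded wait the self-wait merely times out; if the wait were unbounded, as a blocking stop can be, it would hang forever, which matches the symptom described above.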

This message was sent by Atlassian JIRA

