spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Graceful shutdown drops processing in Spark Streaming
Date Thu, 08 Oct 2015 01:24:46 GMT
Aaah, interesting, you are using a 15-minute slide duration. Yeah, internally
the streaming scheduler waits for the last batch interval that has data to
be processed, but if the slide interval (here, 15 minutes) is longer than
the batch interval, the windowed job for that final slide might never run.
This is indeed a bug and should be fixed. Would you mind filing a JIRA and
assigning it to me?
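To make the timing concrete, here is a small simplified model in Java. It is an illustrative sketch, not Spark's scheduler code. It assumes, roughly mirroring how a DStream validates batch times, that a stream with slide duration S only generates a job at batch times that are positive multiples of S after the stream's zero time. The stop point (after the batch at minute 40) and the names `WindowShutdownModel` and `firingTimes` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (an assumption, not Spark's actual scheduler) of which
 * batch times produce a job for a stream with a given slide duration.
 * A job is generated at batch time t only when t is a positive multiple of
 * the slide, with times measured in minutes after the stream's zero time.
 */
public class WindowShutdownModel {

    /** Batch times (minutes after zero time) at which a stream with this slide fires. */
    static List<Long> firingTimes(long batchMinutes, long slideMinutes, long lastBatchMinutes) {
        List<Long> times = new ArrayList<>();
        for (long t = batchMinutes; t <= lastBatchMinutes; t += batchMinutes) {
            if (t % slideMinutes == 0) {
                times.add(t);
            }
        }
        return times;
    }

    public static void main(String[] args) {
        long batch = 1;      // 1-minute batch interval, as in the quoted code
        long slide = 15;     // 15-minute window and slide, as in the quoted code
        long lastBatch = 40; // hypothetical: graceful stop completes after the minute-40 batch

        List<Long> windowJobs = firingTimes(batch, slide, lastBatch);
        long lastWindowEnd = windowJobs.isEmpty() ? 0 : windowJobs.get(windowJobs.size() - 1);

        System.out.println("per-batch jobs run up to minute " + lastBatch);
        System.out.println("window jobs run at minutes " + windowJobs);
        System.out.println("data received in minutes (" + lastWindowEnd + ", " + lastBatch
                + "] never reaches the window output");
    }
}
```

Under this model the windowed output fires only at minutes 15 and 30. Data received after minute 30 would be saved by the job at minute 45, which never gets scheduled if the scheduler stops after the last batch with data.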

On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia <micizma@gmail.com> wrote:

> After triggering a graceful shutdown of the following application, the
> application stops before the windowed stream reaches its slide duration. As
> a result, the data is not completely processed (i.e. saveToMyStorage is not
> called) before shutdown.
>
> According to the documentation, graceful shutdown should ensure that all
> data that has been received is completely processed before shutdown.
>
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>
> Spark version: 1.4.1
>
> Code snippet:
>
> Function0<JavaStreamingContext> factory = () -> {
>     // 1-minute batch interval
>     JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream<String> records = context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     // Per-batch action: runs once every batch (every minute)
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     // Windowed action: 15-minute window sliding every 15 minutes
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>     return context;
> };
>
> try (JavaStreamingContext context = JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     Boolean stopSparkContext = true;
>     Boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }