Aaah, interesting, you are using a 15 minute slide duration. Yeah, internally
the streaming scheduler waits for the last "batch" interval which has data
to be processed, but if there is a sliding interval (15 mins here) that is
longer than the batch interval, then the windowed operation might not get run
before shutdown. This is indeed a bug and should be fixed. Mind setting up a
JIRA and assigning it to me?
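
To make the gap concrete, here is a rough back-of-the-envelope model of the timing. It is only a sketch, not Spark's scheduler code: the class and method names are made up for illustration, time is counted in whole minutes from context start, and the slide duration is assumed to be a multiple of the batch interval.

```java
public class WindowSlideSketch {

    /** Minute at which the most recent windowed output fired, at or before stopMinute. */
    public static long lastWindowFire(long slideMinutes, long stopMinute) {
        return (stopMinute / slideMinutes) * slideMinutes;
    }

    /**
     * Number of completed batches that no windowed output has covered yet
     * when the stop is requested at stopMinute.
     */
    public static long batchesLeftOut(long batchMinutes, long slideMinutes, long stopMinute) {
        // Last batch boundary reached before the stop request.
        long lastBatch = (stopMinute / batchMinutes) * batchMinutes;
        return (lastBatch - lastWindowFire(slideMinutes, stopMinute)) / batchMinutes;
    }

    public static void main(String[] args) {
        long batch = 1;   // batch interval from the snippet, in minutes
        long slide = 15;  // slide duration from the snippet, in minutes
        long stop = 22;   // hypothetical minute at which shutdown is requested

        System.out.println("last windowed output at minute " + lastWindowFire(slide, stop));
        System.out.println("batches not yet covered: " + batchesLeftOut(batch, slide, stop));
    }
}
```

With a hypothetical stop at minute 22, the last windowed output in this model fired at minute 15, so 7 one-minute batches would be processed individually but never passed to the windowed foreachRDD.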
On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia <micizma@gmail.com> wrote:
> After triggering the graceful shutdown on the following application, the
> application stops before the windowed stream reaches its slide duration. As
> a result, the data is not completely processed (i.e. saveToMyStorage is not
> called) before shutdown.
>
> According to the documentation, graceful shutdown should ensure that the
> data, which has been received, is completely processed before shutdown.
>
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>
> Spark version: 1.4.1
>
> Code snippet:
>
> Function0<JavaStreamingContext> factory = () -> {
>     JavaStreamingContext context = new JavaStreamingContext(sparkConf,
>         Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream<String> records =
>         context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>     return context;
> };
>
> try (JavaStreamingContext context =
>         JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     Boolean stopSparkContext = true;
>     Boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }
>
>