spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cody Koeninger <c...@koeninger.org>
Subject Re: correct and fast way to stop streaming application
Date Tue, 27 Oct 2015 15:30:49 GMT
If you want to make sure that your offsets are increasing without gaps...
one way to do that is to enforce that invariant when you're saving to your
database.  That would probably mean using a real database instead of
zookeeper though.

On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <krot.vyacheslav@gmail.com>
wrote:

> Any ideas? This is so important because we use kafka direct streaming and
> save processed offsets manually as last step in the job, so we archive
> at-least-once.
> But see what happens when new batch is scheduled after a job fails:
> - suppose we start from offset 10 loaded from zookeeper
> - job starts with offsets 10-20
> - job fails N times, awaitTermination notices that and stops context (or
> even jvm with System.exit), but Scheduler has already started new job, it
> is job for offsets 20-30, and sent it to executor.
> - executor does all the steps (if there is only one stage) and saves
> offset 30 to zookeeper.
>
> This way I loose data in offsets 10-20
>
> How should this be handled correctly?
>
> пн, 26 окт. 2015 г. в 18:37, varun sharma <varunsharmansit@gmail.com>:
>
>> +1, wanted to do same.
>>
>> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <
>> krot.vyacheslav@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I wonder what is the correct way to stop streaming application if some
>>> job failed?
>>> What I have now:
>>>
>>> val ssc = new StreamingContext
>>> ....
>>> ssc.start()
>>> try {
>>>    ssc.awaitTermination()
>>> } catch {
>>>    case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
>>> }
>>>
>>> It works but one problem still exists - after job failed and before
>>> streaming context is stopped it manages to start job for next batch. That
>>> is not desirable for me.
>>> It works like this because JobScheduler is an actor and after it reports
>>> error, it goes on with next message that starts next batch job. While
>>> ssc.awaitTermination() works in another thread and happens after next batch
>>> starts.
>>>
>>> Is there a way to stop before next job is submitted?
>>>
>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>

Mime
View raw message