spark-user mailing list archives

From Krot Viacheslav <krot.vyaches...@gmail.com>
Subject Re: correct and fast way to stop streaming application
Date Tue, 27 Oct 2015 17:47:43 GMT
Actually a great idea; I didn't even think about that. Thanks a lot!

On Tue, 27 Oct 2015 at 17:29, varun sharma <varunsharmansit@gmail.com> wrote:

> One more thing we can try: before committing an offset, we can verify the
> latest offset of that partition (in ZooKeeper) against the fromOffset in
> the OffsetRange.
> Just a thought...
>
> Let me know if it works.
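>
> A minimal sketch of that check, assuming hypothetical loadOffset/saveOffset
> helpers around your ZooKeeper client (OffsetRange itself comes from
> spark-streaming-kafka):
>
> import org.apache.spark.streaming.kafka.OffsetRange
>
> def commitIfContiguous(range: OffsetRange,
>                        loadOffset: (String, Int) => Long,
>                        saveOffset: (String, Int, Long) => Unit): Unit = {
>   val stored = loadOffset(range.topic, range.partition)
>   if (stored == range.fromOffset) {
>     // No gap: this batch starts exactly where the stored offset left off.
>     saveOffset(range.topic, range.partition, range.untilOffset)
>   } else {
>     // A gap or overlap means an earlier batch failed; refuse to commit.
>     sys.error(s"Offset mismatch for ${range.topic}-${range.partition}: " +
>       s"stored=$stored, fromOffset=${range.fromOffset}")
>   }
> }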
>
> On Tue, Oct 27, 2015 at 9:00 PM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> If you want to make sure that your offsets are increasing without gaps...
>> one way to do that is to enforce that invariant when you're saving to your
>> database. That would probably mean using a real database instead of
>> ZooKeeper, though.
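>>
>> A minimal sketch of enforcing that invariant in a relational store,
>> assuming a hypothetical offsets table with columns (topic, part, off) and
>> plain JDBC:
>>
>> import java.sql.DriverManager
>>
>> // Commit untilOffset only if the stored offset still equals fromOffset.
>> // The WHERE clause makes the update a compare-and-swap, so gaps are rejected.
>> def commitAtomically(url: String, topic: String, part: Int,
>>                      from: Long, until: Long): Boolean = {
>>   val conn = DriverManager.getConnection(url)
>>   try {
>>     val st = conn.prepareStatement(
>>       "UPDATE offsets SET off = ? WHERE topic = ? AND part = ? AND off = ?")
>>     st.setLong(1, until)
>>     st.setString(2, topic)
>>     st.setInt(3, part)
>>     st.setLong(4, from)
>>     st.executeUpdate() == 1  // 0 rows updated => gap detected, don't proceed
>>   } finally {
>>     conn.close()
>>   }
>> }
>>
>> Writing the batch's results and its offsets in the same transaction is
>> what a real database buys you over ZooKeeper here.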
>>
>> On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <
>> krot.vyacheslav@gmail.com> wrote:
>>
>>> Any ideas? This is important because we use Kafka direct streaming and
>>> save the processed offsets manually as the last step in the job, so we
>>> achieve at-least-once semantics.
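>>>
>>> Roughly, the pattern looks like this (stream is our Kafka direct stream;
>>> processBatch and saveOffset are hypothetical helpers, the latter wrapping
>>> our ZooKeeper client):
>>>
>>> import org.apache.spark.streaming.kafka.HasOffsetRanges
>>>
>>> stream.foreachRDD { rdd =>
>>>   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   processBatch(rdd)  // hypothetical: compute and write this batch's results
>>>   // Offsets are saved only after processing succeeds: at-least-once.
>>>   ranges.foreach(r => saveOffset(r.topic, r.partition, r.untilOffset))
>>> }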
>>> But see what happens when a new batch is scheduled after a job fails:
>>> - suppose we start from offset 10, loaded from ZooKeeper
>>> - the job starts with offsets 10-20
>>> - the job fails N times; awaitTermination notices that and stops the
>>> context (or even the JVM, via System.exit), but the scheduler has already
>>> started the next job, for offsets 20-30, and sent it to an executor
>>> - the executor runs all its steps (if there is only one stage) and saves
>>> offset 30 to ZooKeeper.
>>>
>>> This way I lose the data in offsets 10-20.
>>>
>>> How should this be handled correctly?
>>>
>>> On Mon, 26 Oct 2015 at 18:37, varun sharma <varunsharmansit@gmail.com> wrote:
>>>
>>>> +1, I wanted to do the same.
>>>>
>>>> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <
>>>> krot.vyacheslav@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I wonder, what is the correct way to stop a streaming application if
>>>>> some job fails?
>>>>> What I have now:
>>>>>
>>>>> val ssc = new StreamingContext(conf, batchInterval)
>>>>> ....
>>>>> ssc.start()
>>>>> try {
>>>>>   ssc.awaitTermination()
>>>>> } catch {
>>>>>   // stop immediately (non-gracefully) and take the SparkContext down too
>>>>>   case e: Throwable => ssc.stop(stopSparkContext = true, stopGracefully = false)
>>>>> }
>>>>>
>>>>> It works, but one problem remains: after a job fails and before the
>>>>> streaming context is stopped, it manages to start a job for the next
>>>>> batch. That is not desirable for me.
>>>>> It works like this because JobScheduler is an actor: after it reports
>>>>> an error, it moves on to the next message, which starts the next
>>>>> batch's job, while ssc.awaitTermination() runs in another thread and
>>>>> only reacts after the next batch has started.
>>>>>
>>>>> Is there a way to stop before the next job is submitted?
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *VARUN SHARMA*
>>>> *Flipkart*
>>>> *Bangalore*
>>>>
>>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>
