spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Using KafkaDirectStream, stopGracefully and exceptions
Date Thu, 10 Sep 2015 14:38:57 GMT
You have to store offsets somewhere.

If you're going to store them in checkpoints, then you have to deal with
the fact that checkpoints aren't recoverable on code change.  Starting up
the new version helps because you don't start it from the same checkpoint
directory as the running one... it has your new code, and is storing to a
new checkpoint directory.  If you started the new one from the latest
offsets, you can shut down the old one as soon as it's caught up.

If you don't like the implications of storing offsets in checkpoints...
then sure, store them yourself.  A real database would be better, but if
you really want to store them in zookeeper you can.  In any case, just do
your offset saves in the same foreachPartition your other output operations
are occurring in, after they've successfully completed.
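That ordering (outputs first, offset save only after they succeed) can be sketched outside Spark in a few lines. Here `sink` and `zk_offsets` are plain in-memory stand-ins for your real output store and for ZooKeeper, and all names are illustrative, not Spark API:

```python
# Minimal sketch of the per-partition pattern: write the partition's
# records first, save the offset only afterwards. A dict stands in for
# ZooKeeper and a list for the real sink.

sink = []                    # stands in for your real output store
zk_offsets = {}              # (topic, partition) -> next offset to read

def process_partition(topic, partition, from_offset, records):
    """If any write fails, the offset is never advanced, so the batch is
    re-read on restart: at-least-once, and effectively exactly-once when
    the writes are idempotent."""
    for record in records:
        sink.append(record)              # your output operation
    # Only reached if every write above succeeded:
    zk_offsets[(topic, partition)] = from_offset + len(records)

process_partition("events", 0, from_offset=100, records=["a", "b", "c"])
print(zk_offsets[("events", 0)])  # -> 103
```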

If you don't care about performance benefits of the direct stream and don't
want exactly once semantics, sure use the old stream.

Finally, hundreds of gigs just really isn't very much data.  Unless what
you're doing is really resource intensive, it shouldn't take much time to
process it all, especially if you can dynamically size a cluster for the
rare occasion that something is screwed up and you need to reprocess.


On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki <k.zarzycki@gmail.com>
wrote:

> Thanks guys for your answers. I put my answers inline, below.
>
> Cheers,
> Krzysztof Zarzycki
>
> 2015-09-10 15:39 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
>
>> The kafka direct stream meets those requirements.  You don't need
>> checkpointing for exactly-once.  Indeed, unless your output operations are
>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>> Instead, you need to store the offsets atomically in the same transaction
>> as your results.
>>
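The transactional variant described above can be sketched with sqlite from the standard library standing in for the real sink; the table and column names are illustrative, not Spark or Kafka API:

```python
import sqlite3

# Sketch of "store the offsets atomically in the same transaction as your
# results", using an in-memory sqlite database as a stand-in sink.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (value TEXT)")
db.execute("CREATE TABLE offsets (topic TEXT, part INTEGER, off INTEGER, "
           "PRIMARY KEY (topic, part))")

def write_batch(topic, part, until_offset, records):
    # One transaction: either both the results and the new offset commit,
    # or neither does -- that is what makes the output exactly-once.
    with db:
        db.executemany("INSERT INTO results VALUES (?)",
                       [(r,) for r in records])
        db.execute("INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
                   (topic, part, until_offset))

write_batch("events", 0, 103, ["a", "b", "c"])
print(db.execute("SELECT off FROM offsets").fetchone()[0])  # -> 103
```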
>
> To focus the discussion, let's assume my operations are idempotent and I'm
> therefore interested in at-least-once (which is the "idempotent
> exactly-once" named in your presentation). Are you saying that I don't need
> checkpointing for that? How would the direct stream API then store offsets
> between restarts?
>
>
>> See
>> https://github.com/koeninger/kafka-exactly-once
>> and the video / blog posts linked from it.
>>
>>
> I did that, thank you. What I want is to achieve "idempotent exactly-once"
> as named in your presentation.
>
>
>> The dibhatt consumer that Akhil linked is using zookeeper to store
>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>> idempotent output operations.
>>
> True, and I totally accept it if what I get is at-least-once.
>
>
>>
>>
>> Regarding the issues around code changes and checkpointing, the most
>> straightforward way to deal with this is to just start a new version of
>> your job before stopping the old one.  If you care about delivery semantics
>> and are using checkpointing, your output operation must be idempotent
>> anyway, so having 2 versions of the code running at the same time for a
>> brief period should not be a problem.
>>
>
> How does starting the new version before stopping the old one help? Could
> you please explain the mechanics of that a bit?
> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
> situations where it would be simply inappropriate to keep running the old
> one, e.g. when we've discovered a bug and don't want to run it anymore.
>
>
> So... To sum it up correctly: if I want at-least-once, with simple code
> upgrades, I need to:
> -  store offsets in external storage (I would choose ZK for that).
> -  read them on application restart and pass the TopicAndPartition->offset
> map to createDirectStream.
> -  And I don't need to use checkpoints at all then.
> Could you confirm that?
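The restart-time bookkeeping in those steps can be sketched in plain Python; a dict stands in for ZooKeeper, and the resulting map is what would be handed to the direct stream on startup (the `fromOffsets` argument of `createDirectStream`):

```python
# Build the (topic, partition) -> start-offset map for application restart,
# falling back to a default when a partition has no stored offset yet.

def offsets_for_restart(stored, partitions, default_offset=0):
    """stored: offsets saved by the previous run (stand-in for ZK reads).
    partitions: every (topic, partition) pair the job should consume."""
    return {tp: stored.get(tp, default_offset) for tp in partitions}

stored = {("events", 0): 103}                 # written by the previous run
partitions = [("events", 0), ("events", 1)]   # partition 1 is brand new
print(offsets_for_restart(stored, partitions))
# -> {('events', 0): 103, ('events', 1): 0}
```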
>
> The question is where I should actually commit the ZK offsets. The easiest
> would be to do it at the end of every batch. Do you think I can use
> org.apache.spark.streaming.scheduler.StreamingListener, method
> onBatchCompleted, for that? I don't think so, because we probably don't
> have access to the finished offsets in it...
> So maybe each executor can commit the offsets?
>
> An alternative to that solution, I just realized, is to stay with the old
> Kafka receiver (the createStream API) and just enable Write Ahead Logs.
> This way we don't lose any data on application kill, so we have "idempotent
> exactly-once" semantics, offsets are stored in ZK for us, and we don't need
> to use checkpoints... Seems like a viable option! Do you agree?
>
>
>
>
>
>
>>
>>
>>
>> On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <
>> dgoldenberg123@gmail.com> wrote:
>>
>>> >> checkpoints can't be used between controlled restarts
>>>
>>> Is that true? If so, why? From my testing, checkpoints appear to be
>>> working fine, we get the data we've missed between the time the consumer
>>> went down and the time we brought it back up.
>>>
> I'm sorry I simplified the case. I meant "checkpoints can't be used
> between controlled restarts if you want to upgrade code in between".
>
>>
>>> >> If I cannot make checkpoints between code upgrades, does it mean
>>> that Spark does not help me at all with keeping my Kafka offsets? Does it
>>> mean, that I have to implement my own storing to/initialization of offsets
>>> from Zookeeper?
>>>
>>> By code upgrades, are code changes to the consumer program meant?
>>>
> Exactly.
>
>>
>>> If that is the case, one idea we've been entertaining is that, if the
>>> consumer changes, especially if its configuration parameters change, it
>>> means that some older configuration may still be stuck in the
>>> checkpointing.  What we'd do in this case is, prior to starting the
>>> consumer, blow away the checkpointing directory and re-consume from Kafka
>>> from the smallest offsets.  In our case, it's OK to re-process; I realize
>>> that in many cases that may not be an option.  If that's the case then it
>>> would seem to follow that you have to manage offsets in Zk...
>>>
>>> Another thing to consider would be to treat upgrades operationally. In
>>> that, if an upgrade is to happen, consume the data up to a certain point
>>> then bring the system down for an upgrade. Remove checkpointing. Restart
>>> everything; the system would now be rebuilding the checkpointing and using
>>> your upgraded consumers.  (Again, this may not be possible in some systems
>>> where the data influx is constant and/or the data is mission critical)...
>>>
> Thanks for your idea, but in my case it is indeed impossible to replay the
> kafka topic from the beginning. We keep months of data in the topic,
> hundreds of gigs, so I definitely don't want to reprocess it all.
>
>
>>
>>> Perhaps this discussion implies that there may be a new feature in Spark
>>> where it intelligently drops the checkpointing or allows you to selectively
>>> pluck out and drop some items prior to restarting...
>>>
>>>
>>>
>>>
>>> On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das <akhil@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> This consumer pretty much covers all of the scenarios you listed:
>>>> github.com/dibbhatt/kafka-spark-consumer. Give it a try.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki <
>>>> k.zarzycki@gmail.com> wrote:
>>>>
>>>>> Hi there,
>>>>> I have a problem with fulfilling all my needs when using Spark
>>>>> Streaming on Kafka. Let me enumerate my requirements:
>>>>> 1. I want to have at-least-once/exactly-once processing.
>>>>> 2. I want my application to be tolerant of faults and of simple stops.
>>>>> The Kafka offsets need to be tracked between restarts.
>>>>> 3. I want to be able to upgrade my application's code without losing
>>>>> Kafka offsets.
>>>>>
>>>>> Now, what my requirements imply according to my knowledge:
>>>>> 1. implies using the new Kafka DirectStream.
>>>>> 2. implies using checkpointing; the Kafka DirectStream will write
>>>>> offsets to the checkpoint as well.
>>>>> 3. implies that checkpoints can't be used between controlled restarts.
>>>>> So I need to install a shutdownHook with ssc.stop(stopGracefully=true)
>>>>> (here is a description how:
>>>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>>>> )
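The shutdown-hook idea being quoted can be sketched outside Spark; `StreamingContextStub` is a hypothetical stand-in for the real `ssc`, and the handler mirrors calling `ssc.stop(stopGracefully=true)` on SIGTERM:

```python
import signal

# Sketch of a graceful-shutdown hook: on SIGTERM, ask the streaming
# context to stop gracefully (finish in-flight batches) instead of dying
# mid-batch. StreamingContextStub stands in for the real context.

class StreamingContextStub:
    def __init__(self):
        self.stopped_gracefully = False
    def stop(self, stop_gracefully):
        self.stopped_gracefully = stop_gracefully

ssc = StreamingContextStub()

def on_shutdown(signum, frame):
    ssc.stop(stop_gracefully=True)   # ssc.stop(stopGracefully=true) in Spark

signal.signal(signal.SIGTERM, on_shutdown)
```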
>>>>>
>>>>> Now my problems are:
>>>>> 1. If I cannot make checkpoints between code upgrades, does it mean
>>>>> that Spark does not help me at all with keeping my Kafka offsets? Does
>>>>> it mean that I have to implement my own storing to/initialization of
>>>>> offsets from Zookeeper?
>>>>> 2. When I set up a shutdownHook and any of my executors throws an
>>>>> exception, it seems that the application does not fail, but gets stuck
>>>>> in the running state. Is that because stopGracefully deadlocks on
>>>>> exceptions? How can I overcome this problem? Maybe I can avoid setting
>>>>> a shutdownHook and there is another way to stop my app gracefully?
>>>>>
>>>>> 3. If I somehow overcome 2., is it enough to just stop gracefully my
>>>>> app to be able to upgrade code & not lose Kafka offsets?
>>>>>
>>>>>
>>>>> Thank you a lot for your answers,
>>>>> Krzysztof Zarzycki
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
