flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Reprocessing data in Flink / rebuilding Flink state
Date Tue, 02 Aug 2016 15:17:03 GMT
+1 to ignore unmatched state.

Also +1 to allow programs that resume partially (add some new state that
starts empty)

Both are quite important for program evolution.

On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi <uce@apache.org> wrote:

> No, unfortunately this is the same for 1.1. The idea was to be explicit
> about what works and what doesn't. I see that this is actually a pain for this
> use case (which is very nice and reasonable ;)). I think we can either
> always ignore state that does not match the new job, or if that is too
> aggressive, add a flag to ignore unmatched state.
>
>
> On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> +Ufuk, looping him in directly
>>
>> Hmm, I think this has changed for the 1.1 release. Ufuk could you please
>> comment?
>>
>>
>> On Mon, 1 Aug 2016 at 08:07 Josh <jofo90@gmail.com> wrote:
>>
>>> Cool, thanks - I've tried out the approach where we replay data from the
>>> Kafka compacted log, then take a savepoint and switch to the live stream.
>>>
>>> It works but I did have to add in a dummy operator for every operator
>>> that was removed. Without doing this, I got an exception:
>>> java.lang.IllegalStateException: Failed to rollback to savepoint
>>> Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
>>> map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
>>> This indicates that the program has been changed in a non-compatible way
>>> after the savepoint.
>>>
>>> I had a Kafka source and a flat mapper chained together when replaying,
>>> so to make it work I had to add two dummy operators and assign the same UID
>>> I used when replaying, like this:
>>> stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
>>>   .map(x => x).name("dummy-2")
>>>
>>> I guess it would be nice if Flink could recover from removed
>>> tasks/operators without needing to add dummy operators like this.
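Spelled out a little more fully, Josh's workaround might look like this (a sketch against the Flink 1.1-era Scala DataStream API; `env` and `liveSource` and the String element type are assumptions, and the UIDs follow the snippet above):

```scala
// Restored job: the replay source and its chained flat map were removed,
// so two no-op maps stand in for them. The first carries the replay
// source's UID ("kafka-replay") so the savepoint state can still be
// mapped; startNewChain() mirrors the original operator chaining, which
// is why a second dummy is needed.
val stream: DataStream[String] = env.addSource(liveSource) // liveSource assumed

val withDummies = stream
  .map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
  .map(x => x).name("dummy-2")
```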
>>>
>>> Josh
>>>
>>> On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> I have to try this to verify but I think the approach works if you give
>>>> the two sources different UIDs. The reason is that Flink will ignore state
>>>> for which it doesn't have an operator to assign it to. Therefore, the state
>>>> of the "historical Kafka source" should be silently discarded.
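The two-phase setup Aljoscha describes might be wired up roughly like this (a sketch only; the consumer class matches the Flink 1.1-era Kafka connector, but the topic names, schema, and properties are assumptions, not from the thread):

```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties() // broker and consumer-group config assumed

// Phase 1: warm up state from the compacted topic, then take a savepoint.
val historical = env
  .addSource(new FlinkKafkaConsumer08[String](
    "items-compacted", new SimpleStringSchema(), props))
  .uid("historical-kafka-source")

// Phase 2: resubmit with only the live source. Because its UID differs,
// the savepoint's state for the historical source has no operator to be
// assigned to and should be silently discarded.
val live = env
  .addSource(new FlinkKafkaConsumer08[String](
    "items-live", new SimpleStringSchema(), props))
  .uid("live-kafka-source")
```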
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 29 Jul 2016 at 18:12 Josh <jofo90@gmail.com> wrote:
>>>>
>>>>> @Aljoscha - The N-input operator way sounds very nice. For now I think
>>>>> I'll try and get something quick running the hacky way, then if we decide
>>>>> to make this a permanent solution maybe I can work on the proper solution.
>>>>> I was wondering about your suggestion for "warming up" the state and then
>>>>> taking a savepoint and switching sources - since the Kafka sources are
>>>>> stateful and are part of Flink's internal state, wouldn't this break when
>>>>> trying to restore the job with a different source? Would I need to assign
>>>>> the replay source a UID, and when switching from replay to live, remove the
>>>>> replay source and replace it with a dummy operator with the same UID?
>>>>>
>>>>> @Jason - I see what you mean now, with the historical and live Flink
>>>>> jobs. That's an interesting approach - I guess it's solving a slightly
>>>>> different problem to my 'rebuilding Flink state upon starting job' - as
>>>>> you're rebuilding state as part of the main job when it comes across events
>>>>> that require historical data. Actually I think we'll need to do something
>>>>> very similar in the future but right now I can probably get away with
>>>>> something simpler!
>>>>>
>>>>> Thanks for the replies!
>>>>>
>>>>> Josh
>>>>>
>>>>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc.flk@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Aljoscha's approach is probably better, but to answer your
>>>>>> questions...
>>>>>>
>>>>>> >How do you send a request from one Flink job to another?
>>>>>> All of our different flink jobs communicate over kafka.  So the main
>>>>>> flink job would be listening to both a "live" kafka source, and a
>>>>>> "historical" kafka source.  The historical flink job would listen to a
>>>>>> "request" kafka source.  When the main job gets an event that it does not
>>>>>> have state for it writes to the "request" topic.  The historical job would
>>>>>> read the request, grab the relevant old events from GCS, and write them to
>>>>>> the "historical" kafka topic.  The "historical" source and the "live"
>>>>>> source are merged and proceed through the main flink job as one stream.
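The topology Jason describes might look roughly like this (a sketch; the `Event` type, `eventSchema`, `props`, `missingKeyRequests`, and the topic names are assumptions, and `env` is the usual stream environment):

```scala
// Main job: one consumer per topic; union merges them into a single
// stream that the rest of the job processes uniformly.
val liveEvents = env.addSource(
  new FlinkKafkaConsumer08[Event]("live", eventSchema, props))
val historicalEvents = env.addSource(
  new FlinkKafkaConsumer08[Event]("historical", eventSchema, props))

val merged: DataStream[Event] = liveEvents.union(historicalEvents)

// Keys the main job has no state for are requested from the historical
// job by writing to the "request" topic.
missingKeyRequests.addSink(
  new FlinkKafkaProducer08[String]("request", new SimpleStringSchema(), props))
```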
>>>>>>
>>>>>> >How do you handle the switchover between the live stream and the
>>>>>> historical stream? Do you somehow block the live stream source and detect
>>>>>> when the historical data source is no longer emitting new elements?
>>>>>> When the main job sends a request to the historical job, the main job
>>>>>> starts storing any events that come in for that key.  As the historical
>>>>>> events come in they are processed immediately.  The historical flink job
>>>>>> flags the last event it sends.  When the main flink job sees the flagged
>>>>>> event it knows it is caught up to where it was when it sent the request.
>>>>>> You can then process the events that the main job stored, and when that is
>>>>>> done you are caught up to the live stream, and can stop storing events for
>>>>>> that key and just process them as normal.
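The per-key catch-up protocol described above could be sketched as a keyed function along these lines (a sketch only; the `Event` fields, the flag on the last historical event, and the buffering approach are assumptions - in a real job the buffer and flag would live in Flink keyed state, not instance fields):

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer

// Hypothetical event type: isHistorical marks replayed events, and
// isLastHistorical is the flag the historical job sets on its final event.
case class Event(key: String, payload: String,
                 isHistorical: Boolean, isLastHistorical: Boolean)

class CatchUpFunction extends RichFlatMapFunction[Event, Event] {
  // Live events that arrived during catch-up for this key.
  // (Real jobs should keep this in Flink keyed state for fault tolerance.)
  private val buffered = ArrayBuffer[Event]()
  private var catchingUp = true

  override def flatMap(e: Event, out: Collector[Event]): Unit = {
    if (e.isHistorical) {
      out.collect(e)                  // replayed events pass straight through
      if (e.isLastHistorical) {       // flagged event: we are caught up
        buffered.foreach(out.collect) // drain live events stored meanwhile
        buffered.clear()
        catchingUp = false
      }
    } else if (catchingUp) {
      buffered += e                   // hold live events until caught up
    } else {
      out.collect(e)                  // normal live processing
    }
  }
}
```

This is also where the memory concern Jason raises shows up: the buffer grows until the flagged historical event arrives.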
>>>>>>
>>>>>> Keep in mind that this is the dangerous part that I was talking
>>>>>> about, where memory in the main job would continue to build until the
>>>>>> "historical" events are all processed.
>>>>>>
>>>>>> >In my case I would want the Flink state to always contain the latest
>>>>>> state of every item (except when the job first starts and there's a period
>>>>>> of time where it's rebuilding its state from the Kafka log).
>>>>>> You could absolutely do it by reading from the beginning of a kafka
>>>>>> topic.  The reason we do it with GCS is it is really cheap storage, and we
>>>>>> are not planning on storing forever on the kafka topic.
>>>>>>
>>>>>> >Since I would have everything needed to rebuild the state persisted
>>>>>> in a Kafka topic, I don't think I would need a second Flink job for this?
>>>>>> The reason for the second flink job in our case is that we didn't
>>>>>> really want to block the flink task slot while a single key gets caught
>>>>>> up.  We have a much larger key domain than we have number of task slots, so
>>>>>> there would be multiple keys on a single task slot.  If you go with the
>>>>>> single job approach (which might be the right approach for you guys) any
>>>>>> other keys on that task slot will be blocked until the one key is getting
>>>>>> its state built up.
>>>>>>
>>>>>> Hope that helps,
>>>>>>
>>>>>> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jofo90@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jason,
>>>>>>>
>>>>>>> Thanks for the reply - I didn't quite understand all of it though!
>>>>>>>
>>>>>>> > it sends a request to the historical flink job for the old data
>>>>>>> How do you send a request from one Flink job to another?
>>>>>>>
>>>>>>> > It continues storing the live events until all the events from the
>>>>>>> historical job have been processed, then it processes the stored events,
>>>>>>> and finally starts processing the live stream again.
>>>>>>> How do you handle the switchover between the live stream and the
>>>>>>> historical stream? Do you somehow block the live stream source and detect
>>>>>>> when the historical data source is no longer emitting new elements?
>>>>>>>
>>>>>>> > So in your case it looks like what you could do is send a request
>>>>>>> to the "historical" job whenever you get an item that you don't yet have the
>>>>>>> current state of.
>>>>>>> In my case I would want the Flink state to always contain the latest
>>>>>>> state of every item (except when the job first starts and there's a period
>>>>>>> of time where it's rebuilding its state from the Kafka log). Since I would
>>>>>>> have everything needed to rebuild the state persisted in a Kafka topic, I
>>>>>>> don't think I would need a second Flink job for this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc.flk@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Josh,
>>>>>>>>
>>>>>>>> The way we replay historical data is we have a second Flink job
>>>>>>>> that listens to the same live stream, and stores every single event in
>>>>>>>> Google Cloud Storage.
>>>>>>>>
>>>>>>>> When the main Flink job that is processing the live stream gets a
>>>>>>>> request for a specific data set that it has not been processing yet, it
>>>>>>>> sends a request to the historical flink job for the old data.  The live job
>>>>>>>> then starts storing relevant events from the live stream in state.  It
>>>>>>>> continues storing the live events until all the events from the historical
>>>>>>>> job have been processed, then it processes the stored events, and finally
>>>>>>>> starts processing the live stream again.
>>>>>>>>
>>>>>>>> As long as it's properly keyed (we key on the specific data set)
>>>>>>>> then it doesn't block anything, keeps everything ordered, and eventually
>>>>>>>> catches up.  It also allows us to completely blow away state and rebuild it
>>>>>>>> from scratch.
>>>>>>>>
>>>>>>>> So in your case it looks like what you could do is send a request to
>>>>>>>> the "historical" job whenever you get an item that you don't yet have the
>>>>>>>> current state of.
>>>>>>>>
>>>>>>>> The potential problems you may have are that it may not be possible
>>>>>>>> to store every single historical event, and that you need to make sure
>>>>>>>> there is enough memory to handle the ever increasing state size while the
>>>>>>>> historical events are being replayed (and make sure to clear the state when
>>>>>>>> it is done).
>>>>>>>>
>>>>>>>> It's a little complicated, and pretty expensive, but it works.  Let
>>>>>>>> me know if something doesn't make sense.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jofo90@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I was wondering what approaches people usually take with
>>>>>>>>> reprocessing data with Flink - specifically the case where you want to
>>>>>>>>> upgrade a Flink job, and make it reprocess historical data before
>>>>>>>>> continuing to process a live stream.
>>>>>>>>>
>>>>>>>>> I'm wondering if we can do something similar to the 'simple
>>>>>>>>> rewind' or 'parallel rewind' which Samza uses to solve this problem,
>>>>>>>>> discussed here:
>>>>>>>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>>>>>>
>>>>>>>>> Having used Flink over the past couple of months, the main issue
>>>>>>>>> I've had involves Flink's internal state - from my experience it seems it
>>>>>>>>> is easy to break the state when upgrading a job, or when changing the
>>>>>>>>> parallelism of operators, plus there's no easy way to view/access an
>>>>>>>>> internal key-value state from outside Flink.
>>>>>>>>>
>>>>>>>>> For an example of what I mean, consider a Flink job which consumes
>>>>>>>>> a stream of 'updates' to items, and maintains a key-value store of items
>>>>>>>>> within Flink's internal state (e.g. in RocksDB). The job also writes the
>>>>>>>>> updated items to a Kafka topic:
>>>>>>>>>
>>>>>>>>> http://oi64.tinypic.com/34q5opf.jpg
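The job Josh sketches might look something like this (a sketch only; the `ItemUpdate` type, the merge logic, and the state handling details are assumptions, though the descriptor-based state API matches the 1.1-era interface):

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

case class ItemUpdate(itemId: String, field: String, value: String)

// Keyed by item id; keeps the latest item snapshot in Flink keyed state
// (backed by RocksDB when the RocksDB state backend is configured) and
// emits the updated snapshot, to be written to a Kafka sink downstream.
class UpdateItem extends RichMapFunction[ItemUpdate, String] {
  private var item: ValueState[String] = _

  override def open(conf: Configuration): Unit = {
    item = getRuntimeContext.getState(
      new ValueStateDescriptor("item", classOf[String], null))
  }

  override def map(update: ItemUpdate): String = {
    val merged = applyUpdate(item.value(), update) // merge logic assumed
    item.update(merged)
    merged
  }

  // Placeholder merge; a real job would apply the update to the snapshot.
  private def applyUpdate(current: String, u: ItemUpdate): String =
    s"$current|${u.field}=${u.value}"
}
```

It would be applied as something like `updates.keyBy(_.itemId).map(new UpdateItem).addSink(kafkaSink)`, with the RocksDB state backend configured on the environment.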
>>>>>>>>>
>>>>>>>>> My worry with this is that the state in RocksDB could be lost or
>>>>>>>>> become incompatible with an updated version of the job. If this happens, we
>>>>>>>>> need to be able to rebuild Flink's internal key-value store in RocksDB. So
>>>>>>>>> I'd like to be able to do something like this (which I believe is the Samza
>>>>>>>>> solution):
>>>>>>>>>
>>>>>>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>>>>>>
>>>>>>>>> Has anyone done something like this already with Flink? If so are
>>>>>>>>> there any examples of how to do this replay & switchover (rebuild state by
>>>>>>>>> consuming from a historical log, then switch over to processing the live
>>>>>>>>> stream)?
>>>>>>>>>
>>>>>>>>> Thanks for any insights,
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Jason Brelloch* | Product Developer
>>>>>>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>>>>>>> <http://www.bettercloud.com/>
>>>>>>>> Subscribe to the BetterCloud Monitor
>>>>>>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch> -
>>>>>>>> Get IT delivered to your inbox
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>
