flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh <jof...@gmail.com>
Subject Re: Reprocessing data in Flink / rebuilding Flink state
Date Mon, 01 Aug 2016 15:06:40 GMT
Cool, thanks - I've tried out the approach where we replay data from the
Kafka compacted log, then take a savepoint and switch to the live stream.

It works but I did have to add in a dummy operator for every operator that
was removed. Without doing this, I got an exception:
java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint
1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old
state for task 02ea922553bc7522bdea373f52a702d6 to the new program. This
indicates that the program has been changed in a non-compatible way  after
the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so
to make it work I had to add two dummy operators and assign the same UID I
used when replaying, like this:
stream.map(x =>
x).uid("kafka-replay").name("dummy-1").startNewChain().map(x =>
x).name("dummy-2")

I guess it would be nice if Flink could recover from removed
tasks/operators without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> I have to try this to verify but I think the approach works if you give
> the two sources different UIDs. The reason is that Flink will ignore state
> for which it doesn't have an operator to assign it to. Therefore, the state
> of the "historical Kafka source" should be silently discarded.
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Jul 2016 at 18:12 Josh <jofo90@gmail.com> wrote:
>
>> @Aljoscha - The N-input operator way sounds very nice, for now I think
>> I'll try and get something quick running the hacky way, then if we decide
>> to make this a permanent solution maybe I can work on the proper solution.
>> I was wondering about your suggestion for "warming up" the state and then
>> taking a savepoint and switching sources - since the Kafka sources are
>> stateful and are part of Flink's internal state, wouldn't this break when
>> trying to restore the job with a different source? Would I need to assign
>> the replay source a UID, and when switching from replay to live, remove the
>> replay source and replace it with an dummy operator with the same UID?
>>
>> @Jason - I see what you mean now, with the historical and live Flink
>> jobs. That's an interesting approach - I guess it's solving a slightly
>> different problem to my 'rebuilding Flink state upon starting job' - as
>> you're rebuilding state as part of the main job when it comes across events
>> that require historical data. Actually I think we'll need to do something
>> very similar in the future but right now I can probably get away with
>> something simpler!
>>
>> Thanks for the replies!
>>
>> Josh
>>
>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc.flk@gmail.com>
>> wrote:
>>
>>> Aljoscha's approach is probably better, but to answer your questions...
>>>
>>> >How do you send a request from one Flink job to another?
>>> All of our different flink jobs communicate over kafka.  So the main
>>> flink job would be listening to both a "live" kafka source, and a
>>> "historical" kafka source.  The historical flink job would listen to a
>>> "request" kafka source.  When the main job gets an event that it does not
>>> have state for it writes to the "request" topic.  The historical job would
>>> read the request, grab the relevant old events from GCS, and write them to
>>> the "historical" kafka topic.  The "historical" source and the "live"
>>> source are merged and proceed through the main flink job as one stream.
>>>
>>> >How do you handle the switchover between the live stream and the
>>> historical stream? Do you somehow block the live stream source and detect
>>> when the historical data source is no longer emitting new elements?
>>> When the main job sends a request to the historical job, the main job
>>> starts storing any events that are come in for that key.  As the historical
>>> events come in they are processed immediately.  The historical flink job
>>> flags the last event it sends.  When the main flink job sees the flagged
>>> event it knows it is caught up to where it was when it sent the request.
>>> You can then process the events that the main job stored, and when that is
>>> done you are caught up to the live stream, and can stop storing events for
>>> that key and just process them as normal.
>>>
>>> Keep in mind that this is the dangerous part that I was talking about,
>>> where memory in the main job would continue to build until the "historical"
>>> events are all processed.
>>>
>>> >In my case I would want the Flink state to always contain the latest
>>> state of every item (except when the job first starts and there's a period
>>> of time where it's rebuilding its state from the Kafka log).
>>> You could absolutely do it by reading from the beginning of a kafka
>>> topic.  The reason we do it with GCS is it is really cheap storage, and we
>>> are not planning on storing forever on the kafka topic.
>>>
>>> >Since I would have everything needed to rebuild the state persisted in
>>> a Kafka topic, I don't think I would need a second Flink job for this?
>>> The reason for the second flink job in our case is that we didn't really
>>> want to block the flink task slot while a single key gets caught up.  We
>>> have a much larger key domain then we have number of task slots, so there
>>> would be multiple keys on single task slot.  If you go with the single job
>>> approach (which might be the right approach for you guys) any other keys on
>>> that task slot will be blocked until the one key is getting it's state
>>> built up.
>>>
>>> Hope that helps,
>>>
>>> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jofo90@gmail.com> wrote:
>>>
>>>> Hi Jason,
>>>>
>>>> Thanks for the reply - I didn't quite understand all of it though!
>>>>
>>>> > it sends a request to the historical flink job for the old data
>>>> How do you send a request from one Flink job to another?
>>>>
>>>> > It continues storing the live events until all the events form the
>>>> historical job have been processed, then it processes the stored events,
>>>> and finally starts processing the live stream again.
>>>> How do you handle the switchover between the live stream and the
>>>> historical stream? Do you somehow block the live stream source and detect
>>>> when the historical data source is no longer emitting new elements?
>>>>
>>>> > So in you case it looks like what you could do is send a request to
>>>> the "historical" job whenever you get a item that you don't yet have the
>>>> current state of.
>>>> In my case I would want the Flink state to always contain the latest
>>>> state of every item (except when the job first starts and there's a period
>>>> of time where it's rebuilding its state from the Kafka log). Since I would
>>>> have everything needed to rebuild the state persisted in a Kafka topic, I
>>>> don't think I would need a second Flink job for this?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc.flk@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Josh,
>>>>>
>>>>> The way we replay historical data is we have a second Flink job that
>>>>> listens to the same live stream, and stores every single event in Google
>>>>> Cloud Storage.
>>>>>
>>>>> When the main Flink job that is processing the live stream gets a
>>>>> request for a specific data set that it has not been processing yet,
it
>>>>> sends a request to the historical flink job for the old data.  The live
job
>>>>> then starts storing relevant events from the live stream in state.  It
>>>>> continues storing the live events until all the events form the historical
>>>>> job have been processed, then it processes the stored events, and finally
>>>>> starts processing the live stream again.
>>>>>
>>>>> As long as it's properly keyed (we key on the specific data set) then
>>>>> it doesn't block anything, keeps everything ordered, and eventually catches
>>>>> up.  It also allows us to completely blow away state and rebuild it from
>>>>> scratch.
>>>>>
>>>>> So in you case it looks like what you could do is send a request to
>>>>> the "historical" job whenever you get a item that you don't yet have
the
>>>>> current state of.
>>>>>
>>>>> The potential problems you may have are that it may not be possible to
>>>>> store every single historical event, and that you need to make sure there
>>>>> is enough memory to handle the ever increasing state size while the
>>>>> historical events are being replayed (and make sure to clear the state
when
>>>>> it is done).
>>>>>
>>>>> It's a little complicated, and pretty expensive, but it works.  Let me
>>>>> know if something doesn't make sense.
>>>>>
>>>>>
>>>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jofo90@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I was wondering what approaches people usually take with reprocessing
>>>>>> data with Flink - specifically the case where you want to upgrade
a Flink
>>>>>> job, and make it reprocess historical data before continuing to process
a
>>>>>> live stream.
>>>>>>
>>>>>> I'm wondering if we can do something similar to the 'simple rewind'
>>>>>> or 'parallel rewind' which Samza uses to solve this problem, discussed
>>>>>> here:
>>>>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>>>
>>>>>> Having used Flink over the past couple of months, the main issue
I've
>>>>>> had involves Flink's internal state - from my experience it seems
it is
>>>>>> easy to break the state when upgrading a job, or when changing the
>>>>>> parallelism of operators, plus there's no easy way to view/access
an
>>>>>> internal key-value state from outside Flink.
>>>>>>
>>>>>> For an example of what I mean, consider a Flink job which consumes
a
>>>>>> stream of 'updates' to items, and maintains a key-value store of
items
>>>>>> within Flink's internal state (e.g. in RocksDB). The job also writes
the
>>>>>> updated items to a Kafka topic:
>>>>>>
>>>>>> http://oi64.tinypic.com/34q5opf.jpg
>>>>>>
>>>>>> My worry with this is that the state in RocksDB could be lost or
>>>>>> become incompatible with an updated version of the job. If this happens,
we
>>>>>> need to be able to rebuild Flink's internal key-value store in RocksDB.
So
>>>>>> I'd like to be able to do something like this (which I believe is
the Samza
>>>>>> solution):
>>>>>>
>>>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>>>
>>>>>> Has anyone done something like this already with Flink? If so are
>>>>>> there any examples of how to do this replay & switchover (rebuild
state by
>>>>>> consuming from a historical log, then switch over to processing the
live
>>>>>> stream)?
>>>>>>
>>>>>> Thanks for any insights,
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Jason Brelloch* | Product Developer
>>>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>>>> <http://www.bettercloud.com/>
>>>>> Subscribe to the BetterCloud Monitor
>>>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
-
>>>>> Get IT delivered to your inbox
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *Jason Brelloch* | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>> <http://www.bettercloud.com/>
>>> Subscribe to the BetterCloud Monitor
>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
-
>>> Get IT delivered to your inbox
>>>
>>
>>

Mime
View raw message