flink-user mailing list archives

From TechnoMage <mla...@technomage.com>
Subject Re: Conceptual question
Date Sat, 09 Jun 2018 01:45:56 GMT
Thank you all. This discussion is very helpful. It sounds like I can wait for 1.6, though, given our development status.

Michael

> On Jun 8, 2018, at 1:08 PM, David Anderson <david@data-artisans.com> wrote:
> 
> Hi all,
> 
> I think I see a way to eagerly do full state migration without writing your own Operator, but it's kind of hacky and may have flaws I'm not aware of.
> 
> In Flink 1.5 we can now connect BroadcastStreams to KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant because in the processBroadcastElement() method you can supply a KeyedStateFunction to the Context.applyToKeyedState() method, and this KeyedStateFunction will be applied to every item of keyed state associated with the state descriptor you specify. I've been doing some experiments with this, and it's quite powerful in cases where it's useful to operate on all of your application's state.
> 
> I believe this was intended for cases where an update to an item of broadcast state has implications for associated keyed state, but I see nothing that prevents you from essentially ignoring the broadcast stream and using this mechanism to implement keyed state migration.
> 
> David
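A minimal sketch of the broadcast-triggered migration David describes, modeled in plain Java so it runs without Flink. In a real job the trigger would arrive in KeyedBroadcastProcessFunction#processBroadcastElement(), and the per-key work would be a KeyedStateFunction passed to Context.applyToKeyedState() together with the state descriptor of the old state. Here `KeyedStateModel`, `BroadcastMigrationSketch`, the "MIGRATE" command and the String-to-Long conversion are all hypothetical, and it is an assumption (worth checking against the Flink documentation) that the real KeyedStateFunction may write to a second keyed state for the key being visited.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for one piece of keyed state (NOT a Flink API).
// applyToAllKeys() mimics the idea behind Context.applyToKeyedState():
// visit every key that currently has state under one descriptor.
class KeyedStateModel<K, V> {
    final Map<K, V> values = new HashMap<>();

    void applyToAllKeys(BiConsumer<K, V> function) {
        // Iterate over a snapshot so the function may modify this state safely.
        for (Map.Entry<K, V> entry : new HashMap<>(values).entrySet()) {
            function.accept(entry.getKey(), entry.getValue());
        }
    }
}

// Hypothetical operator: V1 keeps a per-key count as a String,
// V2 keeps the same count as a Long.
class BroadcastMigrationSketch {
    final KeyedStateModel<String, String> stateV1 = new KeyedStateModel<>();
    final KeyedStateModel<String, Long> stateV2 = new KeyedStateModel<>();

    // Plays the role of processBroadcastElement(): the broadcast element is
    // only a trigger; its payload is otherwise ignored.
    void processBroadcastElement(String command) {
        if (!"MIGRATE".equals(command)) {
            return;
        }
        stateV1.applyToAllKeys((key, oldValue) -> {
            stateV2.values.put(key, Long.parseLong(oldValue));
            stateV1.values.remove(key); // safe here: iteration runs over a snapshot
        });
    }
}
```

Because a broadcast element reaches every parallel instance, each instance would presumably migrate only the keys held in its own state backend.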
> 
> 
> 
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <piotr@data-artisans.com> wrote:
> Hi,
> 
> Yes, it should be feasible. As I said before, with Flink 1.6 there will be a better way to migrate state, but for now you either need to convert the state lazily or iterate over the keys and do the job manually.
> 
> Piotrek
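Lazy conversion, as Piotr mentions, converts a key's old state the first time an event for that key arrives. A minimal plain-Java sketch, assuming two maps in place of two keyed ValueStates (`stateV1` holding a count as text, `stateV2` holding it as a Long); the class, field names and conversion are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of lazy (on-access) state migration. The two maps stand
// in for two keyed ValueStates; they are not Flink APIs.
class LazyMigrationSketch {
    final Map<String, String> stateV1 = new HashMap<>(); // old format: count as text
    final Map<String, Long> stateV2 = new HashMap<>();   // new format: count as Long

    // Called for every incoming event; 'key' plays the role of the current key.
    long processElement(String key) {
        Long current = stateV2.get(key);
        if (current == null) {
            // First event for this key since the upgrade: convert V1 once, then drop it.
            String legacy = stateV1.remove(key);
            current = (legacy == null) ? 0L : Long.parseLong(legacy);
        }
        current = current + 1;
        stateV2.put(key, current);
        return current;
    }
}
```

The trade-off: keys that never receive another event keep their V1 state indefinitely, which is one reason to prefer an eager pass over all keys.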
> 
> 
>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920430@gmail.com> wrote:
>> 
>> Hi Piotrek,
>> 
>> So my question is: is it feasible to migrate the state from my `ProcessFunction` to my own operator and then use `getKeyedStateBackend()` to migrate it?
>> If yes, is there anything I need to be careful about? If not, why not, and could it become available in the future? Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <piotr@data-artisans.com>:
>> Hi,
>> 
>> Oh, I see now. Yes, indeed, getKeyedStateBackend() is not exposed to the function, so you cannot migrate your state that way.
>>
>> As far as I know, yes: at the moment, to convert everything at once you would have to write your own operator (without getKeys() you can still implement lazy conversion).
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920430@gmail.com> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> I used `ProcessFunction` to implement it, but it seems that I can't call `getKeyedStateBackend()` like `WindowOperator` does.
>>> I found that `getKeyedStateBackend()` is a method of `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it.
>>> Does that mean I can't look up all keys and migrate the entire previous state to the new state in `ProcessFunction#open()`?
>>> As I said, do I need to port my `ProcessFunction` to a `KeyedProcessOperator` to migrate state in the manner shown in `WindowOperator`?
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <piotr@data-artisans.com>:
>>> What function are you implementing and how are you using it?
>>> 
>>> Usually it’s enough if your function implements RichFunction (or rather extends AbstractRichFunction); you can then use RichFunction#open in a similar manner to the code I posted in my previous message. In many places Flink performs instanceof checks like this one in org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>> 
>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>    if (function instanceof RichFunction) {
>>>       RichFunction richFunction = (RichFunction) function;
>>>       richFunction.open(parameters);
>>>    }
>>> }
>>> 
>>> Piotrek
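The instanceof dispatch Piotr quotes can be modeled in a few lines. This sketch uses hypothetical stand-in interfaces (`SketchFunction`, `SketchRichFunction`) rather than Flink's own, and drops the Configuration parameter, to show that open() is only invoked on functions that implement the rich interface:

```java
// Hypothetical stand-ins for Flink's Function and RichFunction interfaces,
// reduced to what matters here (the Configuration parameter is omitted).
interface SketchFunction {}

interface SketchRichFunction extends SketchFunction {
    void open();
}

final class SketchFunctionUtils {
    private SketchFunctionUtils() {}

    // Same shape as the instanceof check in FunctionUtils#openFunction:
    // only functions implementing the rich interface get open() called.
    static void openFunction(SketchFunction function) {
        if (function instanceof SketchRichFunction) {
            SketchRichFunction richFunction = (SketchRichFunction) function;
            richFunction.open();
        }
    }
}

// A rich function that records how often open() ran; in Flink this is where
// one-off setup such as obtaining state handles would typically happen.
class CountingRichFunction implements SketchRichFunction {
    int openCalls = 0;

    @Override
    public void open() {
        openCalls++;
    }
}

// A plain function: openFunction() silently skips it.
class PlainFunction implements SketchFunction {}
```

As the following messages point out, open() in a ProcessFunction does not give access to all keys, so it suits per-instance setup rather than an eager pass over keyed state.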
>>> 
>>> 
>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920430@gmail.com> wrote:
>>>> 
>>>> Hi Piotrek,
>>>> 
>>>> It seems that this was implemented with the `Operator` API, which is a lower-level API than the `Function` API.
>>>> Since at the `Function` API level we can only migrate state when an event triggers it, it would be more convenient to migrate state this way, by iterating over all keys in the `open()` method.
>>>> If I implemented my stateful operator with the `ProcessFunction` API, is it possible to port it to a `KeyedProcessOperator` and do the state migration you mentioned?
>>>> And are there any concerns or difficulties that could cause state restoration to fail, or other problems? Thank you!
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <piotr@data-artisans.com>:
>>>> Hi,
>>>> 
>>>> A general solution for state/schema migration is under development, and it might be released with Flink 1.6.0.
>>>> 
>>>> Before that, you need to handle the state migration manually in your operator’s open method. Let’s assume that your OperatorV1 has a state field “stateV1”, and your OperatorV2 defines a field “stateV2”, which is incompatible with the previous version. What you can do is add logic to the open method that checks:
>>>> 1. If “stateV2” is non empty, do nothing
>>>> 2. If there is no “stateV2”, iterate over all of the keys and manually migrate “stateV1” to “stateV2”
>>>> 
>>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>> 
>>>> I once implemented something like that here:
>>>> 
>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>> 
>>>> Hope that helps!
>>>> 
>>>> Piotrek
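Piotr's two-step check for OperatorV2's open method can be sketched as follows. Plain maps stand in for the keyed state registered as "stateV1" and "stateV2", and the conversion (an Integer count becoming a "count=N" string) is made up for illustration; in a real operator the keys would come from the keyed state backend, as in the linked WindowOperator code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the OperatorV2 open() logic. The maps stand in for
// keyed state registered under "stateV1" and "stateV2"; not Flink APIs.
class OperatorV2Sketch {
    final Map<String, Integer> stateV1; // restored from an OperatorV1 savepoint (may be empty)
    final Map<String, String> stateV2;  // new, incompatible representation

    OperatorV2Sketch(Map<String, Integer> restoredV1, Map<String, String> restoredV2) {
        this.stateV1 = restoredV1;
        this.stateV2 = restoredV2;
    }

    void open() {
        // 1. stateV2 is already populated (restored from a V2 savepoint): do nothing.
        if (!stateV2.isEmpty()) {
            return;
        }
        // 2. No stateV2 yet: iterate over every key and convert stateV1 eagerly.
        for (Map.Entry<String, Integer> entry : stateV1.entrySet()) {
            stateV2.put(entry.getKey(), "count=" + entry.getValue());
        }
        stateV1.clear();
    }
}
```

In this sketch, clearing stateV1 after the loop means only V2 state is left to snapshot, which is what would let an OperatorV3 drop the V1 code path.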
>>>> 
>>>> 
>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mlatta@technomage.com> wrote:
>>>>> 
>>>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>>>> 
>>>>> When a job is modified and we want to deploy the new version, what is the preferred method? Our jobs have a lot of keyed state.
>>>>>
>>>>> If we use snapshots, we have old state that may no longer apply to the new pipeline.
>>>>> If we start a new job, we can reprocess historical data from Kafka, but that can be very resource-heavy for a while.
>>>>> 
>>>>> Is there an option I am missing? Are there facilities to selectively “patch” or “purge” the keyed state?
>>>>> 
>>>>> Michael
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> 
> -- 
> David Anderson | Training Coordinator
> 
> 

