flink-user mailing list archives

From Navneeth Krishnan <reachnavnee...@gmail.com>
Subject Re: State Maintenance
Date Thu, 07 Sep 2017 15:01:31 GMT
Will I be able to use both queryable MapState and union list state while
implementing the CheckpointedFunction interface? One of my major
requirements for that operator is to provide queryable state, and in order
to compute that state we need the common static state across all parallel
operator instances.

Thanks.

On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Navneeth,
>
> there's a lower-level state interface that should address your
> requirements: OperatorStateStore.getUnionListState()
>
> This union list state is similar to the regular operator list state, but
> instead of splitting the list for recovery and handing out splits to the
> operator instances, it restores the complete list on each operator instance.
> So it basically does a broadcast restore. If all operator instances have the
> same state, only one instance checkpoints its state and this state is
> restored to all other instances in case of a failure. This should also work
> with rescaling.
> The operator instance to checkpoint can be identified by
> (RuntimeContext.getIndexOfThisSubtask() == 0).
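To make the difference between the two restore modes concrete, here is a small plain-Java simulation with no Flink dependency. The class ListStateRedistribution and its methods are hypothetical; they only model the semantics described above. The simulation takes the lists checkpointed by the old subtasks and redistributes them to a new parallelism, once with even-split semantics (regular operator list state) and once with union semantics. The contiguous-chunk assignment in evenSplit is an assumption for illustration; Flink's actual element-to-subtask assignment is an implementation detail and may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class ListStateRedistribution {

    /** Concatenates the lists checkpointed by all old subtasks. */
    static List<String> collectAll(List<List<String>> oldSubtaskStates) {
        List<String> all = new ArrayList<>();
        for (List<String> s : oldSubtaskStates) {
            all.addAll(s);
        }
        return all;
    }

    /** Even-split (simulated): hand out contiguous, near-equal chunks; newParallelism must be > 0. */
    static List<List<String>> evenSplit(List<List<String>> oldSubtaskStates, int newParallelism) {
        List<String> all = collectAll(oldSubtaskStates);
        List<List<String>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` subtasks receive one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    /** Union (simulated): every new subtask receives the complete concatenated list. */
    static List<List<String>> union(List<List<String>> oldSubtaskStates, int newParallelism) {
        List<String> all = collectAll(oldSubtaskStates);
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Old parallelism 3: only subtask 0 (getIndexOfThisSubtask() == 0) wrote the
        // shared static data; subtasks 1 and 2 checkpointed empty lists. If every
        // subtask wrote the same data, union would duplicate it on restore.
        List<List<String>> checkpoint = List.of(
                List.of("cfgA", "cfgB", "cfgC", "cfgD"),
                List.of(),
                List.of());

        // Rescale to parallelism 2.
        System.out.println(evenSplit(checkpoint, 2)); // [[cfgA, cfgB], [cfgC, cfgD]]
        System.out.println(union(checkpoint, 2));     // [[cfgA, cfgB, cfgC, cfgD], [cfgA, cfgB, cfgC, cfgD]]
    }
}
```

With only subtask 0 writing the shared data, even-split leaves each new instance with a fraction of it, whereas union gives every instance the full copy, which is what a broadcast restore of static data needs regardless of the new parallelism.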
>
> The OperatorStateStore is a bit hidden. You have to implement the
> CheckpointedFunction interface. When
> CheckpointedFunction.initializeState(FunctionInitializationContext context)
> is called, the context provides a getOperatorStateStore() method.
>
> I'd recommend having a look at the detailed JavaDocs of all involved
> classes and methods.
>
> Hope this helps,
> Fabian
>
>
> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <reachnavneeth2@gmail.com>:
>
>> Thanks, Gordon, for your response. I have around 80 parallel flatMap
>> operator instances and each instance requires three states. One of them is
>> user state, in which each operator instance holds data for its own unique
>> users, and I need this data to be queryable. The other two are essentially
>> static states, which are only modified when there is an update message in
>> the config stream. This static data could easily be around 2 GB. In my
>> previous approach I used operator state: the data was retrieved inside the
>> open() method on all operator instances but checkpointed by only one of
>> them.
>>
>> One of the issues I have: if I change the operator parallelism, how would
>> that affect the internal state?
>>
>>
>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org>
>> wrote:
>>
>>> Hi Navneeth,
>>>
>>> Answering your three questions separately:
>>>
>>> 1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
>>> from the map state, the state will be removed from the local RocksDB as
>>> well.
>>>
>>> 2. If state classes are not POJOs, they will be serialized by Kryo, unless
>>> a custom serializer is explicitly specified. You can take a look at this
>>> document for how to do that [1].
>>>
>>> 3. I might need more information to give a proper suggestion for this
>>> one. How are you using the "huge state values"? From what you described,
>>> it seems like you only need them on one of the parallel instances, so I'm
>>> a bit curious about what they are actually used for. Are they needed when
>>> processing your records?
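Point 2 above hinges on whether a state class counts as a POJO. As a rough, hedged illustration, the plain-Java checker below approximates the POJO rules from the Flink documentation: the class is public (and static if nested), has a public no-argument constructor, and every non-static, non-transient field is either public or exposed through a getter and setter. It is NOT Flink's own type analysis, which has additional rules (for example, field types must themselves be supported by a registered serializer) and may match getter/setter names differently. The class PojoRulesCheck and its nested example types are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoRulesCheck {

    /** Approximates Flink's documented POJO rules; this is NOT Flink's type extraction. */
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // The class must be public; a nested class must also be static.
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // There must be a public constructor without arguments.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every non-static, non-transient field (including inherited ones) must be
        // public or accessible through a matching getter and setter.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
                if (!Modifier.isPublic(fm) && !hasGetterAndSetter(clazz, f)) return false;
            }
        }
        return true;
    }

    /** Looks for public getFoo()/isFoo() and setFoo(T) methods for a field named foo. */
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {
            String n = m.getName();
            if ((n.equals("get" + cap) || n.equals("is" + cap))
                    && m.getParameterCount() == 0 && m.getReturnType() == f.getType()) {
                getter = true;
            }
            if (n.equals("set" + cap) && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == f.getType()) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Hypothetical example types.
    public static class GoodPojo {
        public String user;
        private long count;
        public GoodPojo() {}
        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    public static class NoDefaultCtor {
        public String user;
        public NoDefaultCtor(String user) { this.user = user; }
    }

    public static class HiddenField {
        private int x;
        public int getX() { return x; } // getter only, no setter
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(GoodPojo.class));      // true
        System.out.println(looksLikeFlinkPojo(NoDefaultCtor.class)); // false: no public no-arg constructor
        System.out.println(looksLikeFlinkPojo(HiddenField.class));   // false: private field without setter
    }
}
```

A class that fails rules like these would not be treated as a POJO, and it is in that case that Kryo, or a custom serializer registered as described in the referenced document, comes into play.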
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>>>
>>>
>>
>>
>
