beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dmitry Minaev <mina...@gmail.com>
Subject Re: ValueState for Dataflow runner and MapState for others
Date Tue, 13 Nov 2018 00:59:15 GMT
Yes, sure, that'll work, I will just have to support 2 different
implementations. I was hoping there is something more elegant.
Thank you Lukasz, I appreciate the response!

--
Dmitry

On Mon, Nov 12, 2018 at 2:02 PM Lukasz Cwik <lcwik@google.com> wrote:

> Could you write two different implementations of the DoFn and put your
> processing logic in another function that both DoFn's would invoke after
> doing any accessing of the state?
>
> Then during pipeline construction you could choose to apply the Map one or
> the Value one based upon which runner your using.
>
>
>
> On Mon, Nov 12, 2018 at 10:43 AM Dmitry Minaev <minaevd@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Since Dataflow doesn't support MapState (
>> https://issues.apache.org/jira/browse/BEAM-1474) I'm thinking of using
>> ValueState with a Map<> inside it. Is it a good idea? Here is an example
>> code:
>> ```
>> @StateId("myValueStore")
>> private final StateSpec<ValueState<Map<String, String>>> valueStoreSpec
=
>> StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
>>
>> @ProcessElement
>> public void processElement( ProcessContext
>> context, @StateId("myValueStore") MapState<String, String> valueStore) {
>>     ...
>> }
>> ```
>>
>> I'd like to support other runners as well (e.g. FlinkRunner) and it seems
>> to be more efficient to use MapState in cases where I need to store a map
>> of values. So I'm thinking of the way to use MapState and ValueState for
>> different runners.
>>
>> I understand how to get a runner in runtime via pipeline options but I'm
>> not sure how to approach defining (and using) different StateSpec for
>> different runners.
>>
>> Here is a sample code for MapState:
>> ```
>> @StateId("myMapStore")
>> private final StateSpec<MapState<String, String>> mapStoreSpec =
>> StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());
>>
>> @ProcessElement
>> public void processElement( ProcessContext context,
>>         @StateId("myMapStore") MapState<String, String> mapStore) {
>>     ...
>> }
>> ```
>>
>> Any ideas?
>>
>> Thank you,
>> Dmitry
>>
>

Mime
View raw message