flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Streaming statefull operator with hashmap
Date Thu, 12 Nov 2015 10:29:28 GMT
Hi,
you can do it using the register* methods on StreamExecutionEnvironment. So, for example:

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerType(InputType.class);
env.registerType(MicroModel.class);
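
As background on why these registrations have to be written by hand: the type arguments of a HashMap are erased at runtime, so the map's class object carries no trace of "InputType" or "MicroModel". The sketch below uses only the JDK to show this; the empty InputType and MicroModel classes are placeholder stand-ins for the types in this thread, and ErasureDemo is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Placeholder stand-ins for the thread's types (hypothetical, empty bodies).
final class InputType {}
final class MicroModel {}

final class ErasureDemo {
    // Returns the runtime class of a map; generic type arguments are not part of it.
    static Class<?> runtimeClassOf(Map<?, ?> map) {
        return map.getClass();
    }

    public static void main(String[] args) {
        Map<InputType, MicroModel> typed = new HashMap<>();
        Map<String, Integer> other = new HashMap<>();

        // Both maps share one runtime class, regardless of their type arguments.
        System.out.println(runtimeClassOf(typed) == runtimeClassOf(other)); // prints "true"
        System.out.println(runtimeClassOf(typed).getName()); // prints "java.util.HashMap"
    }
}
```

Since nothing in that class object names the key and value types, a framework that only sees the map's class cannot discover them, which is why they are registered explicitly above.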

If you want custom Kryo serializers for those types, you can also do:

env.registerTypeWithKryoSerializer(InputType.class, MyInputTypeSerializer.class);

I hope this gets you on the right track. :D

Cheers,
Aljoscha
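
On question 1 quoted below ("initializing manually"), one possible reading is to keep null as the default value and create the map the first time an element needs it, inside the processing function rather than in open(). The sketch below illustrates only that pattern with plain JDK classes. StateHandle is a hypothetical stand-in with value()/update() methods, not Flink's OperatorState, and placing the null check in the FlatMap is an assumption that the thread does not confirm.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state handle; NOT Flink's OperatorState.
final class StateHandle<T> {
    private T current; // stays null until the first update, like a null default value

    T value() { return current; }

    void update(T next) { current = next; }
}

final class LazyInitDemo {
    // Counts one occurrence of key, creating the map only on first use.
    static Map<String, Integer> record(StateHandle<Map<String, Integer>> state, String key) {
        Map<String, Integer> counts = state.value();
        if (counts == null) {
            // Manual initialization: no empty default map has to be copied per access.
            counts = new HashMap<>();
        }
        counts.merge(key, 1, Integer::sum);
        state.update(counts);
        return counts;
    }

    public static void main(String[] args) {
        StateHandle<Map<String, Integer>> state = new StateHandle<>();
        record(state, "a");
        record(state, "a");
        System.out.println(state.value().get("a")); // prints "2"
    }
}
```

In a real job the String keys and Integer values would be replaced by InputType and MicroModel, and the handle by whatever getKeyValueState returns.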

> On 11 Nov 2015, at 21:14, Martin Neumann <mneumann@sics.se> wrote:
> 
> Thanks for the help.
> 
> TypeExtractor.getForObject(modelMapInit) did the job. It's possible that
> it's an IDE problem that .getClass() did not work. IntelliJ is a bit fiddly
> with those things.
> 
>> 1) Making null the default value and initializing manually is probably more
>> efficient, because otherwise the empty map would have to be cloned each
>> time the default value is returned, which adds avoidable overhead.
> 
> 
> What do you mean by initializing manually? Can I do that directly in the
> open function, or are we talking about checking for null in the FlatMap and
> initializing there? In general the program is supposed to run constantly
> once deployed, so I can get away with a slightly slower setup.
> 
>> 2) The HashMap type will most likely go through Kryo, so for efficiency,
>> make sure you register the types "InputType" and "MicroModel" on the
>> execution environment.
>>    Here you need to do that manually, because they are type erased and
>> Flink cannot auto-register them.
> 
> 
> Can you point me to an example on how to do this?
> 
> cheers Martin
> 
> 
> On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen <sewen@apache.org> wrote:
> 
>> It should suffice to do something like
>> 
>> "getRuntimeContext().getKeyValueState("microModelMap", new
>> HashMap<InputType,MicroModel>().getClass(), null);"
>> 
>> Two more comments:
>> 
>> 1) Making null the default value and initializing manually is probably more
>> efficient, because otherwise the empty map would have to be cloned each
>> time the default value is returned, which adds avoidable overhead.
>> 
>> 2) The HashMap type will most likely go through Kryo, so for efficiency,
>> make sure you register the types "InputType" and "MicroModel" on the
>> execution environment.
>>    Here you need to do that manually, because they are type erased and
>> Flink cannot auto-register them.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
>> 
>>> Hey,
>>> 
>>> Yes, what you wrote should work. You can alternatively use
>>> TypeExtractor.getForObject(modelMapInit) to extract the type information.
>>> 
>>> I also like to implement my own custom type info for HashMaps and the
>>> other types and use that.
>>> 
>>> Cheers,
>>> Gyula
>>> 
>>> Martin Neumann <mneumann@sics.se> wrote (on Wed, 11 Nov 2015, 16:30):
>>> 
>>>> Hej,
>>>> 
>>>> What is the correct way of initializing a stateful operator that is using
>>>> a HashMap? modelMapInit.getClass() does not work, and neither does
>>>> HashMap.class. Do I have to implement my own TypeInformation class, or is
>>>> there a simpler way?
>>>> 
>>>> cheers Martin
>>>> 
>>>> private OperatorState<HashMap<InputType,MicroModel>> microModelMap;
>>>> 
>>>> @Override
>>>> public void open(Configuration parameters) throws Exception {
>>>>    HashMap<InputType,MicroModel> modelMapInit = new HashMap<>();
>>>>    this.microModelMap = getRuntimeContext().getKeyValueState(
>>>>        "microModelMap", modelMapInit.getClass(), modelMapInit);
>>>> }
>>>> 
>>> 
>> 

