flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Streaming statefull operator with hashmap
Date Wed, 18 Nov 2015 14:28:39 GMT
For initializing the Map manually, I meant making "null" the default value
and writing the code like

HashMap<InputType, MicroModel> map = state.value();
if (map == null) {
  map = new HashMap<>();
}

rather than expecting the state to always clone a new empty map for you.
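
To make the pattern concrete, here is a minimal, self-contained sketch of the null-default approach. It uses no Flink classes: ValueHolder and InMemoryHolder are hypothetical stand-ins for the state handle, and countOccurrence is an illustrative helper, so treat this as an outline of the idea rather than the actual operator code.

```java
import java.util.HashMap;

class NullDefaultStateSketch {

    /** Hypothetical stand-in for a state handle; this is NOT a Flink interface. */
    interface ValueHolder<T> {
        T value();
        void update(T newValue);
    }

    /** In-memory holder whose default value is null (assumption for illustration). */
    static final class InMemoryHolder<T> implements ValueHolder<T> {
        private T current; // stays null until the first update

        @Override
        public T value() {
            return current;
        }

        @Override
        public void update(T newValue) {
            current = newValue;
        }
    }

    /** Counts one occurrence of key, creating the map lazily on first access. */
    static void countOccurrence(ValueHolder<HashMap<String, Integer>> state, String key) {
        HashMap<String, Integer> map = state.value();
        if (map == null) {
            // First access: no default map was cloned for us, so create one here.
            map = new HashMap<>();
        }
        map.merge(key, 1, Integer::sum);
        state.update(map);
    }

    public static void main(String[] args) {
        ValueHolder<HashMap<String, Integer>> state = new InMemoryHolder<>();
        countOccurrence(state, "a");
        countOccurrence(state, "a");
        countOccurrence(state, "b");
        System.out.println(state.value());
    }
}
```

The map is only allocated the first time it is missing, so nothing has to be copied on later accesses.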

On Thu, Nov 12, 2015 at 11:29 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> you can do it using the register* methods on StreamExecutionEnvironment.
> So, for example:
>
> // set up the execution environment
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.registerType(InputType.class);
> env.registerType(MicroModel.class);
>
> If you want to have custom Kryo Serializers for those types you can also
> do:
>
> env.registerTypeWithKryoSerializer(InputType.class,
> MyInputTypeSerializer.class);
>
> I hope this gets you on the right track. :D
>
> Cheers,
> Aljoscha
>
> > On 11 Nov 2015, at 21:14, Martin Neumann <mneumann@sics.se> wrote:
> >
> > Thanks for the help.
> >
> > TypeExtractor.getForObject(modelMapInit) did the job. It's possible that
> > it's an IDE problem that .getClass() did not work. IntelliJ is a bit
> > fiddly with those things.
> >
> >> 1) Making null the default value and initializing manually is probably
> >> more efficient, because otherwise the empty map would have to be cloned
> >> each time the default value is returned, which adds avoidable overhead.
> >
> >
> > What do you mean by initialize manually? Can I do that directly in the
> > open function, or are we talking about checking for null in the FlatMap
> > and initializing there? In general the program is supposed to run
> > continuously once deployed, so I can get away with a slightly slower setup.
> >
> >> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> >> make sure you register the types "InputType" and "MicroModel" on the
> >> execution environment.
> >>    Here you need to do that manually, because they are type erased and
> >> Flink cannot auto-register them.
> >
> >
> > Can you point me to an example of how to do this?
> >
> > cheers Martin
> >
> >
> > On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen <sewen@apache.org> wrote:
> >
> >> It should suffice to do something like
> >>
> >> "getRuntimeContext().getKeyValueState("microModelMap", new
> >> HashMap<InputType,MicroModel>().getClass(), null);"
> >>
> >> Two more comments:
> >>
> >> 1) Making null the default value and initializing manually is probably
> >> more efficient, because otherwise the empty map would have to be cloned
> >> each time the default value is returned, which adds avoidable overhead.
> >>
> >> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> >> make sure you register the types "InputType" and "MicroModel" on the
> >> execution environment.
> >>    Here you need to do that manually, because they are type erased and
> >> Flink cannot auto-register them.
> >>
> >> Greetings,
> >> Stephan
> >>
> >>
> >>
> >> On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra <gyula.fora@gmail.com>
> >> wrote:
> >>
> >>> Hey,
> >>>
> >>> Yes, what you wrote should work. Alternatively, you can use
> >>> TypeExtractor.getForObject(modelMapInit) to extract the type
> >>> information.
> >>>
> >>> I also like to implement my own custom type info for HashMaps and the
> >>> other types and use that.
> >>>
> >>> Cheers,
> >>> Gyula
> >>>
> >>> Martin Neumann <mneumann@sics.se> wrote (on Wed, Nov 11, 2015,
> >>> 16:30):
> >>>
> >>>> Hej,
> >>>>
> >>>> What is the correct way of initializing a stateful operator that is
> >>>> using a HashMap? modelMapInit.getClass() does not work, and neither
> >>>> does HashMap.class. Do I have to implement my own TypeInformation
> >>>> class, or is there a simpler way?
> >>>>
> >>>> cheers Martin
> >>>>
> >>>> private OperatorState<HashMap<InputType,MicroModel>> microModelMap;
> >>>>
> >>>> @Override
> >>>> public void open(Configuration parameters) throws Exception {
> >>>>    HashMap<InputType,MicroModel> modelMapInit = new HashMap<>();
> >>>>    this.microModelMap = getRuntimeContext().getKeyValueState(
> >>>>        "microModelMap", modelMapInit.getClass(), modelMapInit);
> >>>> }
> >>>>
> >>>
> >>
>
>
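
As a footnote to the type-erasure point in the quoted messages: the sketch below shows, in plain Java with no Flink calls, that a HashMap instance carries no record of its element types at runtime. That is consistent with the advice to register InputType and MicroModel by hand. The class and method names (ErasureSketch, sameRuntimeClass, declaredTypeParameters) are made up for illustration.

```java
import java.lang.reflect.TypeVariable;
import java.util.HashMap;

class ErasureSketch {

    /** Returns true when two differently parameterized maps share one runtime class. */
    static boolean sameRuntimeClass() {
        HashMap<String, Integer> a = new HashMap<>();
        HashMap<Long, Double> b = new HashMap<>();
        // Both instances report plain HashMap; the type arguments are gone.
        return a.getClass() == b.getClass() && a.getClass() == HashMap.class;
    }

    /** Lists the type parameters a class declares, joined by commas. */
    static String declaredTypeParameters(Class<?> clazz) {
        StringBuilder sb = new StringBuilder();
        for (TypeVariable<?> tv : clazz.getTypeParameters()) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(tv.getName());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        // Only the declared variable names are visible, not the concrete element types.
        System.out.println(declaredTypeParameters(new HashMap<String, Integer>().getClass()));
    }
}
```

Since the runtime class exposes only its declared type variables, anything that inspects the map's class alone cannot discover the key and value types.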
