flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Dataset filter improvement
Date Tue, 23 Feb 2016 15:08:35 GMT
Registering a data type is only relevant for the Kryo serializer or if you
want to serialize a subclass of a POJO. Registering has the advantage that
you assign an id to the class which is written instead of the full class
name. The latter is usually much longer than the id.
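
To make the size difference concrete, here is a minimal sketch in plain Java. It is not Flink's or Kryo's actual wire format: the ClassTagSketch class, its one-byte marker, and the 4-byte id are assumptions made purely for illustration. It only shows why a tag for a registered class is shorter than a tag that spells out the class name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy illustration only; not Flink's or Kryo's actual serialization format. */
public class ClassTagSketch {
    // Hypothetical registry: each registered class gets the next small integer id.
    private final Map<Class<?>, Integer> ids = new LinkedHashMap<>();

    public void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size());
    }

    /** Returns the bytes that would identify the class of a record. */
    public byte[] writeClassTag(Class<?> type) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = ids.get(type);
            if (id != null) {
                out.writeBoolean(true);        // marker: registered, an id follows
                out.writeInt(id);              // fixed 4 bytes
            } else {
                out.writeBoolean(false);       // marker: unregistered, a name follows
                out.writeUTF(type.getName());  // 2-byte length + the full class name
            }
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        ClassTagSketch registry = new ClassTagSketch();
        registry.register(StringBuilder.class);
        System.out.println("registered:   "
                + registry.writeClassTag(StringBuilder.class).length + " bytes");
        System.out.println("unregistered: "
                + registry.writeClassTag(java.util.ArrayList.class).length + " bytes");
    }
}
```

In a Flink job the registration itself is the env.registerType(...) call quoted further down; the sketch only mimics the effect on the bytes written per record.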

Cheers,
Till

On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels <mxm@apache.org> wrote:

> Hi Flavio,
>
> I think the point is that Flink can use its serialization tools if you
> register the class in advance. If you don't do that, it will use Kryo
> as a fall-back which is slightly less efficient.
>
> Equals and hash code have to be implemented correctly if you compare
> Pojos. For standard types like String or Integer, this is done
> automatically. For Pojos, Flink doesn't know whether it is implemented
> correctly or not. Every object in Java has a default equals and
> hashCode implementation.
>
> Cheers,
> Max
>
> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
> <pompermaier@okkam.it> wrote:
> > Hi Max,
> > why do I need to register them? My job also runs fine without that.
> > The only problem with my POJOs was that I had to implement equals and
> > hashCode correctly. Flink didn't force me to do it, but without them
> > the results were wrong :(
> >
> >
> >
> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <mxm@apache.org> wrote:
> >>
> >> Hi Flavio,
> >>
> >> Stephan was referring to
> >>
> >> env.registerType(ExtendedClass1.class);
> >> env.registerType(ExtendedClass2.class);
> >>
> >> Cheers,
> >> Max
> >>
> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
> >> <pompermaier@okkam.it> wrote:
> >> > What do you mean exactly? I'm probably missing something here.
> >> > Remember that I can specify the right subclass only after the last
> >> > flatMap; after the first map, neither I nor Flink can know the exact
> >> > subclass of BaseClass.
> >> >
> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <sewen@apache.org> wrote:
> >> >>
> >> >> Class hierarchies should definitely work, even if the base class has
> >> >> no fields.
> >> >>
> >> >> They work more efficiently if you register the subclasses at the
> >> >> execution environment (Flink cannot infer them from the function
> >> >> signatures because the function signatures only contain the abstract
> >> >> base class).
> >> >>
> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
> >> >> <pompermaier@okkam.it> wrote:
> >> >>>
> >> >>> Because the classes are not related to each other. Do you think it's
> >> >>> a good idea to have something like this?
> >> >>>
> >> >>> abstract class BaseClass {
> >> >>>    String someField;
> >> >>> }
> >> >>>
> >> >>> class ExtendedClass1 extends BaseClass {
> >> >>>    String someOtherField11;
> >> >>>    String someOtherField12;
> >> >>>    String someOtherField13;
> >> >>>  ...
> >> >>> }
> >> >>>
> >> >>> class ExtendedClass2 extends BaseClass {
> >> >>>    Integer someOtherField21;
> >> >>>    Double someOtherField22;
> >> >>>    Integer someOtherField23;
> >> >>>  ...
> >> >>> }
> >> >>>
> >> >>> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
> >> >>> flatMap that can be used to generate the specific datasets?
> >> >>> Doesn't this cause problems for Flink? The classes can be very
> >> >>> different from each other. Couldn't this cause problems with the plan
> >> >>> generation?
> >> >>>
> >> >>> Thanks Fabian and Stephan for the support!
> >> >>>
> >> >>>
> >> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <sewen@apache.org>
> >> >>> wrote:
> >> >>>>
> >> >>>> Why not use an abstract base class and N subclasses?
> >> >>>>
> >> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhueske@gmail.com>
> >> >>>> wrote:
> >> >>>>>
> >> >>>>> Unfortunately, there is no Either<1,...,n>.
> >> >>>>> You could implement something like a Tuple3<Option<Type1>,
> >> >>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an
> >> >>>>> Option type (comes with Java8). You would need to implement it
> >> >>>>> yourself incl. TypeInfo and Serializer. You can get some inspiration
> >> >>>>> from the Either type info/serializer, if you want to go this way.
> >> >>>>>
> >> >>>>> Using a byte array would also work but doesn't look much easier
> >> >>>>> than the Option approach to me.
> >> >>>>>
> >> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
> >> >>>>>>
> >> >>>>>> Yes, I create the intermediate datasets and then join them with
> >> >>>>>> each other again. What I'd need is an Either<1,...,n>. Is it
> >> >>>>>> possible to add that?
> >> >>>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]> and,
> >> >>>>>> in the subsequent filter+map/flatMap, deserializing only those
> >> >>>>>> elements I want to group together (e.g.
> >> >>>>>> t.f0.equals("someEventType")) in order to generate the typed
> >> >>>>>> dataset.
> >> >>>>>> Which one do you think is the best solution?
> >> >>>>>>
> >> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com>
> >> >>>>>> wrote:
> >> >>>>>>>
> >> >>>>>>> Hi Flavio,
> >> >>>>>>>
> >> >>>>>>> I did not completely understand which objects should go where,
> >> >>>>>>> but here are some general guidelines:
> >> >>>>>>>
> >> >>>>>>> - early filtering is mostly a good idea (unless evaluating the
> >> >>>>>>> filter expression is very expensive)
> >> >>>>>>> - you can use a flatMap function to combine a map and a filter
> >> >>>>>>> - applying multiple functions on the same data set does not
> >> >>>>>>> necessarily materialize the data set (in memory or on disk). In
> >> >>>>>>> most cases it prevents chaining, hence there is serialization
> >> >>>>>>> overhead. In some cases where the forked data streams are joined
> >> >>>>>>> again, the data set must be materialized in order to avoid
> >> >>>>>>> deadlocks.
> >> >>>>>>> - it is not possible to write a map that generates two different
> >> >>>>>>> types, but you could implement a mapper that returns an
> >> >>>>>>> Either<First, Second> type.
> >> >>>>>>>
> >> >>>>>>> Hope this helps,
> >> >>>>>>> Fabian
> >> >>>>>>>
> >> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier
> >> >>>>>>> <pompermaier@okkam.it>:
> >> >>>>>>>>
> >> >>>>>>>> Any help on this?
> >> >>>>>>>>
> >> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it>
> >> >>>>>>>> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>> Hi to all,
> >> >>>>>>>>>
> >> >>>>>>>>> in my program I have a Dataset that generates different types
> >> >>>>>>>>> of objects depending on the incoming element.
> >> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
> >> >>>>>>>>> In order to type the different generated datasets I do
> >> >>>>>>>>> something like:
> >> >>>>>>>>>
> >> >>>>>>>>> Dataset<Tuple2> start =...
> >> >>>>>>>>>
> >> >>>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
> >> >>>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
> >> >>>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
> >> >>>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
> >> >>>>>>>>>
> >> >>>>>>>>> However this is very inefficient (I think because Flink needs
> >> >>>>>>>>> to materialize the entire source dataset for every slot).
> >> >>>>>>>>>
> >> >>>>>>>>> It's much more efficient to group the generation of objects of
> >> >>>>>>>>> the same type. E.g.:
> >> >>>>>>>>>
> >> >>>>>>>>> Dataset<Tuple2> start =..
> >> >>>>>>>>>
> >> >>>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
> >> >>>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
> >> >>>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
> >> >>>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
> >> >>>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
> >> >>>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
> >> >>>>>>>>>
> >> >>>>>>>>> Increasing the number of slots per task manager makes things
> >> >>>>>>>>> worse and worse :)
> >> >>>>>>>>> Is there a way to improve this situation? Is it possible to
> >> >>>>>>>>> write a "map" generating different types of objects and then
> >> >>>>>>>>> filter them by generated class type?
> >> >>>>>>>>>
> >> >>>>>>>>> Best,
> >> >>>>>>>>> Flavio
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >> >
>
