flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Dataset filter improvement
Date Wed, 17 Feb 2016 09:22:51 GMT
Hi Max,
why do I need to register them? My job runs without problem also without
that.
The only problem with my POJOs was that I had to implement equals and hash
correctly, Flink didn't enforce me to do it but then results were wrong :(


On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <mxm@apache.org> wrote:

> Hi Flavio,
>
> Stephan was referring to
>
> env.registerType(ExtendedClass1.class);
> env.registerType(ExtendedClass2.class);
>
> Cheers,
> Max
>
> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
> <pompermaier@okkam.it> wrote:
> > What do you mean exactly..? Probably I'm missing something here..remember
> > that I can specify the right subClass only after the last flatMap, after
> the
> > first map neither me nor Flink can know the exact subclass of BaseClass
> >
> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <sewen@apache.org> wrote:
> >>
> >> Class hierarchies should definitely work, even if the base class has no
> >> fields.
> >>
> >> They work more efficiently if you register the subclasses at the
> execution
> >> environment (Flink cannot infer them from the function signatures
> because
> >> the function signatures only contain the abstract base class).
> >>
> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
> >> <pompermaier@okkam.it> wrote:
> >>>
> >>> Because The classes are not related to each other. Do you think it's a
> >>> good idea to have something like this?
> >>>
> >>> abstract class BaseClass(){
> >>>    String someField;
> >>> }
> >>>
> >>> class ExtendedClass1 extends BaseClass (){
> >>>    String someOtherField11;
> >>>    String someOtherField12;
> >>>    String someOtherField13;
> >>>  ...
> >>> }
> >>>
> >>> class ExtendedClass2 extends BaseClass (){
> >>>    Integer someOtherField21;
> >>>    Double someOtherField22;
> >>>    Integer someOtherField23;
> >>>  ...
> >>> }
> >>>
> >>> and then declare my map as Map<Tuple2,BaseClass>. and then apply a
> >>> flatMap that can be used to generated the specific datasets?
> >>> Doesn't this cause problem to Flink? Classes can be vrry different to
> >>> each other..maybe this can cause problems with the plan
> generation..isn't
> >>> it?
> >>>
> >>> Thanks Fabian and Stephan for the support!
> >>>
> >>>
> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <sewen@apache.org>
> wrote:
> >>>>
> >>>> Why not use an abstract base class and N subclasses?
> >>>>
> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhueske@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Unfortunately, there is no Either<1,...,n>.
> >>>>> You could implement something like a Tuple3<Option<Type1>,
> >>>>> Option<Type2>, Option<Type3>>. However, Flink does
not provide an
> Option
> >>>>> type (comes with Java8). You would need to implement it yourself
> incl.
> >>>>> TypeInfo and Serializer. You can get some inspiration from the
> Either type
> >>>>> info /serializer, if you want to go this way.
> >>>>>
> >>>>> Using a byte array would also work but doesn't look much easier
than
> >>>>> the Option approach to me.
> >>>>>
> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
> >>>>>>
> >>>>>> Yes, the intermediate dataset I create then join again between
> >>>>>> themselves. What I'd need is a Either<1,...,n>. Is that
possible to
> add?
> >>>>>> Otherwise I was thinking to generate a Tuple2<String,byte[]>
and in
> >>>>>> the subsequent filter+map/flatMap deserialize only those elements
I
> want to
> >>>>>> group togheter (e.g. t.f0=="someEventType") in order to generate
> the typed
> >>>>>> dataset based.
> >>>>>> Which one  do you think is the best solution?
> >>>>>>
> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Flavio,
> >>>>>>>
> >>>>>>> I did not completely understand which objects should go
where, but
> >>>>>>> here are some general guidelines:
> >>>>>>>
> >>>>>>> - early filtering is mostly a good idea (unless evaluating
the
> filter
> >>>>>>> expression is very expensive)
> >>>>>>> - you can use a flatMap function to combine a map and a
filter
> >>>>>>> - applying multiple functions on the same data set does
not
> >>>>>>> necessarily materialize the data set (in memory or on disk).
In
> most cases
> >>>>>>> it prevents chaining, hence there is serialization overhead.
In
> some cases
> >>>>>>> where the forked data streams are joined again, the data
set must
> be
> >>>>>>> materialized in order to avoid deadlocks.
> >>>>>>> - it is not possible to write a map that generates two different
> >>>>>>> types, but you could implement a mapper that returns an
> Either<First,
> >>>>>>> Second> type.
> >>>>>>>
> >>>>>>> Hope this helps,
> >>>>>>> Fabian
> >>>>>>>
> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it
> >:
> >>>>>>>>
> >>>>>>>> Any help on this?
> >>>>>>>>
> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi to all,
> >>>>>>>>>
> >>>>>>>>> in my program I have a Dataset that generated different
types of
> >>>>>>>>> object wrt the incoming element.
> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
> >>>>>>>>> In order to type the different generated datasets
I do something:
> >>>>>>>>>
> >>>>>>>>> Dataset<Tuple2> start =...
> >>>>>>>>>
> >>>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
> >>>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
> >>>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
> >>>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
> >>>>>>>>>
> >>>>>>>>> However this is very inefficient (I think because
Flink needs to
> >>>>>>>>> materialize the entire source dataset for every
slot).
> >>>>>>>>>
> >>>>>>>>> It's much more efficient to group the generation
of objects of
> the
> >>>>>>>>> same type. E.g.:
> >>>>>>>>>
> >>>>>>>>> Dataset<Tuple2> start =..
> >>>>>>>>>
> >>>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
> >>>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
> >>>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
> >>>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
> >>>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
> >>>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
> >>>>>>>>>
> >>>>>>>>> Increasing the number of slots per task manager
make things worse
> >>>>>>>>> and worse :)
> >>>>>>>>> Is there a way to improve this situation? Is it
possible to
> write a
> >>>>>>>>> "map" generating different type of object and then
filter them
> by generated
> >>>>>>>>> class type?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Flavio
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Mime
View raw message