flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Dataset filter improvement
Date Wed, 24 Feb 2016 08:39:28 GMT
Thanks Max and Till for the answers. However, I still don't fully
understand the difference. Here are my doubts:

   - If I don't register any of my POJO classes, they will be serialized
   with Kryo (black box for Flink)
   - If I register all of my POJOs using env.registerType, they will be
   serialized as POJOs (which is slower than Tuple serialization but much
   faster than Kryo)
   - What if I call env.registerTypeWithKryoSerializer()? Why should I
   specify a serializer for Kryo?

Best,
Flavio
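
Till's point in the quoted reply below (a registered class is written as a
short id instead of its full class name) can be illustrated with a toy
sketch in plain Java. This is not Flink's or Kryo's actual wire format:
ClassTagDemo, its registry, and the one-byte marker are hypothetical names
and choices made up for this illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: NOT the format used by Flink or Kryo.
class ClassTagDemo {
    // Hypothetical registry: each registered class gets a small integer id.
    private final Map<Class<?>, Integer> ids = new LinkedHashMap<>();

    void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size());
    }

    // Returns the type tag that would be written in front of a record.
    byte[] writeTag(Class<?> type) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = ids.get(type);
            if (id != null) {
                out.writeBoolean(true);        // marker: class is registered
                out.writeInt(id);              // fixed-size id
            } else {
                out.writeBoolean(false);       // marker: class is not registered
                out.writeUTF(type.getName());  // full class name, length-prefixed
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        ClassTagDemo demo = new ClassTagDemo();
        demo.register(StringBuilder.class);
        System.out.println("registered:   " + demo.writeTag(StringBuilder.class).length + " bytes");
        System.out.println("unregistered: " + demo.writeTag(java.util.ArrayList.class).length + " bytes");
    }
}
```

In this toy format the tag of a registered class has a fixed size, while the
tag of an unregistered class grows with the length of its fully qualified
name, and that cost is paid for every record.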

On Tue, Feb 23, 2016 at 4:08 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> Registering a data type is only relevant for the Kryo serializer or if you
> want to serialize a subclass of a POJO. Registering has the advantage that
> you assign an id to the class which is written instead of the full class
> name. The latter is usually much longer than the id.
>
> Cheers,
> Till
>
> On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels <mxm@apache.org>
> wrote:
>
>> Hi Flavio,
>>
>> I think the point is that Flink can use its serialization tools if you
>> register the class in advance. If you don't do that, it will use Kryo
>> as a fall-back which is slightly less efficient.
>>
>> Equals and hash code have to be implemented correctly if you compare
>> Pojos. For standard types like String or Integer, this is done
>> automatically. For Pojos, Flink doesn't know whether it is implemented
>> correctly or not. Every object in Java has a default equals and
>> hashCode implementation.
>>
>> Cheers,
>> Max
>>
>> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
>> <pompermaier@okkam.it> wrote:
>> > Hi Max,
>> > why do I need to register them? My job also runs without problems
>> > without that.
>> > The only problem with my POJOs was that I had to implement equals and
>> > hashCode correctly. Flink didn't force me to do it, but then the
>> > results were wrong :(
>> >
>> >
>> >
>> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <mxm@apache.org>
>> > wrote:
>> >>
>> >> Hi Flavio,
>> >>
>> >> Stephan was referring to
>> >>
>> >> env.registerType(ExtendedClass1.class);
>> >> env.registerType(ExtendedClass2.class);
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>> >> <pompermaier@okkam.it> wrote:
>> >> > What do you mean exactly? Probably I'm missing something here.
>> >> > Remember that I can specify the right subclass only after the last
>> >> > flatMap; after the first map neither I nor Flink can know the exact
>> >> > subclass of BaseClass.
>> >> >
>> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <sewen@apache.org>
>> >> > wrote:
>> >> >>
>> >> >> Class hierarchies should definitely work, even if the base class
>> >> >> has no fields.
>> >> >>
>> >> >> They work more efficiently if you register the subclasses at the
>> >> >> execution environment (Flink cannot infer them from the function
>> >> >> signatures because the function signatures only contain the
>> >> >> abstract base class).
>> >> >>
>> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>> >> >> <pompermaier@okkam.it> wrote:
>> >> >>>
>> >> >>> Because the classes are not related to each other. Do you think
>> >> >>> it's a good idea to have something like this?
>> >> >>>
>> >> >>> abstract class BaseClass {
>> >> >>>    String someField;
>> >> >>> }
>> >> >>>
>> >> >>> class ExtendedClass1 extends BaseClass {
>> >> >>>    String someOtherField11;
>> >> >>>    String someOtherField12;
>> >> >>>    String someOtherField13;
>> >> >>>  ...
>> >> >>> }
>> >> >>>
>> >> >>> class ExtendedClass2 extends BaseClass {
>> >> >>>    Integer someOtherField21;
>> >> >>>    Double someOtherField22;
>> >> >>>    Integer someOtherField23;
>> >> >>>  ...
>> >> >>> }
>> >> >>>
>> >> >>> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
>> >> >>> flatMap that can be used to generate the specific datasets?
>> >> >>> Doesn't this cause problems for Flink? The classes can be very
>> >> >>> different from each other. Maybe this can cause problems with the
>> >> >>> plan generation, can't it?
>> >> >>>
>> >> >>> Thanks Fabian and Stephan for the support!
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <sewen@apache.org>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> Why not use an abstract base class and N subclasses?
>> >> >>>>
>> >> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhueske@gmail.com>
>> >> >>>> wrote:
>> >> >>>>>
>> >> >>>>> Unfortunately, there is no Either<1,...,n>.
>> >> >>>>> You could implement something like a Tuple3<Option<Type1>,
>> >> >>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an
>> >> >>>>> Option type (comes with Java 8). You would need to implement it
>> >> >>>>> yourself, incl. TypeInfo and Serializer. You can get some
>> >> >>>>> inspiration from the Either type info / serializer, if you want
>> >> >>>>> to go this way.
>> >> >>>>>
>> >> >>>>> Using a byte array would also work but doesn't look much easier
>> >> >>>>> than the Option approach to me.
>> >> >>>>>
>> >> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>> >> >>>>>>
>> >> >>>>>> Yes, I then join the intermediate datasets I create with each
>> >> >>>>>> other again. What I'd need is an Either<1,...,n>. Is it possible
>> >> >>>>>> to add that?
>> >> >>>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]>
>> >> >>>>>> and, in the subsequent filter+map/flatMap, deserializing only
>> >> >>>>>> those elements I want to group together (e.g.
>> >> >>>>>> "someEventType".equals(t.f0)) in order to generate the typed
>> >> >>>>>> dataset.
>> >> >>>>>> Which one do you think is the best solution?
>> >> >>>>>>
>> >> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com>
>> >> >>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>> Hi Flavio,
>> >> >>>>>>>
>> >> >>>>>>> I did not completely understand which objects should go where,
>> >> >>>>>>> but here are some general guidelines:
>> >> >>>>>>>
>> >> >>>>>>> - early filtering is mostly a good idea (unless evaluating the
>> >> >>>>>>>   filter expression is very expensive)
>> >> >>>>>>> - you can use a flatMap function to combine a map and a filter
>> >> >>>>>>> - applying multiple functions on the same data set does not
>> >> >>>>>>>   necessarily materialize the data set (in memory or on disk).
>> >> >>>>>>>   In most cases it prevents chaining, hence there is
>> >> >>>>>>>   serialization overhead. In some cases where the forked data
>> >> >>>>>>>   streams are joined again, the data set must be materialized
>> >> >>>>>>>   in order to avoid deadlocks.
>> >> >>>>>>> - it is not possible to write a map that generates two
>> >> >>>>>>>   different types, but you could implement a mapper that
>> >> >>>>>>>   returns an Either<First, Second> type.
>> >> >>>>>>>
>> >> >>>>>>> Hope this helps,
>> >> >>>>>>> Fabian
>> >> >>>>>>>
>> >> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier
>> >> >>>>>>> <pompermaier@okkam.it>:
>> >> >>>>>>>>
>> >> >>>>>>>> Any help on this?
>> >> >>>>>>>>
>> >> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it>
>> >> >>>>>>>> wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>> Hi to all,
>> >> >>>>>>>>>
>> >> >>>>>>>>> in my program I have a Dataset that generates different
>> >> >>>>>>>>> types of objects depending on the incoming element.
>> >> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
>> >> >>>>>>>>> In order to type the different generated datasets I do
>> >> >>>>>>>>> something like this:
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<Tuple2> start =...
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>> >> >>>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>> >> >>>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>> >> >>>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>> >> >>>>>>>>>
>> >> >>>>>>>>> However this is very inefficient (I think because Flink
>> >> >>>>>>>>> needs to materialize the entire source dataset for every
>> >> >>>>>>>>> slot).
>> >> >>>>>>>>>
>> >> >>>>>>>>> It's much more efficient to group the generation of objects
>> >> >>>>>>>>> of the same type. E.g.:
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<Tuple2> start =..
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
>> >> >>>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
>> >> >>>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
>> >> >>>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
>> >> >>>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
>> >> >>>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
>> >> >>>>>>>>>
>> >> >>>>>>>>> Increasing the number of slots per task manager makes
>> >> >>>>>>>>> things worse and worse :)
>> >> >>>>>>>>> Is there a way to improve this situation? Is it possible to
>> >> >>>>>>>>> write a "map" generating different types of objects and then
>> >> >>>>>>>>> filter them by generated class type?
>> >> >>>>>>>>>
>> >> >>>>>>>>> Best,
>> >> >>>>>>>>> Flavio
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>
>> >> >>>>
>> >> >>>
>> >> >>
>> >> >
>>
>
>
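
The approach the thread converges on (one abstract base class, one map step
that emits subclasses, then one filter per concrete type) can be sketched in
plain Java. This is only an illustration under stated assumptions:
java.util.stream stands in for Flink's DataSet API, the helpers toRecord and
select and the "type1" event name are hypothetical, and the class names are
borrowed from Flavio's example. In an actual Flink job the subclasses would
additionally be registered with env.registerType(...), as Max describes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in: java.util.stream replaces Flink's DataSet here.
abstract class BaseClass {
    String someField;
}

class ExtendedClass1 extends BaseClass {
    String someOtherField11;
}

class ExtendedClass2 extends BaseClass {
    Integer someOtherField21;
}

class SplitBySubclassDemo {
    // Single "map" step: the (hypothetical) event type decides which subclass is built.
    static BaseClass toRecord(String eventType, String payload) {
        if ("type1".equals(eventType)) {
            ExtendedClass1 first = new ExtendedClass1();
            first.someField = eventType;
            first.someOtherField11 = payload;
            return first;
        }
        ExtendedClass2 second = new ExtendedClass2();
        second.someField = eventType;
        second.someOtherField21 = payload.length();
        return second;
    }

    // One "filter + cast" step per target type, selecting by runtime class.
    static <T extends BaseClass> List<T> select(List<BaseClass> records, Class<T> type) {
        return records.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<BaseClass> records = Arrays.asList(
                toRecord("type1", "a"),
                toRecord("type2", "bcd"),
                toRecord("type1", "e"));
        List<ExtendedClass1> ds1 = select(records, ExtendedClass1.class);
        List<ExtendedClass2> ds2 = select(records, ExtendedClass2.class);
        System.out.println(ds1.size() + " records of type 1, " + ds2.size() + " of type 2");
    }
}
```

The source is mapped once and each typed result is derived from that single
intermediate collection, which mirrors the second, more efficient variant in
the original question.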
