flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Dataset filter improvement
Date Wed, 17 Feb 2016 09:15:43 GMT
Hi Flavio,

Stephan was referring to

env.registerType(ExtendedClass1.class);
env.registerType(ExtendedClass2.class);

Cheers,
Max

On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
<pompermaier@okkam.it> wrote:
> What do you mean exactly? Probably I'm missing something here... remember
> that I can specify the right subclass only after the last flatMap; after the
> first map neither I nor Flink can know the exact subclass of BaseClass.
>
> On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> Class hierarchies should definitely work, even if the base class has no
>> fields.
>>
>> They work more efficiently if you register the subclasses at the execution
>> environment (Flink cannot infer them from the function signatures because
>> the function signatures only contain the abstract base class).
>>
>> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>> <pompermaier@okkam.it> wrote:
>>>
>>> Because the classes are not related to each other. Do you think it's a
>>> good idea to have something like this?
>>>
>>> abstract class BaseClass {
>>>    String someField;
>>> }
>>>
>>> class ExtendedClass1 extends BaseClass {
>>>    String someOtherField11;
>>>    String someOtherField12;
>>>    String someOtherField13;
>>>  ...
>>> }
>>>
>>> class ExtendedClass2 extends BaseClass {
>>>    Integer someOtherField21;
>>>    Double someOtherField22;
>>>    Integer someOtherField23;
>>>  ...
>>> }
>>>
>>> and then declare my map as Map<Tuple2,BaseClass> and then apply a
>>> flatMap that can be used to generate the specific datasets?
>>> Doesn't this cause problems for Flink? The classes can be very different
>>> from each other... maybe this can cause problems with the plan
>>> generation, can't it?
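[Editor's note: a minimal plain-Java sketch of the hierarchy idea discussed above, with no Flink dependency. The `ofType` helper mimics what a downstream `filter(t -> cls.isInstance(t))` plus cast would do on the mixed dataset; the class and field names are taken from the thread, everything else is illustrative.]

```java
import java.util.ArrayList;
import java.util.List;

public class HierarchySketch {
    // The hierarchy from the thread: a thin abstract base, unrelated subclasses.
    static abstract class BaseClass { String someField; }
    static class ExtendedClass1 extends BaseClass { String someOtherField11; }
    static class ExtendedClass2 extends BaseClass { Integer someOtherField21; }

    // Mimics filter-by-subtype on a mixed collection of BaseClass records,
    // the way a flatMap emitting BaseClass followed by per-type filters would.
    static <T extends BaseClass> List<T> ofType(List<BaseClass> mixed, Class<T> cls) {
        List<T> out = new ArrayList<>();
        for (BaseClass b : mixed) {
            if (cls.isInstance(b)) out.add(cls.cast(b));
        }
        return out;
    }

    public static void main(String[] args) {
        List<BaseClass> mixed = new ArrayList<>();
        mixed.add(new ExtendedClass1());
        mixed.add(new ExtendedClass2());
        mixed.add(new ExtendedClass1());
        System.out.println(ofType(mixed, ExtendedClass1.class).size());
        System.out.println(ofType(mixed, ExtendedClass2.class).size());
    }
}
```

As Stephan notes above, in Flink itself the subtypes should additionally be registered at the execution environment, since only the abstract base class appears in the function signatures.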
>>>
>>> Thanks Fabian and Stephan for the support!
>>>
>>>
>>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>> Why not use an abstract base class and N subclasses?
>>>>
>>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>>
>>>>> Unfortunately, there is no Either<1,...,n>.
>>>>> You could implement something like a Tuple3<Option<Type1>,
>>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an
>>>>> Option type (Java 8 comes with one). You would need to implement it
>>>>> yourself, incl. TypeInfo and serializer. You can get some inspiration
>>>>> from the Either type info/serializer, if you want to go this way.
>>>>>
>>>>> Using a byte array would also work, but it doesn't look much easier than
>>>>> the Option approach to me.
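[Editor's note: a minimal sketch of the hand-rolled Option wrapper Fabian describes, in pre-Java-8 style. The Flink-specific parts he mentions (TypeInfo and serializer) are not shown; all names here are illustrative.]

```java
public class OptionSketch {
    // A minimal Option: either holds a value ("some") or nothing ("none").
    static final class Option<T> {
        private final T value;
        private Option(T v) { value = v; }
        static <T> Option<T> some(T v) { return new Option<>(v); }
        static <T> Option<T> none()    { return new Option<>(null); }
        boolean isDefined() { return value != null; }
        T get() { return value; }
    }

    public static void main(String[] args) {
        // A Tuple3<Option<Type1>, Option<Type2>, Option<Type3>> record would
        // carry exactly one defined slot; here slot 0 is set, slot 1 is not.
        Option<String> f0 = Option.some("event");
        Option<Integer> f1 = Option.none();
        System.out.println(f0.isDefined());
        System.out.println(f1.isDefined());
    }
}
```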
>>>>>
>>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>> Yes, the intermediate datasets I create are then joined with each
>>>>>> other. What I'd need is an Either<1,...,n>. Is that possible to add?
>>>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in
>>>>>> the subsequent filter+map/flatMap, deserializing only those elements I
>>>>>> want to group together (e.g. t.f0=="someEventType") in order to
>>>>>> generate the typed datasets.
>>>>>> Which one do you think is the best solution?
>>>>>>
>>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> I did not completely understand which objects should go where, but
>>>>>>> here are some general guidelines:
>>>>>>>
>>>>>>> - early filtering is mostly a good idea (unless evaluating the filter
>>>>>>> expression is very expensive)
>>>>>>> - you can use a flatMap function to combine a map and a filter
>>>>>>> - applying multiple functions on the same data set does not
>>>>>>> necessarily materialize the data set (in memory or on disk). In most
>>>>>>> cases it prevents chaining, hence there is serialization overhead. In
>>>>>>> some cases, where the forked data streams are joined again, the data
>>>>>>> set must be materialized in order to avoid deadlocks.
>>>>>>> - it is not possible to write a map that generates two different
>>>>>>> types, but you could implement a mapper that returns an
>>>>>>> Either<First, Second> type.
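[Editor's note: a minimal hand-rolled Either, sketching the mapper-returns-Either idea Fabian describes; Flink's own `org.apache.flink.types.Either` serves this purpose where available. The `parseOrKeep` mapper and all names here are illustrative.]

```java
public class EitherSketch {
    // A two-alternative container: holds either a Left or a Right, never both.
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private Either(L l, R r) { left = l; right = r; }
        static <L, R> Either<L, R> left(L l)  { return new Either<>(l, null); }
        static <L, R> Either<L, R> right(R r) { return new Either<>(null, r); }
        boolean isLeft() { return left != null; }
        L getLeft()  { return left; }
        R getRight() { return right; }
    }

    // A mapper producing one of two result types from a single input,
    // the shape a map generating "two different types" would take.
    static Either<Integer, String> parseOrKeep(String s) {
        try {
            return Either.left(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Either.right(s);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrKeep("42").isLeft());
        System.out.println(parseOrKeep("abc").isLeft());
    }
}
```

Downstream, each branch is recovered with a filter on `isLeft()` followed by `getLeft()`/`getRight()`.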
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>
>>>>>>>> Any help on this?
>>>>>>>>
>>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> in my program I have a DataSet that generates different types of
>>>>>>>>> objects depending on the incoming element.
>>>>>>>>> Thus it's like a Map<Tuple2,Object>.
>>>>>>>>> In order to type the different generated datasets I do something
>>>>>>>>> like:
>>>>>>>>>
>>>>>>>>> DataSet<Tuple2> start = ...
>>>>>>>>>
>>>>>>>>> DataSet<MyObj1> ds1 = start.filter(..).map(..);
>>>>>>>>> DataSet<MyObj1> ds2 = start.filter(..).map(..);
>>>>>>>>> DataSet<MyObj3> ds3 = start.filter(..).map(..);
>>>>>>>>> DataSet<MyObj3> ds4 = start.filter(..).map(..);
>>>>>>>>>
>>>>>>>>> However this is very inefficient (I think because Flink needs to
>>>>>>>>> materialize the entire source dataset for every slot).
>>>>>>>>>
>>>>>>>>> It's much more efficient to group the generation of objects of the
>>>>>>>>> same type. E.g.:
>>>>>>>>>
>>>>>>>>> DataSet<Tuple2> start = ...
>>>>>>>>>
>>>>>>>>> DataSet<MyObj1> tmp1 = start.map(..);
>>>>>>>>> DataSet<MyObj3> tmp2 = start.map(..);
>>>>>>>>> DataSet<MyObj1> ds1 = tmp1.filter(..);
>>>>>>>>> DataSet<MyObj1> ds2 = tmp1.filter(..);
>>>>>>>>> DataSet<MyObj3> ds3 = tmp2.filter(..);
>>>>>>>>> DataSet<MyObj3> ds4 = tmp2.filter(..);
>>>>>>>>>
>>>>>>>>> Increasing the number of slots per task manager makes things worse
>>>>>>>>> and worse :)
>>>>>>>>> Is there a way to improve this situation? Is it possible to write a
>>>>>>>>> "map" generating different types of objects and then filter them by
>>>>>>>>> generated class type?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
