flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Dataset filter improvement
Date Wed, 10 Feb 2016 11:42:15 GMT
Class hierarchies should definitely work, even if the base class has no
fields.

They work more efficiently if you register the subclasses at the execution
environment (Flink cannot infer them from the function signatures because
the function signatures only contain the abstract base class).
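
To illustrate the pattern, here is a minimal plain-Java sketch with no Flink dependency. It assumes a hypothetical hierarchy modeled on the classes quoted below, with one conversion step that is declared to return the base class and a per-subclass selection step. The names convert, convertAll and select are made up for this example. In an actual Flink job the conversion would be a map/flatMap on the DataSet, and the registration mentioned above would presumably be a call like env.registerType(ExtendedClass1.class) on the ExecutionEnvironment (please check the API docs of your version).

```java
import java.util.ArrayList;
import java.util.List;

class SplitBySubclass {

    // Hypothetical hierarchy, modeled on the classes quoted in this thread.
    public static abstract class BaseClass {
        public String someField;
    }

    public static class ExtendedClass1 extends BaseClass {
        public String someOtherField11;
    }

    public static class ExtendedClass2 extends BaseClass {
        public Integer someOtherField21;
    }

    // Stands in for the single map function: the declared return type is
    // only the abstract base class, the concrete type depends on the input.
    public static BaseClass convert(String eventType, String payload) {
        if ("type1".equals(eventType)) {
            ExtendedClass1 r = new ExtendedClass1();
            r.someField = eventType;
            r.someOtherField11 = payload;
            return r;
        }
        ExtendedClass2 r = new ExtendedClass2();
        r.someField = eventType;
        r.someOtherField21 = Integer.valueOf(payload);
        return r;
    }

    // One pass over the input, producing a mixed collection of subclasses.
    public static List<BaseClass> convertAll(String[][] input) {
        List<BaseClass> out = new ArrayList<>();
        for (String[] rec : input) {
            out.add(convert(rec[0], rec[1]));
        }
        return out;
    }

    // Stands in for the per-type filter + cast that yields a typed data set.
    public static <T extends BaseClass> List<T> select(List<BaseClass> all, Class<T> type) {
        List<T> out = new ArrayList<>();
        for (BaseClass b : all) {
            if (type.isInstance(b)) {
                out.add(type.cast(b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] input = {{"type1", "a"}, {"type2", "7"}, {"type1", "b"}};
        List<BaseClass> all = convertAll(input);
        System.out.println(select(all, ExtendedClass1.class).size());
        System.out.println(select(all, ExtendedClass2.class).size());
    }
}
```

The signature of convert only mentions BaseClass, which is exactly why the subclasses cannot be inferred from the function signature and need to be registered explicitly.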

On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Because the classes are not related to each other. Do you think it's a
> good idea to have something like this?
>
> abstract class BaseClass {
>    String someField;
> }
>
> class ExtendedClass1 extends BaseClass {
>    String someOtherField11;
>    String someOtherField12;
>    String someOtherField13;
>    // ...
> }
>
> class ExtendedClass2 extends BaseClass {
>    Integer someOtherField21;
>    Double someOtherField22;
>    Integer someOtherField23;
>    // ...
> }
>
> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
> flatMap that can be used to generate the specific datasets?
> Doesn't this cause problems for Flink? The classes can be very different from
> each other. Maybe this can cause problems with the plan generation, can't it?
>
> Thanks Fabian and Stephan for the support!
>
>
> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Why not use an abstract base class and N subclasses?
>>
>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhueske@gmail.com>
>> wrote:
>>
>>> Unfortunately, there is no Either<1,...,n>.
>>> You could implement something like a Tuple3<Option<Type1>,
>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an Option
>>> type (Java 8 comes with Optional). You would need to implement it yourself
>>> incl. TypeInfo and Serializer. You can get some inspiration from the Either
>>> type info/serializer, if you want to go this way.
>>>
>>> Using a byte array would also work but doesn't look much easier than the
>>> Option approach to me.
>>>
>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> Yes, the intermediate datasets I create are then joined again among
>>>> themselves. What I'd need is an Either<1,...,n>. Is it possible to add that?
>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in the
>>>> subsequent filter+map/flatMap, deserializing only those elements I want to
>>>> group together (e.g. "someEventType".equals(t.f0)) in order to generate the
>>>> typed dataset.
>>>> Which one do you think is the best solution?
>>>>
>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> I did not completely understand which objects should go where, but
>>>>> here are some general guidelines:
>>>>>
>>>>> - early filtering is mostly a good idea (unless evaluating the filter
>>>>> expression is very expensive)
>>>>> - you can use a flatMap function to combine a map and a filter
>>>>> - applying multiple functions on the same data set does not
>>>>> necessarily materialize the data set (in memory or on disk). In most cases
>>>>> it prevents chaining, hence there is serialization overhead. In some cases
>>>>> where the forked data streams are joined again, the data set must be
>>>>> materialized in order to avoid deadlocks.
>>>>> - it is not possible to write a map that generates two different
>>>>> types, but you could implement a mapper that returns an Either<First,
>>>>> Second> type.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> Any help on this?
>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi to all,
>>>>>>>
>>>>>>> in my program I have a Dataset that generates different types of
>>>>>>> objects depending on the incoming element.
>>>>>>> Thus it's like a Map<Tuple2,Object>.
>>>>>>> In order to type the different generated datasets I do something like this:
>>>>>>>
>>>>>>> Dataset<Tuple2> start =...
>>>>>>>
>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>>>>>>>
>>>>>>> However this is very inefficient (I think because Flink needs to
>>>>>>> materialize the entire source dataset for every slot).
>>>>>>>
>>>>>>> It's much more efficient to group the generation of objects of the
>>>>>>> same type. E.g.:
>>>>>>>
>>>>>>> Dataset<Tuple2> start =..
>>>>>>>
>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
>>>>>>>
>>>>>>> Increasing the number of slots per task manager makes things worse
>>>>>>> and worse :)
>>>>>>> Is there a way to improve this situation? Is it possible to write a
>>>>>>> "map" generating different types of objects and then filter them by
>>>>>>> generated class type?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
