flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Dataset filter improvement
Date Wed, 10 Feb 2016 09:05:35 GMT
Unfortunately, there is no Either<1,...,n>.
You could implement something like a Tuple3<Option<Type1>, Option<Type2>,
Option<Type3>>. However, Flink does not provide an Option type (Java 8 comes
with java.util.Optional). You would need to implement it yourself, including
the TypeInfo and Serializer. You can get some inspiration from the Either type
info/serializer, if you want to go this way.
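
A minimal sketch of the "tuple of options" idea, in plain Java with
java.util.Optional and no Flink dependency. The class name OneOf3 and its
tag() helper are hypothetical and only illustrate the shape of the data; the
TypeInfo and Serializer that Flink would need are not shown.

```java
import java.util.Optional;

/** Hypothetical holder with three optional slots, of which exactly one is set. */
final class OneOf3<A, B, C> {
    final Optional<A> first;
    final Optional<B> second;
    final Optional<C> third;

    private OneOf3(Optional<A> first, Optional<B> second, Optional<C> third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }

    static <A, B, C> OneOf3<A, B, C> first(A value) {
        return new OneOf3<A, B, C>(Optional.of(value), Optional.<B>empty(), Optional.<C>empty());
    }

    static <A, B, C> OneOf3<A, B, C> second(B value) {
        return new OneOf3<A, B, C>(Optional.<A>empty(), Optional.of(value), Optional.<C>empty());
    }

    static <A, B, C> OneOf3<A, B, C> third(C value) {
        return new OneOf3<A, B, C>(Optional.<A>empty(), Optional.<B>empty(), Optional.of(value));
    }

    /** Index (0, 1 or 2) of the populated slot; a downstream filter could select on it. */
    int tag() {
        return first.isPresent() ? 0 : (second.isPresent() ? 1 : 2);
    }
}
```

A mapper would return OneOf3.first(..), OneOf3.second(..) or OneOf3.third(..)
depending on the input, and each downstream filter would keep one tag value.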

Using a byte array would also work but doesn't look much easier than the
Option approach to me.
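
For comparison, a rough plain-Java sketch of the byte array variant. The
Tagged class is a hypothetical stand-in for Tuple2<String, byte[]>, and
decoding the payload as a UTF-8 string is an assumption made only to keep the
example self-contained; a real job would plug in its own deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class TaggedPayloads {
    /** Hypothetical stand-in for Tuple2<String, byte[]>: f0 is the type tag, f1 the serialized object. */
    static final class Tagged {
        final String f0;
        final byte[] f1;

        Tagged(String f0, byte[] f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Keeps only records carrying the wanted tag and deserializes just those payloads. */
    static List<String> decodeOnly(List<Tagged> records, String wantedTag) {
        List<String> out = new ArrayList<>();
        for (Tagged t : records) {
            if (wantedTag.equals(t.f0)) {                           // filter on the tag
                out.add(new String(t.f1, StandardCharsets.UTF_8)); // deserialize only matches
            }
        }
        return out;
    }
}
```

Records with other tags are skipped without touching their payload bytes.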

2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:

> Yes, the intermediate datasets I create are then joined with each other again.
> What I'd need is an Either<1,...,n>. Is it possible to add that?
> Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in the
> subsequent filter+map/flatMap, deserializing only those elements I want to
> group together (e.g. t.f0.equals("someEventType")) in order to generate the
> typed dataset.
> Which one do you think is the best solution?
>
> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> I did not completely understand which objects should go where, but here
>> are some general guidelines:
>>
>> - early filtering is mostly a good idea (unless evaluating the filter
>> expression is very expensive)
>> - you can use a flatMap function to combine a map and a filter
>> - applying multiple functions on the same data set does not necessarily
>> materialize the data set (in memory or on disk). In most cases it prevents
>> chaining, hence there is serialization overhead. In some cases where the
>> forked data streams are joined again, the data set must be materialized in
>> order to avoid deadlocks.
>> - it is not possible to write a map that generates two different types,
>> but you could implement a mapper that returns an Either<First, Second> type.
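
The flatMap guideline above can be sketched in plain Java. This is only an
illustration: java.util.function.Consumer stands in for Flink's Collector,
and the class name FilterMapSketch and the string-length mapping are
hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FilterMapSketch {
    /** Filter and map in one function: empty strings are dropped, the rest emit their length. */
    static void flatMap(String value, Consumer<Integer> out) {
        if (!value.isEmpty()) {          // the "filter" part
            out.accept(value.length());  // the "map" part
        }
    }

    /** Applies flatMap to every input element and collects whatever was emitted. */
    static List<Integer> run(List<String> input) {
        List<Integer> result = new ArrayList<>();
        for (String s : input) {
            flatMap(s, result::add);
        }
        return result;
    }
}
```

Emitting zero or one element per input is what lets a single function replace
a filter followed by a map.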
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> Any help on this?
>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>>
>>>> in my program I have a Dataset that generates different types of objects
>>>> depending on the incoming element.
>>>> Thus it's like a Map<Tuple2,Object>.
>>>> In order to type the different generated datasets I do something like this:
>>>>
>>>> Dataset<Tuple2> start =...
>>>>
>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>>>>
>>>> However this is very inefficient (I think because Flink needs to
>>>> materialize the entire source dataset for every slot).
>>>>
>>>> It's much more efficient to group the generation of objects of the same
>>>> type. E.g.:
>>>>
>>>> Dataset<Tuple2> start =..
>>>>
>>>> Dataset<MyObj1> tmp1 = start.map(..);
>>>> Dataset<MyObj3> tmp2 = start.map(..);
>>>> Dataset<MyObj1> ds1 = tmp1.filter();
>>>> Dataset<MyObj1> ds2 = tmp1.filter();
>>>> Dataset<MyObj3> ds3 = tmp2.filter();
>>>> Dataset<MyObj3> ds4 = tmp2.filter();
>>>>
>>>> Increasing the number of slots per task manager makes things worse and
>>>> worse :)
>>>> Is there a way to improve this situation? Is it possible to write a
>>>> "map" generating different types of objects and then filter them by
>>>> generated class type?
>>>>
>>>> Best,
>>>> Flavio
>>>>