flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Dataset filter improvement
Date Wed, 10 Feb 2016 08:47:45 GMT
Yes, the intermediate datasets I create are then joined with each other.
What I'd need is an Either<1,...,n>. Would it be possible to add that?
Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in the
subsequent filter+map/flatMap, deserializing only the elements I want to
group together (e.g. t.f0.equals("someEventType")) in order to generate the
typed datasets.
Which one do you think is the better solution?
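
The Tuple2<String,byte[]> fallback described above can be sketched as follows. This is a minimal plain-Java sketch, not Flink code: a `List` stands in for the `DataSet`, and `TaggedRecord`, `TaggedRouting`, `encode` and `decodeUtf8` are hypothetical names. In a Flink job the body of `select` would presumably sit inside one `FlatMapFunction<Tuple2<String, byte[]>, T>` per target type, emitting through the `Collector` instead of adding to a list.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java stand-in for Flink's Tuple2<String, byte[]>:
// f0 is the event-type tag, f1 the serialized payload.
final class TaggedRecord {
    final String f0;
    final byte[] f1;

    TaggedRecord(String f0, byte[] f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class TaggedRouting {
    private TaggedRouting() {}

    // Filter and deserialize in one pass, the way a single flatMap would:
    // records carrying a different tag are skipped without being decoded.
    static <T> List<T> select(List<TaggedRecord> input, String tag,
                              Function<byte[], T> decoder) {
        List<T> out = new ArrayList<>();
        for (TaggedRecord r : input) {
            // Compare String contents with equals(), not ==.
            if (tag.equals(r.f0)) {
                out.add(decoder.apply(r.f1));
            }
        }
        return out;
    }

    // Hypothetical encoding for the sketch: the payload is a UTF-8 string.
    static TaggedRecord encode(String tag, String payload) {
        return new TaggedRecord(tag, payload.getBytes(StandardCharsets.UTF_8));
    }

    static String decodeUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The appeal of this design is that only records with the requested tag pay the deserialization cost; each `select` call still scans every tagged record, though.
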

On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Flavio,
>
> I did not completely understand which objects should go where, but here
> are some general guidelines:
>
> - early filtering is mostly a good idea (unless evaluating the filter
> expression is very expensive)
> - you can use a flatMap function to combine a map and a filter
> - applying multiple functions to the same data set does not necessarily
> materialize the data set (in memory or on disk). In most cases it prevents
> chaining, hence there is serialization overhead. In some cases where the
> forked data streams are joined again, the data set must be materialized in
> order to avoid deadlocks.
> - it is not possible to write a map that generates two different types,
> but you could implement a mapper that returns an Either<First, Second> type.
>
> Hope this helps,
> Fabian
>
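
The last suggestion above (one mapper returning an Either) can be sketched in plain Java. Assumptions: `MyObj1`/`MyObj3` mirror the placeholder names used in this thread, `RawEvent` and the `"obj1"` tag are hypothetical, and `Either` here is a minimal stand-in loosely modeled on Flink's `org.apache.flink.types.Either`, not that class itself. `lefts` and `rights` each fold a filter and a map into one pass, which is the role a `flatMap` would play in the Flink job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output types standing in for MyObj1 / MyObj3 from the thread.
final class MyObj1 {
    final String name;
    MyObj1(String name) { this.name = name; }
}

final class MyObj3 {
    final int value;
    MyObj3(int value) { this.value = value; }
}

// Hypothetical raw input record: a type tag plus a string value.
final class RawEvent {
    final String type;
    final String value;
    RawEvent(String type, String value) { this.type = type; this.value = value; }
}

// Minimal two-way Either (a stand-in, loosely modeled on org.apache.flink.types.Either).
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> Either<L, R> ofLeft(L value) { return new Either<>(value, null, true); }
    static <L, R> Either<L, R> ofRight(R value) { return new Either<>(null, value, false); }

    boolean isLeft() { return isLeft; }
    L getLeft() { return left; }
    R getRight() { return right; }
}

final class EitherRouting {
    private EitherRouting() {}

    // The single "map" that produces two different types.
    static Either<MyObj1, MyObj3> toEither(RawEvent e) {
        if ("obj1".equals(e.type)) {
            return Either.ofLeft(new MyObj1(e.value));
        }
        return Either.ofRight(new MyObj3(Integer.parseInt(e.value)));
    }

    static List<Either<MyObj1, MyObj3>> mapAll(List<RawEvent> input) {
        List<Either<MyObj1, MyObj3>> out = new ArrayList<>();
        for (RawEvent e : input) {
            out.add(toEither(e));
        }
        return out;
    }

    // filter(isLeft) + map(getLeft) folded into one pass, as a flatMap would do.
    static <L, R> List<L> lefts(List<Either<L, R>> mixed) {
        List<L> out = new ArrayList<>();
        for (Either<L, R> x : mixed) {
            if (x.isLeft()) out.add(x.getLeft());
        }
        return out;
    }

    // filter(!isLeft) + map(getRight) folded into one pass.
    static <L, R> List<R> rights(List<Either<L, R>> mixed) {
        List<R> out = new ArrayList<>();
        for (Either<L, R> x : mixed) {
            if (!x.isLeft()) out.add(x.getRight());
        }
        return out;
    }
}
```

The two-type case maps directly onto Either; for n types, one option would be nesting (e.g. `Either<A, Either<B, C>>`), though whether that stays readable for many types is a judgment call.
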
> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> Any help on this?
>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it> wrote:
>>
>>> Hi all,
>>>
>>> In my program I have a DataSet that generates different types of objects
>>> depending on the incoming element,
>>> so it's like a Map<Tuple2,Object>.
>>> In order to type the different generated datasets, I do something like this:
>>>
>>> DataSet<Tuple2> start = ...
>>>
>>> // one filter + map per output, so start feeds four separate pipelines
>>> DataSet<MyObj1> ds1 = start.filter(..).map(..);
>>> DataSet<MyObj1> ds2 = start.filter(..).map(..);
>>> DataSet<MyObj3> ds3 = start.filter(..).map(..);
>>> DataSet<MyObj3> ds4 = start.filter(..).map(..);
>>>
>>> However, this is very inefficient (I think because Flink needs to
>>> materialize the entire source dataset for every slot).
>>>
>>> It's much more efficient to group the generation of objects of the same
>>> type. E.g.:
>>>
>>> DataSet<Tuple2> start = ...
>>>
>>> // one map per output type, then each typed dataset is split by filters
>>> DataSet<MyObj1> tmp1 = start.map(..);
>>> DataSet<MyObj3> tmp2 = start.map(..);
>>> DataSet<MyObj1> ds1 = tmp1.filter(..);
>>> DataSet<MyObj1> ds2 = tmp1.filter(..);
>>> DataSet<MyObj3> ds3 = tmp2.filter(..);
>>> DataSet<MyObj3> ds4 = tmp2.filter(..);
>>>
>>> Increasing the number of slots per task manager makes things worse and
>>> worse :)
>>> Is there a way to improve this situation? Is it possible to write a
>>> "map" that generates different types of objects and then filter them by
>>> their class type?
>>>
>>> Best,
>>> Flavio
>
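
The original question (one "map" producing several types, then filtering by class type) can be sketched in plain Java by mapping to a common supertype and filtering with `Class.isInstance`. `ClassRouting` and `ofType` are hypothetical names, and a `List` stands in for the `DataSet`. A caveat, stated as an assumption to check against the Flink documentation: a `DataSet<Object>` would most likely be treated as a generic type and serialized with Kryo, which is typically slower than Flink's built-in serializers, so the Either approach suggested in the reply may be the cheaper route.

```java
import java.util.ArrayList;
import java.util.List;

final class ClassRouting {
    private ClassRouting() {}

    // Keep only the elements that are instances of `type`, cast to that type.
    // Class.isInstance / Class.cast are the reflective forms of instanceof and a cast.
    static <T> List<T> ofType(List<?> mixed, Class<T> type) {
        List<T> out = new ArrayList<>();
        for (Object o : mixed) {
            if (type.isInstance(o)) {
                out.add(type.cast(o));
            }
        }
        return out;
    }
}
```

For example, `ClassRouting.ofType(mixed, MyObj1.class)` would play the role of the class-type filter, with one such call per target type.
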
