flink-user mailing list archives

From Pieter Hameete <phame...@gmail.com>
Subject Re: For each element in a dataset, do something with another dataset
Date Wed, 30 Sep 2015 08:46:28 GMT
Hi Fabian,

thanks for your tips!

Do you have some pointers for getting started with the 'tricky range
partitioning'? I am quite keen to get this working with large datasets ;-)
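As a concrete sketch of the in-memory approach Fabian describes below (sort dataset A's values, then for each element of B binary-search the sorted array and bump the counts of every larger element of A), here is the core counting logic in plain Java. The Flink wiring (RichMapPartitionFunction, broadcast set, the open() method) is omitted, and all names are illustrative:

```java
import java.util.Arrays;

public class CountSmaller {

    // For each value in sortedA (a.propertyX values, sorted ascending),
    // count how many elements of b are strictly smaller than it.
    static long[] countSmaller(double[] sortedA, double[] b) {
        long[] counts = new long[sortedA.length];
        for (double v : b) {
            // First index in sortedA whose value is strictly greater than v.
            int idx = upperBound(sortedA, v);
            // v < sortedA[i] holds for all i >= idx, so v contributes to those counts.
            for (int i = idx; i < sortedA.length; i++) {
                counts[i]++;
            }
        }
        return counts;
    }

    // Binary search: first index i with a[i] > key (a sorted ascending).
    static int upperBound(double[] a, double key) {
        int lo = 0, hi = a.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (a[mid] <= key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    public static void main(String[] args) {
        double[] sortedA = {1.0, 3.0, 5.0};       // sorted a.propertyX values
        double[] b = {0.5, 2.0, 2.5, 4.0};
        System.out.println(Arrays.toString(countSmaller(sortedA, b)));
        // -> [1, 3, 4]: one element of b is < 1.0, three are < 3.0, four are < 5.0
    }
}
```

Note the inner increment loop is O(|A|) per element of B; for large inputs you could instead increment only counts[idx] and take a suffix cumulative sum at the end, which keeps each element of B at O(log |A|).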



2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Pieter,
> cross is indeed too expensive for this task.
> If dataset A fits into memory, you can do the following: Use a
> RichMapPartitionFunction to process dataset B and add dataset A as a
> broadcastSet. In the open method of mapPartition, you can load the
> broadcasted set and sort it by a.propertyX and initialize a long[] for the
> counts. For each element of dataset B, you do a binary search on the sorted
> dataset A and increase all counts up to the position in the sorted list.
> After all elements of dataset B have been processed, return the counts from
> the long[].
> If dataset A doesn't fit into memory, things become more cumbersome and we
> need to play some tricks with range partitioning...
> Let me know, if you have questions,
> Fabian
> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>> Good day everyone,
>> I am looking for a good way to do the following:
>> I have dataset A and dataset B, and for each element in dataset A I would
>> like to filter dataset B and obtain the size of the result. In short:
>> *for each element a in A -> B.filter( _ < a.propertyX).count*
>> Currently I am doing a cross of datasets A and B, making tuples so I can
>> then filter all the tuples where field2 < field1.propertyX, and then group
>> by field1.id and get the sizes of the groups. However, this is not working
>> out in practice: when the datasets get larger, some tasks hang on the CHAIN
>> Cross -> Filter, probably because there is insufficient memory for the cross
>> to complete.
>> Does anyone have a suggestion on how I could make this work, especially
>> with datasets that are larger than memory available to a separate Task?
>> Thank you in advance for your time :-)
>> Kind regards,
>> Pieter Hameete
