flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: For each element in a dataset, do something with another dataset
Date Wed, 30 Sep 2015 09:20:26 GMT
The idea is to partition both datasets by range.
Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1:
[1,2,3] and p2: [4,5,6].
Each partition is given to a different instance of a MapPartition operator
(this is a bit tricky, because you cannot use broadcastSet. You could load
the corresponding partition it in the open() function from HDFS for
example).

DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
partition 1, everything > 3 goes to p2. You can partition a dataset by
range using the partitionCustom() function. The partitioned dataset is
given to the mapPartition operator that loaded a partition of dataset A in
each task instance.
You do the counting just like before (sorting the partition of dataset A,
binary sort, long[]), but add an additional count for the complete
partition (basically count all elements that arrive in the task instance).

If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1
would be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
Now you need to compute the final count by adding the "all" counts of the
lower partitions to the counts of the "higher" partitions, i.e., add all:5
of p1 to all counts for p2.

This approach requires to know the value range and distribution of the
values which makes it a bit difficult. I guess you'll get the best
performance, if you partition in a way, that you have about equally sized
partitions of dataset B with the constraint that the corresponding
partitions of A fit into memory.

As I said, its a bit cumbersome. I hope you could follow my explanation.
Please ask if something is not clear ;-)

2015-09-30 10:46 GMT+02:00 Pieter Hameete <phameete@gmail.com>:

> Hi Fabian,
>
> thanks for your tips!
>
> Do you have some pointers for getting started with the 'tricky range
> partitioning'? I am quite keen to get this working with large datasets ;-)
>
> Cheers,
>
> Pieter
>
> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>
>> Hi Pieter,
>>
>> cross is indeed too expensive for this task.
>>
>> If dataset A fits into memory, you can do the following: Use a
>> RichMapPartitionFunction to process dataset B and add dataset A as a
>> broadcastSet. In the open method of mapPartition, you can load the
>> broadcasted set and sort it by a.propertyX and initialize a long[] for the
>> counts. For each element of dataset B, you do a binary search on the sorted
>> dataset A and increase all counts up to the position in the sorted list.
>> After all elements of dataset B have been processed, return the counts from
>> the long[].
>>
>> If dataset A doesn't fit into memory, things become more cumbersome and
>> we need to play some tricky with range partitioning...
>>
>> Let me know, if you have questions,
>> Fabian
>>
>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>>
>>> Good day everyone,
>>>
>>> I am looking for a good way to do the following:
>>>
>>> I have dataset A and dataset B, and for each element in dataset A I
>>> would like to filter dataset B and obtain the size of the result. To say it
>>> short:
>>>
>>> *for each element a in A -> B.filter( _ < a.propertyx).count*
>>>
>>> Currently I am doing a cross of dataset A and B, making tuples so I can
>>> then filter all the tuples where field2 < field1.propertya and then group
>>> by field1.id and get the sizes of the groups.However this is not
>>> working out in practice. When the datasets get larger, some Tasks hang on
>>> the CHAIN Cross -> Filter probably because there is insufficient memory for
>>> the cross to be completed?
>>>
>>> Does anyone have a suggestion on how I could make this work, especially
>>> with datasets that are larger than memory available to a separate Task?
>>>
>>> Thank you in advance for your time :-)
>>>
>>> Kind regards,
>>>
>>> Pieter Hameete
>>>
>>
>>
>

Mime
View raw message