flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: For each element in a dataset, do something with another dataset
Date Mon, 05 Oct 2015 12:35:04 GMT
Hi Pieter,

a FlatMapFunction can only return values when its flatMap() method is called.
However, in your use case, you would like to return values *after* the
function has been called for the last time. This is not possible with a
FlatMapFunction, because you cannot identify the last flatMap() call.
The MapPartitionFunction is called only once, with an iterator over the
whole partition. Hence you can return values after the iterator has been
fully consumed.
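
The difference can be simulated in plain Python (this is not Flink code; `RunningCounter` and `count_map_partition` are hypothetical stand-ins for a FlatMapFunction and a MapPartitionFunction). A per-element function can keep state, but it can only emit while it is being called, so the best it can do is emit running totals; a per-partition function can emit a single, known-final value once its iterator is exhausted.

```python
from typing import Iterator, List


class RunningCounter:
    """Hypothetical flatMap-style function: invoked once per element."""

    def __init__(self) -> None:
        self.seen = 0

    def flat_map(self, element: int) -> List[int]:
        # State survives between calls, but output can only be emitted here,
        # and nothing tells the function that this call is the last one.
        # Emitting the running total is the best it can do.
        self.seen += 1
        return [self.seen]


def count_map_partition(partition: Iterator[int]) -> Iterator[int]:
    """Hypothetical mapPartition-style function: invoked once per partition."""
    seen = 0
    for _ in partition:
        seen += 1
    # The iterator is exhausted here, so this value is known to be final.
    yield seen


partition = [4, 8, 15, 16]
counter = RunningCounter()
flat_map_output = [x for element in partition for x in counter.flat_map(element)]
map_partition_output = list(count_map_partition(iter(partition)))
print(flat_map_output)       # [1, 2, 3, 4]
print(map_partition_output)  # [4]
```
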

The broadcast set is sent only once in both cases.

If it is possible to broadcast dataset B, you can also use a MapFunction,
and you don't need to store the count values.

Best, Fabian

2015-10-05 11:53 GMT+02:00 Pieter Hameete <phameete@gmail.com>:

> Hi Fabian,
>
> I have a question regarding the first approach. Is there any benefit to
> choosing a RichMapPartitionFunction over a RichMapFunction in this case? I
> assume that each broadcast dataset is sent only once to each task manager?
>
> If I broadcast dataset B instead, then for each element a in A I could
> count the number of elements in B that are smaller than a and output a
> tuple in a map operation. Would this also save me the step of aggregating
> the results?
>
> Kind regards,
>
> Pieter
>
> 2015-09-30 12:44 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>
>> Hi Gabor, Fabian,
>>
>> thank you for your suggestions. I intend to scale up to the point where
>> neither A nor B will fit in memory. I'll see if I can come up with a nice
>> way to partition the datasets, but if that takes too much time I'll just
>> have to accept that it won't work on large datasets. I'll let you know if
>> I manage to work something out, but I won't work on it until the
>> weekend :-)
>>
>> Cheers again,
>>
>> Pieter
>>
>> 2015-09-30 12:28 GMT+02:00 Gábor Gévay <ggab90@gmail.com>:
>>
>>> Hello,
>>>
>>> Alternatively, if dataset B fits in memory but dataset A doesn't, you
>>> can do it by broadcasting B to a RichMapPartitionFunction on A:
>>> in the open() method of the mapPartition function, you sort B. Then, for
>>> each element a of A, you do a binary search in B and look at the index
>>> found by the binary search (the position of the first element that is
>>> not smaller than a), which will be the count that you are looking for.
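
A plain-Python sketch of the broadcast-B approach described above (not Flink code; `open_with_broadcast` and `map_partition` are hypothetical names for the open() and mapPartition steps, and elements of A are assumed to be `(id, propertyX)` tuples). `bisect_left` returns the first index whose element is not smaller than the probe, which equals the number of strictly smaller elements, matching `B.filter(_ < a.propertyx).count` even when B contains duplicates.

```python
from bisect import bisect_left
from typing import List, Tuple


def open_with_broadcast(b_values: List[int]) -> List[int]:
    # open() step: sort the broadcast copy of B once per task instance.
    return sorted(b_values)


def count_smaller(sorted_b: List[int], a_value: int) -> int:
    # Index of the first element >= a_value == number of elements < a_value.
    return bisect_left(sorted_b, a_value)


def map_partition(a_partition: List[Tuple[str, int]], sorted_b: List[int]) -> List[Tuple[str, int]]:
    # Emits (id, count) per element of A; no separate aggregation is needed,
    # because every task holds all of B.
    return [(a_id, count_smaller(sorted_b, x)) for a_id, x in a_partition]


sorted_b = open_with_broadcast([5, 1, 3, 2, 7, 5])
result = map_partition([("a1", 1), ("a2", 3), ("a3", 6)], sorted_b)
print(result)  # [('a1', 0), ('a2', 2), ('a3', 5)]
```

Since each element of A is handled independently here, the same lookup would also fit a per-element map function.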
>>>
>>> Best,
>>> Gabor
>>>
>>>
>>>
>>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>> > The idea is to partition both datasets by range.
>>> > Assume your dataset A is [1,2,3,4,5,6]. You create two partitions,
>>> > p1: [1,2,3] and p2: [4,5,6].
>>> > Each partition is given to a different instance of a MapPartition
>>> > operator (this is a bit tricky, because you cannot use a broadcastSet;
>>> > you could load the corresponding partition in the open() function, from
>>> > HDFS for example).
>>> >
>>> > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
>>> > partition 1 (p1) and everything > 3 goes to p2. You can partition a
>>> > dataset by range using the partitionCustom() function. The partitioned
>>> > dataset is given to the mapPartition operator that loaded a partition of
>>> > dataset A in each task instance.
>>> > You do the counting just like before (sorting the partition of dataset A,
>>> > binary search, long[]), but add an additional count for the complete
>>> > partition (basically, count all elements that arrive in the task
>>> > instance).
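
Flink's partitionCustom() takes a Partitioner whose partition(key, numPartitions) method returns the target partition index. The range rule above (<= 3 to p1, > 3 to p2) can be mirrored in plain Python as follows; `RangePartitioner` and `split` are hypothetical helpers, and the inclusive upper bounds are an assumption chosen to match the example.

```python
from bisect import bisect_left
from typing import Dict, List


class RangePartitioner:
    """Plain-Python stand-in for a custom range partitioner (hypothetical)."""

    def __init__(self, upper_bounds: List[int]) -> None:
        # upper_bounds[i] is the largest key that still belongs to partition i;
        # keys above the last bound go to the final partition.
        self.upper_bounds = upper_bounds

    def partition(self, key: int, num_partitions: int) -> int:
        # bisect_left finds the first bound >= key, so key <= bound lands in
        # that bound's partition.
        index = bisect_left(self.upper_bounds, key)
        return min(index, num_partitions - 1)


def split(values: List[int], partitioner: RangePartitioner, num_partitions: int) -> Dict[int, List[int]]:
    # Simulates routing each record to the partition the partitioner chooses.
    parts: Dict[int, List[int]] = {i: [] for i in range(num_partitions)}
    for v in values:
        parts[partitioner.partition(v, num_partitions)].append(v)
    return parts


b = [1, 2, 2, 3, 3, 4, 5, 5, 5, 5, 6, 7]
parts = split(b, RangePartitioner([3]), 2)
print(parts)  # {0: [1, 2, 2, 3, 3], 1: [4, 5, 5, 5, 5, 6, 7]}
```

The same bounds must be used for dataset A, so that the slice of A loaded by each task covers exactly the range of B that task receives.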
>>> >
>>> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7, the counts for p1
>>> > would be [1:0, 2:1, 3:3, all:5] and for p2 [4:0, 5:1, 6:5, all:7].
>>> > Now you need to compute the final count by adding the "all" counts of
>>> > the lower partitions to the counts of the "higher" partitions, i.e., add
>>> > all:5 of p1 to all counts for p2.
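
The worked example can be reproduced in plain Python (not Flink code; `local_counts` and `combine` are hypothetical names for the per-task counting and the final offset step). Each task sorts its slice of A, records where every arriving b lands, and also counts all b it received; the final pass adds the "all" counts of lower partitions as an offset. Using a running sum over landing positions instead of incrementing a range of counters per b is a choice made here, and it yields the same counts.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import Dict, List, Tuple

Partial = Tuple[List[int], List[int], int]  # (sorted A slice, local counts, "all")


def local_counts(a_part: List[int], b_part: List[int]) -> Partial:
    # Per task instance: sort this slice of A, then for every arriving b record
    # the first position whose a is greater than b.
    sorted_a = sorted(a_part)
    hits = [0] * len(sorted_a)
    for b in b_part:
        pos = bisect_right(sorted_a, b)
        if pos < len(sorted_a):
            hits[pos] += 1
    # A running sum turns those positions into "number of b < a" per a; the
    # extra "all" count is simply how many b arrived in this partition.
    return sorted_a, list(accumulate(hits)), len(b_part)


def combine(partials: List[Partial]) -> Dict[int, int]:
    # partials must be ordered from the lowest range to the highest: every b in
    # a lower partition is smaller than every a in a higher one.
    final: Dict[int, int] = {}
    offset = 0
    for sorted_a, counts, total in partials:
        for a_value, count in zip(sorted_a, counts):
            final[a_value] = count + offset
        offset += total
    return final


p1 = local_counts([1, 2, 3], [1, 2, 2, 3, 3])
p2 = local_counts([4, 5, 6], [4, 5, 5, 5, 5, 6, 7])
print(p1)  # ([1, 2, 3], [0, 1, 3], 5)
print(p2)  # ([4, 5, 6], [0, 1, 5], 7)
print(combine([p1, p2]))  # {1: 0, 2: 1, 3: 3, 4: 5, 5: 6, 6: 10}
```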
>>> >
>>> > This approach requires knowing the value range and distribution of the
>>> > values, which makes it a bit difficult. I guess you'll get the best
>>> > performance if you partition in a way that gives you roughly equally
>>> > sized partitions of dataset B, with the constraint that the
>>> > corresponding partitions of A fit into memory.
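
One possible way to pick range bounds that give roughly equally sized partitions of B is to take evenly spaced ranks of a sorted sample of B. This is an assumption layered on the advice above, not something the thread specifies; `equal_size_upper_bounds` is a hypothetical helper, and it does not check the memory constraint on the matching slices of A, which would still have to be verified separately.

```python
from typing import List


def equal_size_upper_bounds(sample_of_b: List[int], num_partitions: int) -> List[int]:
    # Take num_partitions - 1 inclusive upper bounds at evenly spaced ranks of
    # the sorted sample, so each range receives roughly the same share of B.
    ordered = sorted(sample_of_b)
    n = len(ordered)
    if n == 0:
        return []
    bounds: List[int] = []
    for i in range(1, num_partitions):
        rank = max((i * n) // num_partitions - 1, 0)
        bound = ordered[rank]
        # Heavy ties can yield the same bound twice; keep bounds strictly increasing.
        if not bounds or bound > bounds[-1]:
            bounds.append(bound)
    return bounds


b = [1, 2, 2, 3, 3, 4, 5, 5, 5, 5, 6, 7]
print(equal_size_upper_bounds(b, 2))  # [4]
print(equal_size_upper_bounds(b, 3))  # [3, 5]
```

With ties (such as the four 5s), equal sizes are only approximate: the bounds [3, 5] split this B into 5, 5 and 2 elements.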
>>> >
>>> > As I said, it's a bit cumbersome. I hope you can follow my explanation.
>>> > Please ask if something is not clear ;-)
>>> >
>>> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>>> >>
>>> >> Hi Fabian,
>>> >>
>>> >> thanks for your tips!
>>> >>
>>> >> Do you have some pointers for getting started with the 'tricky range
>>> >> partitioning'? I am quite keen to get this working with large
>>> >> datasets ;-)
>>> >>
>>> >> Cheers,
>>> >>
>>> >> Pieter
>>> >>
>>> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>> >>>
>>> >>> Hi Pieter,
>>> >>>
>>> >>> cross is indeed too expensive for this task.
>>> >>>
>>> >>> If dataset A fits into memory, you can do the following: use a
>>> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
>>> >>> broadcastSet. In the open() method of the mapPartition function, load
>>> >>> the broadcast set, sort it by a.propertyX, and initialize a long[] for
>>> >>> the counts. For each element b of dataset B, do a binary search on the
>>> >>> sorted dataset A and increase the counts of all elements a with
>>> >>> a.propertyX > b, which form a contiguous run on one side of the
>>> >>> position found in the sorted list.
>>> >>> After all elements of dataset B have been processed, return the counts
>>> >>> from the long[].
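
A plain-Python simulation of the broadcast-A approach (not Flink code; `map_partition_over_b` is a hypothetical name, and A is reduced to its propertyX values). Here A is sorted ascending, so each b contributes to every position from its insertion point to the end. As a variant, the sketch records only where each b lands and takes a running sum after the partition is consumed, which gives the same counts as incrementing each counter individually. Because every task sees only part of B, the per-position partial counts still have to be summed across tasks.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import Iterator, List, Tuple


def map_partition_over_b(b_partition: Iterator[int], sorted_a: List[int]) -> Iterator[Tuple[int, int]]:
    # sorted_a stands in for the broadcast copy of A, sorted once in open().
    hits = [0] * len(sorted_a)  # plays the role of the long[] counters
    for b in b_partition:
        pos = bisect_right(sorted_a, b)  # first position whose a is > b
        if pos < len(sorted_a):
            hits[pos] += 1
    # Emit only after the whole partition has been consumed. The running sum
    # gives the same result as incrementing every counter from pos to the end.
    for position, count in enumerate(accumulate(hits)):
        yield position, count


sorted_a = sorted([6, 1, 3])
b_partitions = [[1, 2, 2, 3], [3, 4, 5, 5, 5, 5, 6, 7]]  # B split across two task instances

# Each instance only sees part of B, so the partial counts are summed per position.
totals = [0] * len(sorted_a)
for part in b_partitions:
    for position, count in map_partition_over_b(iter(part), sorted_a):
        totals[position] += count
print(list(zip(sorted_a, totals)))  # [(1, 0), (3, 3), (6, 10)]
```

Keying the partial results by position, not by the propertyX value, keeps duplicate values in A from being merged during the summation.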
>>> >>>
>>> >>> If dataset A doesn't fit into memory, things become more cumbersome
>>> >>> and we need to play some tricks with range partitioning...
>>> >>>
>>> >>> Let me know, if you have questions,
>>> >>> Fabian
>>> >>>
>>> >>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phameete@gmail.com>:
>>> >>>>
>>> >>>> Good day everyone,
>>> >>>>
>>> >>>> I am looking for a good way to do the following:
>>> >>>>
>>> >>>> I have dataset A and dataset B, and for each element in dataset A I
>>> >>>> would like to filter dataset B and obtain the size of the result.
>>> >>>> In short:
>>> >>>>
>>> >>>> for each element a in A -> B.filter( _ < a.propertyx).count
>>> >>>>
>>> >>>> Currently I am doing a cross of datasets A and B, making tuples so
>>> >>>> I can then filter all the tuples where field2 < field1.propertyx,
>>> >>>> and then group by field1.id and get the sizes of the groups. However,
>>> >>>> this is not working out in practice. When the datasets get larger,
>>> >>>> some tasks hang on the CHAIN Cross -> Filter, probably because there
>>> >>>> is insufficient memory for the cross to be completed?
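
For comparison, the cross-based formulation above amounts to the following brute force, written in plain Python (not Flink code; `counts_via_cross` is a hypothetical name and elements of A are assumed to be `(id, propertyx)` tuples). It materializes all len(A) × len(B) pairs, which is why it stops scaling; it is mainly useful as a reference result for checking a faster approach on small inputs.

```python
from typing import Dict, List, Tuple


def counts_via_cross(a: List[Tuple[str, int]], b: List[int]) -> Dict[str, int]:
    # cross: every (a, b) pair is materialized, len(a) * len(b) in total.
    pairs = [(a_elem, b_value) for a_elem in a for b_value in b]
    # filter: keep the pairs where b < a.propertyx.
    kept = [(a_elem, b_value) for a_elem, b_value in pairs if b_value < a_elem[1]]
    # group by id and take the group sizes. Ids are pre-seeded with 0 because a
    # plain group-by would produce no group at all for an a without matches.
    sizes: Dict[str, int] = {a_id: 0 for a_id, _ in a}
    for (a_id, _), _ in kept:
        sizes[a_id] += 1
    return sizes


a_values = [("a1", 1), ("a2", 3), ("a3", 6)]
b_values = [1, 2, 2, 3, 3, 4, 5, 5, 5, 5, 6, 7]
result = counts_via_cross(a_values, b_values)
print(result)  # {'a1': 0, 'a2': 3, 'a3': 10}
```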
>>> >>>>
>>> >>>> Does anyone have a suggestion on how I could make this work,
>>> >>>> especially with datasets that are larger than the memory available
>>> >>>> to an individual Task?
>>> >>>>
>>> >>>> Thank you in advance for your time :-)
>>> >>>>
>>> >>>> Kind regards,
>>> >>>>
>>> >>>> Pieter Hameete
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>
