flink-user mailing list archives

From Xingcan Cui <xingc...@gmail.com>
Subject Re: Cross operation on two huge datasets
Date Thu, 23 Feb 2017 12:56:19 GMT
Hi,

@Gwen, sorry that I missed the cross function and showed you the wrong way.
@Fabian's answers are what I mean.

Considering that the cross function is so expensive, can we find a way to
restrict the broadcast? That is, given that the groupBy function is a
many-to-one mapping and the cross function is an all-to-all mapping, is it
possible to define a many-to-many mapping function that broadcasts shapes to
more than one (but not all) of the index areas?
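A minimal sketch of this many-to-many cell assignment, in plain Java (names
like GridMapper and the 4x4 grid over the unit square are illustrative
assumptions, not Flink API): a shape is assigned to every grid cell its
bounding box overlaps, while a point gets exactly one cell.

```java
import java.util.ArrayList;
import java.util.List;

public class GridMapper {
    // Illustrative fixed grid over the unit square [0,1] x [0,1].
    static final int CELLS_X = 4, CELLS_Y = 4;
    static final double CELL_W = 1.0 / CELLS_X, CELL_H = 1.0 / CELLS_Y;

    /** Cell ids overlapped by a shape's bounding box (many-to-many mapping). */
    static List<Integer> cellsForShape(double minX, double minY,
                                       double maxX, double maxY) {
        List<Integer> cells = new ArrayList<>();
        int x0 = (int) Math.min(CELLS_X - 1, minX / CELL_W);
        int x1 = (int) Math.min(CELLS_X - 1, maxX / CELL_W);
        int y0 = (int) Math.min(CELLS_Y - 1, minY / CELL_H);
        int y1 = (int) Math.min(CELLS_Y - 1, maxY / CELL_H);
        for (int x = x0; x <= x1; x++)
            for (int y = y0; y <= y1; y++)
                cells.add(y * CELLS_X + x);  // replicate shape into each cell
        return cells;
    }

    /** The single cell containing a point (many-to-one mapping). */
    static int cellForPoint(double px, double py) {
        int x = (int) Math.min(CELLS_X - 1, px / CELL_W);
        int y = (int) Math.min(CELLS_Y - 1, py / CELL_H);
        return y * CELLS_X + x;
    }
}
```

In a Flink job, cellsForShape would drive a flatMap over the shapes and
cellForPoint a map over the points, followed by an equi-join on the cell id,
so each shape is broadcast only to the cells it can possibly match.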

Best,
Xingcan

On Thu, Feb 23, 2017 at 7:07 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Gwen,
>
> sorry I didn't read your answer, I was still writing mine when you sent
> yours ;-)
>
> Regarding your strategy, this is basically what Cross does:
> It keeps one input partitioned and broadcasts (replicates) the other one.
> On each partition, it combines the records of that partition of the first
> input with all records of the replicated second input.
> I think this is what you describe as well, right?
>
> As I wrote before, this approach is quadratic and does not scale to large
> data sizes.
> I would recommend looking into spatial partitioning. Otherwise, I do not
> see how the problem can be solved for large data sets.
>
> Best, Fabian
>
> 2017-02-23 12:00 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>
>> Hi,
>>
>> Flink's batch DataSet API does already support (manual) theta-joins via
>> the CrossFunction. It combines each pair of records of two input data sets.
>> This is done by broadcasting (and hence replicating) one of the inputs.
>> @Xingcan, so I think what you describe is already there.
>> However, as I said before, it is often prohibitively expensive to
>> compute. When you are at a point where a MapFunction with a broadcast set
>> is no longer sufficient (the smaller data set does not fit into memory),
>> your problem is often too big to compute.
>> The complexity of a Cartesian product (Cross) is simply quadratic.
>>
>> Regarding the specific problem of joining spatial shapes and points, I
>> would go with a spatial partitioning as follows:
>> - Partition the space and compute, for each shape, which partitions it
>> belongs to (could be more than one).
>> - Do the same for the points (each point falls into exactly one).
>> - Do a 1-n join on the partition ids, plus an additional check whether
>> the point is actually in the shape.
>>
>> The challenge here is to have partitions of similar size.
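The final containment check from the last step can be sketched with the
standard ray-casting test for simple polygons (plain Java; a production job
would more likely use a geometry library such as JTS):

```java
public class PointInPolygon {
    /**
     * True if (px, py) lies inside the polygon whose vertices are given by
     * the parallel arrays x[] and y[]. Casts a horizontal ray from the point
     * and counts edge crossings: an odd count means the point is inside.
     */
    static boolean contains(double[] x, double[] y, double px, double py) {
        boolean inside = false;
        for (int i = 0, j = x.length - 1; i < x.length; j = i++) {
            // Edge (j, i) crosses the ray if it straddles py and the
            // intersection lies to the right of the point.
            if ((y[i] > py) != (y[j] > py)
                    && px < (x[j] - x[i]) * (py - y[i]) / (y[j] - y[i]) + x[i]) {
                inside = !inside;
            }
        }
        return inside;
    }
}
```

After the equi-join on partition ids produces candidate (point, shape) pairs,
this test filters out the points that share a partition with a shape but do
not actually fall inside it.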
>>
>> Cheers, Fabian
>>
>> 2017-02-23 5:59 GMT+01:00 Xingcan Cui <xingcanc@gmail.com>:
>>
>>> Hi all,
>>>
>>> @Gwen From the database's point of view, the only way to avoid a
>>> Cartesian product in a join is to use an index, which in Flink takes the
>>> form of key grouping. However, key grouping only supports many-to-one
>>> mappings now, i.e., a shape or a point can only be distributed to a single
>>> group. Only points and shapes belonging to the same group can be joined,
>>> which reduces the inherent pair comparisons (compared with a Cartesian
>>> product). This is perfectly suitable for equi-joins.
>>>
>>> @Fabian I saw this thread when I was considering theta-joins (which will
>>> eventually be supported) in Flink. Since it's impossible to group (index)
>>> a dataset for an arbitrary theta-join, I think we may need some
>>> duplication mechanism here. For example, split one dataset into n parts
>>> and send the other dataset to all of these parts. This could be even more
>>> useful in stream joins. BTW, it seems that I've seen another thread
>>> discussing this, but I cannot find it now. What do you think?
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> Hi Gwen,
>>>>
>>>> Flink usually performs a block nested loop join to cross two data sets.
>>>> This algorithm spills one input to disk and streams the other input.
>>>> It fills a memory buffer with records of the streamed input, then
>>>> repeatedly refills a second buffer with spilled records and crosses the
>>>> two, until one full pass over the spilled records is done. The buffer of
>>>> the streamed input is then filled with the next records, and the process
>>>> repeats.
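An in-memory sketch of this block nested loop strategy in plain Java
(illustrative only; Flink's real implementation spills one input to disk and
manages its own memory segments):

```java
import java.util.List;
import java.util.function.BiConsumer;

public class BlockNestedLoopCross {
    /**
     * Crosses every record of `streamed` with every record of `spilled`.
     * The streamed input is consumed one block at a time; for each block,
     * one full pass is made over the spilled input.
     */
    static <A, B> void cross(List<A> streamed, List<B> spilled,
                             int blockSize, BiConsumer<A, B> emit) {
        for (int start = 0; start < streamed.size(); start += blockSize) {
            // Fill the in-memory buffer with the next block of the streamed input.
            List<A> block = streamed.subList(
                    start, Math.min(start + blockSize, streamed.size()));
            // One full pass over the spilled input against this block.
            for (B b : spilled) {
                for (A a : block) {
                    emit.accept(a, b);
                }
            }
        }
    }
}
```

This makes the quadratic cost visible: the spilled input is re-read once per
block of the streamed input, so the number of emitted pairs is always
|streamed| * |spilled| regardless of block size.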
>>>>
>>>> You should be aware that cross is a super expensive operation,
>>>> especially if you evaluate a complex condition for each pair of records.
>>>> So cross can easily be too expensive to compute.
>>>> For such use cases it is usually better to apply a coarse-grained
>>>> spatial partitioning and do a key-based join on the partitions. Within each
>>>> partition you'd perform a cross.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <
>>>> gwenhael.pasquiers@ericsson.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> I need (or at least I think I do) to do a cross operation between two
>>>>> huge datasets. One dataset is a list of points. The other one is a
>>>>> list of shapes (areas).
>>>>>
>>>>>
>>>>>
>>>>> I want to know, for each point, the areas it belongs to (they might
>>>>> overlap, so a point can be in multiple areas), so I thought I’d “cross”
>>>>> my points and areas since I need to test each point against each area.
>>>>>
>>>>>
>>>>>
>>>>> I tried it and my job seems to work for some seconds; then, at some
>>>>> point, it gets stuck.
>>>>>
>>>>>
>>>>>
>>>>> I’m wondering if Flink, for cross operations, tries to load one of the
>>>>> two datasets into RAM or if it’s able to split the job into multiple
>>>>> iterations (even if it means reading one of the two datasets multiple
>>>>> times).
>>>>>
>>>>>
>>>>>
>>>>> Or maybe I’m going at it the wrong way, or missing some parameters;
>>>>> feel free to correct me :-)
>>>>>
>>>>>
>>>>>
>>>>> I’m using flink 1.0.1.
>>>>>
>>>>>
>>>>>
>>>>> Thanks in advance
>>>>>
>>>>>
>>>>>
>>>>> Gwen’
>>>>>
>>>>
>>>>
>>>
>>
>
