flink-user mailing list archives

From Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Subject RE: Cross operation on two huge datasets
Date Thu, 23 Feb 2017 10:59:17 GMT
Hi, and thanks for your answers!

I’m not sure I can define any index to split the workload since in my case any point could
be in any zone...
I think I’m currently trying to do it the way you call “theta-join”:

1-      Split one dataset over the cluster and prepare it for work against
the other one (e.g. parse the shapes)

a.       Either using partitioning

b.       Or using N sources + filtering based on hash so I get complementary datasets

2-      Make my other dataset go “through” all the “splits” of the first one and enrich
/ filter it

a.       The dataset would probably have to be read entirely from HDFS multiple times (once
per “split”)

I have other ideas but I don’t know if they’re doable in Flink.


Is there a way for an object (key selector, flatmap) to obtain (and wait for) the result of
a previous dataset? The only way I can think of is a “cross” between my one-record dataset
(the result) and the other dataset. But maybe that’s very bad in terms of resources?

I’d like to try using a flatmap that clones the dataset into N parts (adding a partition key
0 to N-1 to each record), then use partitioning to “dispatch” each clone of the dataset
to a matching “shape matcher” partition; then I’d use cross to do the work, then group
the results back together (in case the N clones of a point were inside different shapes). Maybe
that would split the workload of the cross by dividing the size of one of the two datasets
in that cross …
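
The cloning idea above can be sketched in plain Python (this is not Flink API code). It assumes, purely for illustration, that shapes are axis-aligned rectangles `(x_min, y_min, x_max, y_max)` and that round-robin assignment stands in for hash partitioning; `point_in_rect` and `replicated_cross` are hypothetical names.

```python
from collections import defaultdict

def point_in_rect(point, rect):
    # Stand-in for the real (expensive) point-in-shape test.
    x, y = point
    x_min, y_min, x_max, y_max = rect
    return x_min <= x <= x_max and y_min <= y <= y_max

def replicated_cross(points, shapes, n):
    # Split the shapes into n disjoint "shape matcher" partitions.
    shape_parts = defaultdict(list)
    for i, (shape_id, rect) in enumerate(shapes):
        shape_parts[i % n].append((shape_id, rect))

    # "flatmap": clone every point n times, tagging each clone with a key 0..n-1.
    keyed_points = [(key, pid, pt) for pid, pt in points for key in range(n)]

    # Per partition key: cross the clones with that partition's shapes only,
    # then group the matches of all clones of a point back together.
    matches = defaultdict(set)
    for key, pid, pt in keyed_points:
        for shape_id, rect in shape_parts[key]:
            if point_in_rect(pt, rect):
                matches[pid].add(shape_id)
    return {pid: sorted(ids) for pid, ids in matches.items()}
```

The total number of point/shape tests is still |points| × |shapes|; the cloning only spreads those tests over n partitions, each holding 1/n of the shapes.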

Sorry for my rambling if I’m not clear.


From: Xingcan Cui [mailto:xingcanc@gmail.com]
Sent: Thursday, 23 February 2017 06:00
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi all,

@Gwen From the database's point of view, the only way to avoid a Cartesian product in a join is
to use an index, which takes the form of key grouping in Flink. However, it only supports many-to-one
mapping now, i.e., a shape or a point can only be distributed to a single group. Only points
and shapes belonging to the same group can be joined, and that reduces the number of pair
comparisons (compared with a Cartesian product). It's perfectly suitable for an equi-join.

@Fabian I saw this thread just as I was thinking about theta-join (which will eventually
be supported) in Flink. Since it's impossible to group (index) a dataset for an arbitrary
theta-join, I think we may need some duplication mechanism here. For example, split a dataset
into n parts and send the other dataset to all of these parts. This could be more useful in
a stream join. BTW, I think I've seen another thread discussing this, but I cannot
find it now. What do you think?


On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. It fills a memory
buffer for each input and crosses the two buffers. Then the buffer of the spilled input is refilled
with the next spilled records and the records are crossed again. This is repeated until one full
iteration over the spilled records is done. Then the buffer of the streamed input is filled with the
next records.
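
A simplified sketch of the block nested loop described above, in plain Python. It is not Flink's actual implementation: in-memory lists stand in for the spilled file and the streamed input, and `block_size` is a hypothetical stand-in for the memory buffer size.

```python
def block_nested_loop_cross(spilled, streamed, block_size):
    """Yield every (streamed, spilled) pair while holding at most one
    block of each input in memory at a time."""
    def blocks(records):
        for start in range(0, len(records), block_size):
            yield records[start:start + block_size]

    for streamed_block in blocks(streamed):      # fill the streamed-input buffer
        for spilled_block in blocks(spilled):    # one full pass over the spilled input
            for left in streamed_block:
                for right in spilled_block:
                    yield left, right
```

The spilled input is re-read once per block of the streamed input, so a larger buffer means fewer passes over the spilled data; the number of pairs produced is unchanged.
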
You should be aware that cross is a super expensive operation, especially if you evaluate
a complex condition for each pair of records. So cross can easily be too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial partitioning and
do a key-based join on the partitions. Within each partition you'd perform a cross.
Best, Fabian
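
The coarse-grained spatial partitioning suggested above could look roughly like the following plain-Python sketch (not Flink API code). It assumes a uniform grid and axis-aligned rectangular shapes `(x_min, y_min, x_max, y_max)`; `cell_size`, `cell_of`, `cells_overlapping` and `grid_join` are hypothetical names. A shape is replicated to every grid cell it overlaps, a point goes to exactly one cell, and the cell id serves as the join key.

```python
import math
from collections import defaultdict

def cell_of(x, y, cell_size):
    # Grid cell (the join key) containing the coordinate (x, y).
    return (math.floor(x / cell_size), math.floor(y / cell_size))

def cells_overlapping(rect, cell_size):
    # All grid cells touched by the rectangle's bounding box.
    x_min, y_min, x_max, y_max = rect
    cx0, cy0 = cell_of(x_min, y_min, cell_size)
    cx1, cy1 = cell_of(x_max, y_max, cell_size)
    return [(cx, cy) for cx in range(cx0, cx1 + 1) for cy in range(cy0, cy1 + 1)]

def grid_join(points, shapes, cell_size):
    # Key the shapes by cell, replicating each shape to every cell it overlaps.
    shapes_by_cell = defaultdict(list)
    for shape_id, rect in shapes:
        for cell in cells_overlapping(rect, cell_size):
            shapes_by_cell[cell].append((shape_id, rect))

    # Key-based join on the cell, then a small "cross" within each cell.
    matches = defaultdict(list)
    for pid, (x, y) in points:
        for shape_id, (x_min, y_min, x_max, y_max) in shapes_by_cell.get(cell_of(x, y, cell_size), []):
            if x_min <= x <= x_max and y_min <= y <= y_max:
                matches[pid].append(shape_id)
    return {pid: sorted(ids) for pid, ids in matches.items()}
```

Each point is only tested against the shapes overlapping its own cell, so the exact (expensive) test runs on far fewer pairs than a full cross when shapes are small relative to the covered area.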

2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com>:

I need (or at least I think I do) to do a cross operation between two huge datasets. One dataset
is a list of points. The other one is a list of shapes (areas).

I want to know, for each point, the areas (they might overlap so a point can be in multiple
areas) it belongs to so I thought I’d “cross” my points and areas since I need to test
each point against each area.

I tried it, and my job seems to work for some seconds; then, at some point, it gets stuck.

I’m wondering if Flink, for cross operations, tries to load one of the two datasets into
RAM or if it’s able to split the job into multiple iterations (even if it means reading one
of the two datasets multiple times).

Or maybe I’m going at it the wrong way, or missing some parameters, feel free to correct
me ☺

I’m using Flink 1.0.1.

Thanks in advance

