flink-user mailing list archives

From Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Subject RE: Cross operation on two huge datasets
Date Fri, 03 Mar 2017 10:18:44 GMT
I managed to avoid the class reload by controlling the order of operations with “.withBroadcastSet”.

My first task (shape parsing) now outputs an empty “DataSet<Void> synchro”.

Then, whenever I need to wait for that synchro dataset to be ready (and, mainly, for the operations that produce it to be done), I use “.withBroadcastSet(synchro, “synchro”)” and fetch that broadcast variable in my open method.

That way I’m sure that I won’t begin testing my points against an incomplete static RTree.
And also, since it’s a single job again, my static RTree remains valid ☺

It seems to work for now, even if the static thingie is a bit dirty.

However, I’m surprised that reading 20 MB of Parquet turns into 21 GB of “bytes sent” by the Flink reader.

From: Gwenhael Pasquiers [mailto:gwenhael.pasquiers@ericsson.com]
Sent: Thursday, March 2, 2017 16:28
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = number of YARN nodes, so that:

- Each node contains 1/Nth of the shapes (a simple repartition() of the shapes dataset).
- The points are cloned so that each partition of the points dataset contains the whole original dataset:
  - a flatMap creates “#parallelism” clones of each entry;
  - a custom partitioner sends each clone of an entry to a different partition.

That way, whatever Flink chooses to do, each point is compared to each shape. That’s why I think that, in my case, I can keep the RTree in the JVM without issues. I’d prefer to avoid serializing/deserializing that structure.
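The replication scheme above can be checked with a small plain-Java model (no Flink dependency). It is a sketch under assumptions, not the code used in this thread: points and shapes are bare integer ids, a round-robin split stands in for the repartition() of the shapes, and `partition` only mirrors the shape of Flink's `Partitioner#partition(key, numPartitions)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Plain-Java model (no Flink) of the replication scheme: shapes are split round-robin
// over n partitions, and every point is cloned n times, clone i going to partition i.
// Integer ids are hypothetical stand-ins for real points and shapes.
final class CloneAndPartitionSketch {

    // Mirrors the shape of Flink's Partitioner<K>#partition(K key, int numPartitions).
    static int partition(int cloneIndex, int numPartitions) {
        return cloneIndex % numPartitions;
    }

    static List<List<Integer>> emptyPartitions(int n) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        return parts;
    }

    // Shapes: each shape lands in exactly one partition (stand-in for repartition()).
    static List<List<Integer>> splitShapes(List<Integer> shapes, int n) {
        List<List<Integer>> parts = emptyPartitions(n);
        for (int i = 0; i < shapes.size(); i++) {
            parts.get(i % n).add(shapes.get(i));
        }
        return parts;
    }

    // Points: the flatMap emits (cloneIndex, point) for cloneIndex = 0..n-1 and the
    // custom partitioner routes each clone to its own partition.
    static List<List<Integer>> clonePoints(List<Integer> points, int n) {
        List<List<Integer>> parts = emptyPartitions(n);
        for (int point : points) {
            for (int cloneIndex = 0; cloneIndex < n; cloneIndex++) {
                parts.get(partition(cloneIndex, n)).add(point);
            }
        }
        return parts;
    }

    // All (point, shape) comparisons made inside the partitions, as "point:shape".
    static List<String> comparedPairs(List<Integer> points, List<Integer> shapes, int n) {
        List<List<Integer>> pointParts = clonePoints(points, n);
        List<List<Integer>> shapeParts = splitShapes(shapes, n);
        List<String> pairs = new ArrayList<>();
        for (int p = 0; p < n; p++) {
            for (int point : pointParts.get(p)) {
                for (int shape : shapeParts.get(p)) {
                    pairs.add(point + ":" + shape);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> pairs = comparedPairs(Arrays.asList(1, 2, 3), Arrays.asList(10, 20, 30, 40), 2);
        // 3 points x 4 shapes: 12 comparisons, none of them repeated
        System.out.println(pairs.size() + " " + new HashSet<>(pairs).size()); // prints "12 12"
    }
}
```

The trade-off of this design: with N partitions every point is emitted N times, while each shape sits in exactly one partition, so the point side of the job carries N times the original point volume.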

I tried to use a join (all items have the same key), but it looks like Flink tried to serialize the
RTree anyway, and it ended in a StackOverflowError (locally, with only 1 partition, not even on

From: Till Rohrmann [mailto:trohrmann@apache.org]
Sent: jeudi 2 mars 2017 15:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Yes, you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don’t know where Flink will schedule
the new operator instance. It might be the case that an operator responsible for another partition
gets scheduled to this JVM and then it has the wrong RTree information. Maybe you can model
the set of RTrees as a DataSet[(PartitionKey, RTree)] and then join with the partitioned point
data set.
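Till's alternative, a data set of (partition key, RTree) records joined with the partitioned points, amounts to an equi-join on the key instead of static JVM state. The plain-Java model below builds a hash table from the index side and probes it with each point. The `KeyedIndex` and `KeyedPoint` types are hypothetical, and a list of `{minX, minY, maxX, maxY}` boxes stands in for the RTree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of joining (partitionKey, index) records with
// (partitionKey, point) records instead of keeping the index in static state.
// Hypothetical types; a list of {minX, minY, maxX, maxY} boxes stands in for an RTree.
final class KeyedIndexJoinSketch {

    static final class KeyedIndex {
        final int key;
        final List<double[]> boxes;

        KeyedIndex(int key, List<double[]> boxes) {
            this.key = key;
            this.boxes = boxes;
        }
    }

    static final class KeyedPoint {
        final int key;
        final String id;
        final double x;
        final double y;

        KeyedPoint(int key, String id, double x, double y) {
            this.key = key;
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Equi-join on the partition key: hash the index side, probe with each point,
    // and emit "pointId->key/boxPosition" for every box that contains the point.
    static List<String> join(List<KeyedIndex> indexes, List<KeyedPoint> points) {
        Map<Integer, KeyedIndex> byKey = new HashMap<>();
        for (KeyedIndex index : indexes) {
            byKey.put(index.key, index);
        }
        List<String> matches = new ArrayList<>();
        for (KeyedPoint p : points) {
            KeyedIndex index = byKey.get(p.key);
            // Inner join: a point whose key has no index record produces nothing.
            List<double[]> boxes = index == null ? Collections.<double[]>emptyList() : index.boxes;
            for (int b = 0; b < boxes.size(); b++) {
                double[] box = boxes.get(b);
                if (p.x >= box[0] && p.y >= box[1] && p.x <= box[2] && p.y <= box[3]) {
                    matches.add(p.id + "->" + p.key + "/" + b);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<KeyedIndex> indexes = Arrays.asList(
                new KeyedIndex(0, Arrays.asList(new double[] {0, 0, 1, 1})),
                new KeyedIndex(1, Arrays.asList(new double[] {5, 5, 6, 6})));
        List<KeyedPoint> points = Arrays.asList(
                new KeyedPoint(0, "a", 0.5, 0.5),
                new KeyedPoint(1, "b", 0.5, 0.5),
                new KeyedPoint(1, "c", 5.5, 5.5));
        System.out.println(join(indexes, points)); // [a->0/0, c->1/0]
    }
}
```

In the DataSet API the same pattern would roughly be `indexes.join(points).where(0).equalTo(0)` over Tuple2 records. That requires Flink to serialize the index type, which is the step where the StackOverflowError reported in this thread appeared.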


On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com> wrote:
The best option for me would be to make it “persist” inside the JVM heap in some map, since
I don’t even know whether the structure is Serializable (I could try). But I understand.

As for broadcasting, wouldn’t broadcasting the variable cancel the effort I made to “split”
the dataset parsing over the nodes?

From: Till Rohrmann [mailto:trohrmann@apache.org]
Sent: Thursday, March 2, 2017 14:42
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwenhael,

if you want to persist operator state, then you would have to persist it (e.g. writing to
a shared directory or emitting the model and using one of Flink’s sinks) and when creating
the new operators you have to reread it from there (usually in the open method or from a Flink
source as part of a broadcasted data set).

If you want to give a data set to all instances of an operator, then you should broadcast
this data set. You can do something like

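import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

DataSet<Integer> input = ...;

DataSet<Integer> broadcastSet = ...;

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {

    List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        // Returns the complete broadcast data set, before any input record is processed
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
        // Combine the incoming record with broadcastSet here and emit via collector
    }
}).withBroadcastSet(broadcastSet, "broadcast");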


On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com> wrote:
I (almost) made it work the following way:

1st job: read all the shapes, repartition() them equally over my N nodes, then fill a static
RTree on each node (thanks for the tip).

2nd job: read all the points, use a flatMap + custom partitioner to “clone” the dataset
to all nodes, then apply a simple flatMap that uses the previously initialized static
RTree to add the shape information to each point. Then do a groupBy to merge the points that
fell inside multiple shapes.
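The final groupBy step of the second job can be sketched in plain Java: the per-partition flatMap emits one (pointId, shapeId) record per matching shape, and grouping by pointId gathers all the shapes a point falls into. The string ids are hypothetical; in Flink this would presumably be a groupBy on the point field followed by a group reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "groupBy(pointId) + merge": collapses the (pointId, shapeId)
// matches produced on different partitions into one record per point.
// Ids are hypothetical strings.
final class MergeMatchesSketch {

    static Map<String, List<String>> mergeByPoint(List<String[]> matches) {
        Map<String, List<String>> shapesByPoint = new TreeMap<>(); // sorted for stable output
        for (String[] match : matches) {
            String pointId = match[0];
            String shapeId = match[1];
            shapesByPoint.computeIfAbsent(pointId, k -> new ArrayList<>()).add(shapeId);
        }
        for (List<String> shapes : shapesByPoint.values()) {
            Collections.sort(shapes);
        }
        return shapesByPoint;
    }

    public static void main(String[] args) {
        // p1 fell into shapes B and A, found on two different partitions
        List<String[]> matches = Arrays.asList(
                new String[] {"p1", "B"}, new String[] {"p2", "C"}, new String[] {"p1", "A"});
        System.out.println(mergeByPoint(matches)); // {p1=[A, B], p2=[C]}
    }
}
```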

This works very well in a local runtime but fails on YARN, because the TaskManager seems to
reload the jar file between two jobs, which makes me lose my static RTree (I guess the newly
loaded class replaces the older one).

I have two questions:

- Is there a way to avoid that jar reload, or can I store my RTree somewhere in the JDK or in
  Flink, locally to the TaskManager, in a way that wouldn’t be affected by the jar reload
  (since it would not be stored in any class from MY jar)?
  - I could also try to do it in a single job, but I don’t know how to ensure that some
    operations (parsing the shapes) are done BEFORE starting the others that handle the points.
- Is there a clean way to do that using Flink operators? I’d need to be able to use the
  result of iterating over a dataset inside my map.

Something like:

datasetA.flatMap(new MyMapOperator(datasetB))…

And in my implementation, I would be able to iterate over the whole datasetB BEFORE doing any
operation on datasetA. That way I could parse all my shapes into an RTree before handling my
points, without relying on static

Or any other way that would allow me to do something similar.

Thanks in advance for your insight.


From: Jain, Ankit [mailto:ankit.jain@here.com]
Sent: Thursday, February 23, 2017 19:21
To: user@flink.apache.org
Cc: Fabian Hueske <fhueske@gmail.com>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
I would recommend looking into a data structure called RTree that is designed specifically
for this use case, i.e. matching points to regions.
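To illustrate the structure being recommended, here is a minimal, static R-tree over axis-aligned bounding boxes, bulk-loaded with the Sort-Tile-Recursive (STR) method. It is a self-contained sketch, not a production library: the node capacity of 4 is an arbitrary illustration value, and a query only returns candidate boxes, which an exact point-in-shape test would still have to confirm.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Minimal static R-tree over axis-aligned bounding boxes, bulk-loaded with
// Sort-Tile-Recursive (STR) packing. Illustration only, not a library.
final class StrRTreeSketch {

    static final int CAPACITY = 4; // max children per node (arbitrary, small for illustration)

    static final class Node {
        double minX = Double.POSITIVE_INFINITY, minY = Double.POSITIVE_INFINITY;
        double maxX = Double.NEGATIVE_INFINITY, maxY = Double.NEGATIVE_INFINITY;
        final List<Node> children = new ArrayList<>();
        int id = -1; // >= 0 only for leaf entries, i.e. the indexed boxes

        static Node entry(int id, double minX, double minY, double maxX, double maxY) {
            Node n = new Node();
            n.id = id;
            n.minX = minX;
            n.minY = minY;
            n.maxX = maxX;
            n.maxY = maxY;
            return n;
        }

        // Adds a child and grows this node's bounding box to cover it.
        void add(Node child) {
            children.add(child);
            minX = Math.min(minX, child.minX);
            minY = Math.min(minY, child.minY);
            maxX = Math.max(maxX, child.maxX);
            maxY = Math.max(maxY, child.maxY);
        }

        boolean contains(double x, double y) {
            return x >= minX && x <= maxX && y >= minY && y <= maxY;
        }
    }

    // One STR pass: sort by x-centre, cut into vertical slices, sort each slice
    // by y-centre, then group runs of CAPACITY nodes under new parent nodes.
    static List<Node> packLevel(List<Node> nodes) {
        int parentCount = (nodes.size() + CAPACITY - 1) / CAPACITY;
        int sliceSize = (int) Math.ceil(Math.sqrt(parentCount)) * CAPACITY;
        List<Node> sorted = new ArrayList<>(nodes);
        sorted.sort(Comparator.comparingDouble(n -> n.minX + n.maxX));
        List<Node> parents = new ArrayList<>();
        for (int s = 0; s < sorted.size(); s += sliceSize) {
            List<Node> slice = new ArrayList<>(sorted.subList(s, Math.min(s + sliceSize, sorted.size())));
            slice.sort(Comparator.comparingDouble(n -> n.minY + n.maxY));
            for (int i = 0; i < slice.size(); i += CAPACITY) {
                Node parent = new Node();
                slice.subList(i, Math.min(i + CAPACITY, slice.size())).forEach(parent::add);
                parents.add(parent);
            }
        }
        return parents;
    }

    // Packs level after level, bottom-up, until a single root remains.
    static Node build(List<Node> entries) {
        if (entries.isEmpty()) {
            return new Node(); // empty bounds, so contains() is always false
        }
        List<Node> level = entries;
        do {
            level = packLevel(level);
        } while (level.size() > 1);
        return level.get(0);
    }

    // Adds to `out` the ids of indexed boxes containing (x, y), skipping every
    // subtree whose bounding box does not contain the point.
    static void search(Node node, double x, double y, List<Integer> out) {
        if (!node.contains(x, y)) {
            return;
        }
        if (node.id >= 0) {
            out.add(node.id);
        }
        for (Node child : node.children) { // leaf entries have no children
            search(child, x, y, out);
        }
    }

    public static void main(String[] args) {
        List<Node> entries = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            entries.add(Node.entry(i, i * 10, 0, i * 10 + 5, 5)); // 20 disjoint boxes along x
        }
        entries.add(Node.entry(20, 0, 0, 40, 5)); // one wide box overlapping the first few
        List<Integer> hits = new ArrayList<>();
        search(build(entries), 32, 2, hits);
        Collections.sort(hits);
        System.out.println(hits); // [3, 20]
    }
}
```

Because every internal node's box covers all of its children, a query can safely skip any subtree whose box does not contain the point, which is what makes the lookup cheaper than testing every shape.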


From: Fabian Hueske <fhueske@gmail.com>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each input, it fills
a memory buffer and performs the cross on the two buffers. Then the buffer of the spilled input is
refilled with spilled records, and the records are crossed again. This is repeated until one full
iteration over the spilled records is done. Then the buffer of the streamed input is filled with
the next records.
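The loop structure described above can be modelled in memory: lists stand in for the streamed input and the spill file, and the two block sizes stand in for the memory buffers. This is a simplified sketch of the iteration order only, not Flink's actual operator (no disk I/O, no memory management).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// In-memory model of a block nested loop cross. Lists stand in for the streamed
// input and the spill file; the block sizes stand in for the memory buffers.
final class BlockNestedLoopCrossSketch {

    static <A, B> List<String> cross(List<A> streamed, List<B> spilled,
                                     int streamedBlock, int spilledBlock,
                                     BiPredicate<A, B> condition) {
        List<String> out = new ArrayList<>();
        // Fill the streamed-side buffer with the next block of records...
        for (int s = 0; s < streamed.size(); s += streamedBlock) {
            List<A> streamedBuffer = streamed.subList(s, Math.min(s + streamedBlock, streamed.size()));
            // ...then make one full pass over the spilled input, one buffer at a time.
            for (int t = 0; t < spilled.size(); t += spilledBlock) {
                List<B> spilledBuffer = spilled.subList(t, Math.min(t + spilledBlock, spilled.size()));
                // Cross the two buffers: every pair is evaluated.
                for (A a : streamedBuffer) {
                    for (B b : spilledBuffer) {
                        if (condition.test(a, b)) {
                            out.add(a + "x" + b);
                        }
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> streamed = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> spilled = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Buffers of 2 and 3 records: the spilled input is re-read ceil(5 / 2) = 3 times
        System.out.println(cross(streamed, spilled, 2, 3, (a, b) -> a.equals(b)));
        // prints [1x1, 2x2, 3x3, 4x4, 5x5]
    }
}
```

Whatever the buffer sizes, the condition is still evaluated |streamed| x |spilled| times: blocking bounds memory use, not the number of comparisons.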
You should be aware that cross is a super expensive operation, especially if you evaluate
a complex condition for each pair of records, so a cross can easily become too expensive to compute.
For such use cases, it is usually better to apply a coarse-grained spatial partitioning and
do a key-based join on the partitions. Within each partition, you'd perform a cross.
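The partitioning Fabian suggests can be sketched with a uniform grid: each shape's bounding box is registered in every cell it overlaps, each point belongs to exactly one cell, the two sides are joined on the cell id, and a small cross runs within each cell. Assumptions: shapes are reduced to `{minX, minY, maxX, maxY}` boxes, and the cell size `CELL` is a hypothetical tuning parameter. In Flink terms this would roughly be a flatMap emitting (cell, shape), a map emitting (cell, point), a join on the cell, then a filter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Coarse-grained spatial partitioning with a uniform grid, followed by a
// key-based join on the cell id and a cross inside each cell.
// Shapes are reduced to {minX, minY, maxX, maxY} boxes; CELL is a hypothetical
// tuning parameter.
final class GridPartitionJoinSketch {

    static final double CELL = 10.0;

    static long cellIndex(double coordinate) {
        return (long) Math.floor(coordinate / CELL);
    }

    // Shape side: one (cell, shapeId) record for every cell the box overlaps.
    static Map<String, List<Integer>> shapesByCell(List<double[]> boxes) {
        Map<String, List<Integer>> byCell = new HashMap<>();
        for (int id = 0; id < boxes.size(); id++) {
            double[] b = boxes.get(id);
            for (long cx = cellIndex(b[0]); cx <= cellIndex(b[2]); cx++) {
                for (long cy = cellIndex(b[1]); cy <= cellIndex(b[3]); cy++) {
                    byCell.computeIfAbsent(cx + ":" + cy, k -> new ArrayList<>()).add(id);
                }
            }
        }
        return byCell;
    }

    // Point side: each point has exactly one cell, so it is joined with that cell's
    // shapes only and no (point, shape) pair is tested twice.
    static List<String> join(List<double[]> points, List<double[]> boxes) {
        Map<String, List<Integer>> byCell = shapesByCell(boxes);
        List<String> matches = new ArrayList<>();
        for (int p = 0; p < points.size(); p++) {
            double x = points.get(p)[0];
            double y = points.get(p)[1];
            String cell = cellIndex(x) + ":" + cellIndex(y);
            // The cross within the partition: test the point against every shape of its cell
            for (int id : byCell.getOrDefault(cell, Collections.<Integer>emptyList())) {
                double[] b = boxes.get(id);
                if (x >= b[0] && x <= b[2] && y >= b[1] && y <= b[3]) {
                    matches.add(p + "->" + id);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<double[]> boxes = new ArrayList<>();
        boxes.add(new double[] {0, 0, 25, 5});  // spans cells 0:0, 1:0 and 2:0
        boxes.add(new double[] {12, 2, 14, 4}); // inside cell 1:0
        List<double[]> points = new ArrayList<>();
        points.add(new double[] {13, 3}); // in both boxes
        points.add(new double[] {30, 3}); // in neither
        System.out.println(join(points, boxes)); // [0->0, 0->1]
    }
}
```

The cell size is the knob: larger cells replicate fewer shapes but leave bigger crosses per cell, smaller cells do the opposite.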
Best, Fabian

2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com>:

I need (or at least I think I do) to do a cross operation between two huge datasets. One dataset
is a list of points. The other one is a list of shapes (areas).

I want to know, for each point, the areas it belongs to (they might overlap, so a point can be
in multiple areas), so I thought I’d “cross” my points and areas, since I need to test
each point against each area.
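To make the per-pair cost concrete, the condition evaluated for every (point, area) pair could be an even-odd ray-casting point-in-polygon test. This sketch assumes each area is a simple polygon given as two coordinate arrays (an assumption, since the thread does not say how the shapes are stored); `areasPerPoint` is the naive cross described here, with |points| x |areas| tests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// The per-pair condition behind the cross: an even-odd (ray casting) point-in-polygon
// test, assuming each area is a simple polygon given as {xs, ys} vertex arrays.
final class PointInPolygonSketch {

    // Casts a horizontal ray from (x, y) towards +x and flips `inside` at every polygon
    // edge it crosses; an odd number of crossings means the point is inside.
    static boolean contains(double[] xs, double[] ys, double x, double y) {
        boolean inside = false;
        for (int i = 0, j = xs.length - 1; i < xs.length; j = i++) {
            boolean straddles = (ys[i] > y) != (ys[j] > y);
            if (straddles && x < (xs[j] - xs[i]) * (y - ys[i]) / (ys[j] - ys[i]) + xs[i]) {
                inside = !inside;
            }
        }
        return inside;
    }

    // Naive cross: every point is tested against every area.
    static List<List<Integer>> areasPerPoint(List<double[]> points, List<double[][]> areas) {
        List<List<Integer>> result = new ArrayList<>();
        for (double[] p : points) {
            List<Integer> hits = new ArrayList<>();
            for (int a = 0; a < areas.size(); a++) {
                if (contains(areas.get(a)[0], areas.get(a)[1], p[0], p[1])) {
                    hits.add(a);
                }
            }
            result.add(hits);
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] square = {{0, 4, 4, 0}, {0, 0, 4, 4}};             // area 0
        double[][] lShape = {{0, 4, 4, 2, 2, 0}, {0, 0, 2, 2, 4, 4}}; // area 1, concave
        List<double[][]> areas = Arrays.asList(square, lShape);
        List<double[]> points = Arrays.asList(new double[] {1, 3}, new double[] {3, 3}, new double[] {5, 1});
        System.out.println(areasPerPoint(points, areas)); // [[0, 1], [0], []]
    }
}
```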

I tried it, and my job seems to work for a few seconds, then at some point it gets stuck.

I’m wondering whether Flink, for cross operations, tries to load one of the two datasets into
RAM, or whether it’s able to split the job into multiple iterations (even if it means reading one
of the two datasets multiple times).

Or maybe I’m going about it the wrong way, or missing some parameters; feel free to correct
me ☺

I’m using Flink 1.0.1.

Thanks in advance

