flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Cross operation on two huge datasets
Date Thu, 02 Mar 2017 13:41:41 GMT
Hi Gwenhael,

if you want to persist operator state, then you would have to persist it
(e.g. writing to a shared directory or emitting the model and using one of
Flink’s sinks) and when creating the new operators you have to reread it
from there (usually in the open method or from a Flink source as part of a
broadcasted data set).

If you want to give a data set to all instances of an operator, then you
should broadcast this data set. You can do something like

DataSet<Integer> input = ...
DataSet<Integer> broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector)
throws Exception {

    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till
​

On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers <
gwenhael.pasquiers@ericsson.com> wrote:

> I (almost) made it work the following way:
>
>
>
> 1rst job : Read all the shapes, repartition() them equally on my N nodes,
> then on each node fill a static RTree (thanks for the tip).
>
>
>
> 2nd job : Read all the points, use a flatmap + custom partitioner to
> “clone” the dataset to all nodes, then apply a simple flatmap that will use
> the previously initialized static RTree, adding the Shape information to
> the point. Then do a groupBy to merge the points that were inside of
> multiple shapes.
>
>
>
> This works very well in a local runtime but fails on yarn because it seems
> that the taskmanager reloads the jar file between two jobs, making me lose
> my static RTree (I guess that newly loaded class overwrites the older one).
>
>
>
> I have two questions :
>
> -          Is there a way to avoid that jar reload // can I store my
> RTree somewhere in jdk or flink, locally to the taskmanager, in a way that
> it wouldn’t be affected by the jar reload (since it would not be stored in
> any class from MY jar)?
>
> o   I could also try to do it in a single job, but I don’t know how to
> ensure that some operations are done (parsing of shape) BEFORE starting
> others handling the points.
>
> -          Is there a way to do that in a clean way using flink operators
> ? I’d need to be able to use the result of the iteration of a dataset
> inside of my map.
>
>
>
> Something like :
>
>
>
> datasetA.flatmap(new MyMapOperator(datasetB))…
>
>
>
> And In my implementation I would be able to iterate the whole datasetB
> BEFORE doing any operation in datasetA. That way I could parse all my
> shapes in an RTree before handling my points, without relying on static
>
>
>
> Or any other way that would allow me to do something similar.
>
>
>
> Thanks in advance for your insight.
>
>
>
> Gwen’
>
>
>
> *From:* Jain, Ankit [mailto:ankit.jain@here.com]
> *Sent:* jeudi 23 février 2017 19:21
> *To:* user@flink.apache.org
> *Cc:* Fabian Hueske <fhueske@gmail.com>
>
> *Subject:* Re: Cross operation on two huge datasets
>
>
>
> Hi Gwen,
>
> I would recommend looking into a data structure called RTree that is
> designed specifically for this use case, i.e matching point to a region.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Fabian Hueske <fhueske@gmail.com>
> *Date: *Wednesday, February 22, 2017 at 2:41 PM
> *To: *<user@flink.apache.org>
> *Subject: *Re: Cross operation on two huge datasets
>
>
>
> Hi Gwen,
>
> Flink usually performs a block nested loop join to cross two data sets.
>
> This algorithm spills one input to disk and streams the other input. For
> each input it fills a memory buffer and to perform the cross. Then the
> buffer of the spilled input is refilled with spilled records and records
> are again crossed. This is done until one iteration over the spill records
> is done. Then the other buffer of the streamed input is filled with the
> next records.
>
> You should be aware that cross is a super expensive operation, especially
> if you evaluate a complex condition for each pair of records. So cross can
> be easily too expensive to compute.
>
> For such use cases it is usually better to apply a coarse-grained spatial
> partitioning and do a key-based join on the partitions. Within each
> partition you'd perform a cross.
>
> Best, Fabian
>
>
>
>
>
> 2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <
> gwenhael.pasquiers@ericsson.com>:
>
> Hi,
>
>
>
> I need (or at least I think I do) to do a cross operation between two huge
> datasets. One dataset is a list of points. The other one is a list of
> shapes (areas).
>
>
>
> I want to know, for each point, the areas (they might overlap so a point
> can be in multiple areas) it belongs to so I thought I’d “cross” my points
> and areas since I need to test each point against each area.
>
>
>
> I tried it and my job stucks seems to work for some seconds then, at some
> point, it stucks.
>
>
>
> I’m wondering if Flink, for cross operations, tries to load one of the two
> datasets into RAM or if it’s able to split the job in multiple iterations
> (even if it means reading one of the two datasets multiple times).
>
>
>
> Or maybe I’m going at it the wrong way, or missing some parameters, feel
> free to correct me J
>
>
>
> I’m using flink 1.0.1.
>
>
>
> Thanks in advance
>
>
>
> Gwen’
>
>
>

Mime
View raw message