flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Map Reduce Sorting
Date Mon, 01 Aug 2016 20:29:19 GMT
Hi Hilmi,

the output of the combiner is usually not completely sorted, and even when
it is, Flink does not take advantage of that property.
This is due to the following reasons:
1) A sort-based combiner only sorts as much data as fits into memory. If
there is more data, the result consists of multiple sorted sequences.
2) Flink recently added a hash-based combiner, which is usually
more efficient and does not produce sorted output.
3) With Flink's pipelined shipping strategy, the receiver would have to
merge the records from all senders on the fly while
receiving data over the network. If one sender task straggles, all
other senders would be blocked by backpressure. In addition, this would
only work if the combiner did a full sort rather than several in-memory sorts.
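
Reasons 1) and 2) can be illustrated with a minimal sketch in plain Python. This is not Flink code: the function names, the sum aggregation, and the record-count `memory_budget` are assumptions made for illustration (a real sorter is bounded by bytes of managed memory, not by a record count).

```python
from itertools import groupby
from operator import itemgetter


def sort_combine(records, memory_budget):
    """Hypothetical sort-based combiner with a bounded in-memory buffer.

    Takes at most `memory_budget` records at a time, sorts that buffer by
    key, sums the values per key, and emits the result as one sorted run.
    """
    out = []
    for start in range(0, len(records), memory_budget):
        buffer = sorted(records[start:start + memory_budget], key=itemgetter(0))
        for key, group in groupby(buffer, key=itemgetter(0)):
            out.append((key, sum(value for _, value in group)))
    return out


def hash_combine(records):
    """Hypothetical hash-based combiner: aggregates in a dict, no sorting."""
    table = {}
    for key, value in records:
        table[key] = table.get(key, 0) + value
    return list(table.items())  # emitted in insertion order, not key order


def is_sorted_by_key(records):
    keys = [key for key, _ in records]
    return keys == sorted(keys)


records = [("d", 1), ("b", 1), ("a", 1), ("c", 1), ("a", 1), ("d", 1)]

# The buffer holds only 3 records, so the output is two sorted runs.
runs = sort_combine(records, memory_budget=3)
# The hash-based variant combines all duplicates but emits no particular order.
hashed = hash_combine(records)

print(runs, is_sorted_by_key(runs))
print(hashed, is_sorted_by_key(hashed))
```

In this sketch the output is globally sorted only when the whole input fits into the buffer (for example `memory_budget=6`), which is the case a receiver cannot rely on.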

So a reducer always does a full sort of all received data before
applying the reduce function. If a combiner is available, it is applied
before data is written to disk in the case of an external sort.
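
The reduce side described above can be sketched the same way. Again this is a plain-Python model under stated assumptions, not Flink's implementation: `reduce_side`, the `gateways` list, and the in-memory `sort` are hypothetical stand-ins (the real sorter can spill to disk and merge externally).

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter


def reduce_side(gateways, reduce_fn):
    """Hypothetical reduce task: collect all input, sort fully, then reduce.

    `gateways` holds one list of (key, value) records per sender. No order
    is assumed within a gateway or across gateways.
    """
    received = [record for gateway in gateways for record in gateway]
    received.sort(key=itemgetter(0))  # full sort of all received data
    result = []
    for key, group in groupby(received, key=itemgetter(0)):
        # Fold the values of one key group with the user's reduce function.
        result.append((key, reduce(reduce_fn, (value for _, value in group))))
    return result


gateways = [
    [("a", 1), ("b", 1), ("d", 1), ("a", 1)],  # two sorted runs from one sender
    [("d", 2), ("b", 1), ("a", 2)],            # unsorted output from another sender
]

totals = reduce_side(gateways, lambda x, y: x + y)
print(totals)
```

Because the full sort happens on the receiver, the result is the same whether a sender shipped sorted runs, hash-combined output, or uncombined records.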

Hope this helps,
Fabian

2016-08-01 18:25 GMT+02:00 Hilmi Yildirim <Hilmi.Yildirim@dfki.de>:

> Hi,
>
> I have a question about the point at which data points are sorted in a
> simple MapReduce job.
>
> I have the following code:
>
> data = readFromSource()
>
> data.map(....).groupBy(0).reduce(...)
>
> This code will be translated into the following execution plan:
>
> map -> combiner -> hash partitioning and sorting on 0 -> reduce.
>
>
> If I am right, the combiner first sorts the data, then applies
> the combine function, and then partitions the result.
>
> Now the partitions are consumed by the reducers. For each mapper/combiner
> machine, the reducer has an input gateway. For example, if the mappers and
> combiners run on 10 machines, then each reducer has 10 input gateways. The
> reducer consumes the data via a MutableObjectIterator. This iterator
> first consumes data from one input gateway, then from the next, and so
> on. Is the data of a single input gateway already sorted, given that the
> combiner has already sorted it? Is the order of the data
> points maintained after they are sent through the network?
>
> In my code, the MutableObjectIterator instances are subclasses of
> NormalizedKeySorter. Does this mean that the data from an input gateway is
> first sorted before it is handed over to the reduce function? Is this
> because the order of the data points is not maintained after sending them
> through the network?
>
>
> It would be nice if someone could answer my questions. If my assumptions are
> wrong, please correct me :)
>
>
> BR,
>
> Hilmi
>
>
>
>
> --
> ==================================================================
> Hilmi Yildirim, M.Sc.
> Researcher
>
> DFKI GmbH
> Intelligente Analytik für Massendaten
> DFKI Projektbüro Berlin
> Alt-Moabit 91c
> D-10559 Berlin
> Phone: +49 30 23895 1814
>
> E-Mail: Hilmi.Yildirim@dfki.de
>
