flink-dev mailing list archives

From Hilmi Yildirim <Hilmi.Yildi...@dfki.de>
Subject Re: Map Reduce Sorting
Date Tue, 02 Aug 2016 08:01:36 GMT
Hi Fabian,

thank you very much! This answers my question.


BR,

Hilmi


On 01.08.2016 at 22:29, Fabian Hueske wrote:
> Hi Hilmi,
>
> the results of the combiner are usually not completely sorted, and even if
> they are, this property is not leveraged.
> This is due to the following reasons:
> 1) a sort-combiner only sorts as much data as fits into memory. If there is
> more data, the result consists of multiple sorted sequences.
> 2) Flink recently added a hash-based combiner, which is usually
> more efficient and does not produce sorted output.
> 3) Flink's pipelined shipping strategy would require that the
> receiver merges the result records from all senders on the fly while
> receiving data via the network. In case of a straggling sender task all
> other senders would be blocked due to backpressure. In addition, this would
> only work if the combiner does a full sort and not several in-memory sorts.
>
> So, a Reducer will always do a full sort of all received data before
> applying the Reduce function (if available, a combiner is applied before
> data is written to disk in case of an external sort).
>
> Hope this helps,
> Fabian
>
> 2016-08-01 18:25 GMT+02:00 Hilmi Yildirim <Hilmi.Yildirim@dfki.de>:
>
>> Hi,
>>
>> I have a question regarding when data points are sorted when applying a
>> simple Map Reduce Job.
>>
>> I have the following code:
>>
>> data = readFromSource()
>>
>> data.map(....).groupBy(0).reduce(...)
>>
>> This code will be translated into the following execution plan:
>>
>> map -> combiner -> hash partitioning and sorting on 0 -> reduce.
>>
>>
>> If I am right, the combiner first sorts the data, then applies
>> the combine function, and then partitions the result.
>>
>> Now the partitions are consumed by the reducers. For each mapper/combiner
>> machine, the reducer has an input gateway. For example, if the mappers and
>> combiners run on 10 machines, then each reducer has 10 input gateways. Now,
>> the reducer consumes the data via a MutableObjectIterator. This iterator
>> first consumes data from one input gateway, then from the next, and so
>> on. Is the data of a single input gateway already sorted, given that the
>> combiner has already sorted the data? Is the order of the data
>> points maintained after they are sent through the network?
>>
>> In my code, the MutableObjectIterator instances are subclasses of
>> NormalizedKeySorter. Does this mean that the data from an input gateway is
>> first sorted before it is handed over to the reduce function? Is this
>> because the order of the data points is not maintained after sending through
>> the network?
>>
>>
>> It would be nice if someone can answer my question. If my assumptions are
>> wrong, please correct me :)
>>
>>
>> BR,
>>
>> Hilmi
>>
>>
>>
>>
>> --
>> ==================================================================
>> Hilmi Yildirim, M.Sc.
>> Researcher
>>
>> DFKI GmbH
>> Intelligente Analytik für Massendaten
>> DFKI Projektbüro Berlin
>> Alt-Moabit 91c
>> D-10559 Berlin
>> Phone: +49 30 23895 1814
>>
>> E-Mail: Hilmi.Yildirim@dfki.de
>>
>> -------------------------------------------------------------
>> Deutsches Forschungszentrum fuer Kuenstliche Intelligenz GmbH
>> Firmensitz: Trippstadter Strasse 122, D-67663 Kaiserslautern
>>
>> Geschaeftsfuehrung:
>> Prof. Dr. Dr. h.c. mult. Wolfgang Wahlster (Vorsitzender)
>> Dr. Walter Olthoff
>>
>> Vorsitzender des Aufsichtsrats:
>> Prof. Dr. h.c. Hans A. Aukes
>>
>> Amtsgericht Kaiserslautern, HRB 2313
>> -------------------------------------------------------------
>>
>>
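
Reasons 1) and 3) in Fabian's reply can be illustrated with a small simulation. This is only a sketch in plain Python, not Flink code: `sort_combine`, `reduce_side`, `is_sorted` and the `memory_budget` parameter are hypothetical names made up for the illustration, and the "memory budget" is simply a number of records. It shows that a combiner which sorts one memory-sized chunk at a time emits several sorted runs rather than one sorted stream, so the receiving reducer still has to sort everything it gets before applying the reduce function.

```python
from itertools import groupby
from operator import itemgetter


def sort_combine(records, memory_budget):
    """Hypothetical sort-combiner: sorts and pre-aggregates one chunk at a time.

    Each chunk holds at most `memory_budget` records (a stand-in for the
    memory available to the sorter). The output is a concatenation of
    sorted runs, which is not globally sorted in general.
    """
    out = []
    for start in range(0, len(records), memory_budget):
        chunk = sorted(records[start:start + memory_budget], key=itemgetter(0))
        for key, group in groupby(chunk, key=itemgetter(0)):
            out.append((key, sum(value for _, value in group)))
    return out


def reduce_side(received):
    """Reducer side: fully sort all received records, then reduce per key."""
    ordered = sorted(received, key=itemgetter(0))
    return [
        (key, sum(value for _, value in group))
        for key, group in groupby(ordered, key=itemgetter(0))
    ]


def is_sorted(records):
    """True if the records are ordered by key."""
    keys = [key for key, _ in records]
    return keys == sorted(keys)


# Two sender tasks, each able to sort only two records at a time.
sender_a = sort_combine([("b", 1), ("a", 1), ("c", 1), ("a", 1)], memory_budget=2)
sender_b = sort_combine([("c", 1), ("b", 1)], memory_budget=2)

# The reducer reads one sender after the other, as described in the question.
received = sender_a + sender_b
result = reduce_side(received)

print(is_sorted(sender_a))  # output of a single sender
print(is_sorted(received))  # everything the reducer received
print(result)
```

In this toy run, key "a" appears in two different runs of the first sender, so even the data from a single input gateway is not sorted, and only the full sort on the reducer side brings equal keys together.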
