flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hilmi Yildirim <Hilmi.Yildi...@dfki.de>
Subject Re: Map Reduce Sorting
Date Wed, 03 Aug 2016 13:43:51 GMT
Hi,

I have another question. The reducer sorts its inputs before it starts 
with computation. Which sorting algorithm it is using? In Flink I found 
QuickSort, HeapSort and etc.

Does the sorting algorithm benefit from pre-sorted partitions. For 
example, a MergeSort algorithm can sort the partitions of multiple maps 
together to create a single sorted partition for the reducer. If the map 
partitions are already sorted, then the MergeSort algorithm can run faster.


Are there any benefits if the map partitions are sorted?


Thank you


BR,

Hilmi

Am 02.08.2016 um 10:01 schrieb Hilmi Yildirim:
> Hi Fabian,
>
> thank you very much! This answers my question.
>
>
> BR,
>
> Hilmi
>
>
> Am 01.08.2016 um 22:29 schrieb Fabian Hueske:
>> Hi Hilmi,
>>
>> the results of the combiner are usually not completely sorted and if 
>> they
>> are this property is not leveraged.
>> This is due to the following reasons:
>> 1) a sort-combiner only sorts as much data as fits into memory. If 
>> there is
>> more data, the result consists of multiple sorted sequences.
>> 2) since recently, Flink features a hash-based combiner which is usually
>> more efficient and does not produce sorted output.
>> 3) Flink's pipelined shipping strategy would require that the
>> receiver merges the result records from all senders on the fly while
>> receiving data via the network. In case of a straggling sender task all
>> other senders would be blocked due to backpressure. In addition, this 
>> would
>> only work if the combiner does a full sort and not several in-memory 
>> sorts.
>>
>> So, a Reducer will always do a full sort of all received data before
>> applying the Reduce function (if available, a combiner is applied before
>> data is written to disk in case of an external sort).
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-08-01 18:25 GMT+02:00 Hilmi Yildirim <Hilmi.Yildirim@dfki.de>:
>>
>>> Hi,
>>>
>>> I have a question regarding when data points are sorted when applying a
>>> simple Map Reduce Job.
>>>
>>> I have the following code:
>>>
>>> data = readFromSource()
>>>
>>> data.map(....).groupBy(0).reduce(...)
>>>
>>> This code will be translated into the following execution plan:
>>>
>>> map -> combiner -> hash partitioning and sorting on 0 -> reduce.
>>>
>>>
>>> If I am right then the combiner firstly sorts the data, then it applies
>>> the combine function, and then it partitions the result.
>>>
>>> Now the partitions are consumed by the reducers. For each 
>>> mapper/combiner
>>> machine, the reducer has an input gateway. For example, the mappers and
>>> combiners run on 10 machines, then each reducer has 10 input 
>>> gateways. Now,
>>> the reducer consumes the data via a MutableObjectIterator. This 
>>> iterator
>>> firstly consumes data from one input gateway, then from the other 
>>> and so
>>> on. Is the data of a single input gateway already sorted? Because the
>>> combiner function has sorted the data already. Is the order of the data
>>> points maintained after they are sent through the network?
>>>
>>> In my code, the MutableObjectIterator instances are subclasses of
>>> NormalizedKeySorter. Does this mean that the data from an input 
>>> gateway is
>>> firstly sorted before it is handover to the reduce function? Is this
>>> because the order of the data points is not mainted after sending 
>>> through
>>> the network?
>>>
>>>
>>> It would be nice if someone can answer my question. If my 
>>> assumptions are
>>> wrong, please correct me :)
>>>
>>>
>>> BR,
>>>
>>> Hilmi
>>>
>>>
>>>
>>>
>>> -- 
>>> ==================================================================
>>> Hilmi Yildirim, M.Sc.
>>> Researcher
>>>
>>> DFKI GmbH
>>> Intelligente Analytik für Massendaten
>>> DFKI Projektbüro Berlin
>>> Alt-Moabit 91c
>>> D-10559 Berlin
>>> Phone: +49 30 23895 1814
>>>
>>> E-Mail: Hilmi.Yildirim@dfki.de
>>>
>>> -------------------------------------------------------------
>>> Deutsches Forschungszentrum fuer Kuenstliche Intelligenz GmbH
>>> Firmensitz: Trippstadter Strasse 122, D-67663 Kaiserslautern
>>>
>>> Geschaeftsfuehrung:
>>> Prof. Dr. Dr. h.c. mult. Wolfgang Wahlster (Vorsitzender)
>>> Dr. Walter Olthoff
>>>
>>> Vorsitzender des Aufsichtsrats:
>>> Prof. Dr. h.c. Hans A. Aukes
>>>
>>> Amtsgericht Kaiserslautern, HRB 2313
>>> -------------------------------------------------------------
>>>
>>>



Mime
View raw message