hadoop-common-dev mailing list archives

From Arkady Borkovsky <ark...@yahoo-inc.com>
Subject Re: [jira] Commented: (HADOOP-939) No-sort optimization
Date Mon, 29 Jan 2007 22:04:23 GMT
How savings can be much higher than 1/3.
(probably I'm already preaching to the choir)

1. The situation I have in mind looks like this:
-- you have a data set M with 10^11 to 10^12 records that is produced 
once in a while and used hundreds of times (before a new version is 
generated).  This data set has been produced by a key-preserving 
reduce, so all its fragments are sorted, and the keys are split between 
the fragments in a known way.
-- Each time you "use" this data set, you have another data set D with 
the same key space (and, for simplicity, the same record type) with 10^8 
or 10^9 records.

The job is a JOIN -- it produces an output record for each key in the 
intersection of M and D.  Think of M as a database and D as a query.
The number of output records is less than or equal to the number of 
records in D.

If D fits into memory, you do not need a reduce, and everything can be 
done in map by copying D to all the map tasks. A pain, but no real 
problems -- map with no reduce does it.
If D does not fit into memory, a natural way to do this processing is to
-- sort and split D into buckets so that there is one bucket for each 
block of M, with the same keys.
-- run a reduce task on each block of M close to this block and merge 
(join) this block with the corresponding bucket of D while reading the 
So steps b-d are needed only for D, and steps e-g only for the output, 
which is the same size as D -- a few percent of the total data involved.
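
To make the bucket-and-merge idea concrete, here is a minimal sketch in 
Python rather than in Hadoop itself. The names split_into_buckets, 
merge_join and split_keys are hypothetical and not part of any Hadoop 
API. It assumes records are (key, value) pairs, that keys are unique 
within M and within D, and that the boundaries between blocks of M are 
known as a sorted list of split keys.

```python
from bisect import bisect_right


def split_into_buckets(d_records, split_keys):
    """Sort D by key and split it into len(split_keys) + 1 buckets.

    Assumption: split_keys[i] is the smallest key of M's block i + 1, so
    bucket i receives exactly the keys that M's block i could contain.
    """
    buckets = [[] for _ in range(len(split_keys) + 1)]
    for key, value in sorted(d_records, key=lambda rec: rec[0]):
        buckets[bisect_right(split_keys, key)].append((key, value))
    return buckets


def merge_join(m_block, d_bucket):
    """Join two key-sorted lists of (key, value) pairs in a single pass.

    Assumption: keys are unique within each input. Yields
    (key, m_value, d_value) for every key present in both inputs.
    """
    i = j = 0
    while i < len(m_block) and j < len(d_bucket):
        m_key, d_key = m_block[i][0], d_bucket[j][0]
        if m_key < d_key:
            i += 1
        elif m_key > d_key:
            j += 1
        else:
            yield m_key, m_block[i][1], d_bucket[j][1]
            i += 1
            j += 1


if __name__ == "__main__":
    # Three already-sorted blocks of M; blocks 1 and 2 start at keys 10 and 20.
    m_blocks = [[(1, "a"), (4, "b")], [(10, "c"), (12, "d")], [(20, "e")]]
    d = [(12, "x"), (1, "y"), (7, "z"), (20, "w")]
    buckets = split_into_buckets(d, [10, 20])
    for block, bucket in zip(m_blocks, buckets):
        print(list(merge_join(block, bucket)))
```

Only D is sorted and moved here; each block of M is read once, in place, 
in its existing order.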

In this kind of application, Eric's model becomes
now    no-sort                    step
M+D    M+D                        a.  1 read input data from local drive on map node
                                  [ identity map ]
M+D    D                          b.  1 write batches of sorted output data to temporary file on map node
M+D    D                          c. 10 shuffle batches of sorted data
M+D    D   ("local" reduce)       d.  1 write batches of sorted data to reduce node
       M+D ("anywhere" reduce)
                                  [ reduce ]
D      D                          e.  1 write one copy of output locally
D      D                          f.  2 transfer and write one copy to another node on the same rack
D      D                          g. 11 transfer and write one copy to an off-rack node

So it is 27*D+13*M  vs.  27*D+2*M ("anywhere" reduce) or 27*D+M 
("local" reduce).  With D<<M, the gain is about 6x or 12x, respectively.
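
As a cross-check, the totals can be recomputed from the table. This is 
only a sketch: the step weights and the data volume per step are copied 
from the table above, and the names WEIGHTS, total_cost, NOW, 
NO_SORT_LOCAL and NO_SORT_ANYWHERE are made up for the illustration.

```python
# Per-step cost weights for steps a-g, as listed in the table.
WEIGHTS = {"a": 1, "b": 1, "c": 10, "d": 1, "e": 1, "f": 2, "g": 11}

# Data volume handled by a step, as (multiplier of M, multiplier of D).
MD, D_ONLY = (1, 1), (0, 1)

NOW = {"a": MD, "b": MD, "c": MD, "d": MD,
       "e": D_ONLY, "f": D_ONLY, "g": D_ONLY}
# No-sort, reduce runs next to the block of M: only D goes through b-d.
NO_SORT_LOCAL = dict(NOW, b=D_ONLY, c=D_ONLY, d=D_ONLY)
# No-sort, reduce runs anywhere: M is additionally written to the reduce node.
NO_SORT_ANYWHERE = dict(NOW, b=D_ONLY, c=D_ONLY)


def total_cost(volumes, m, d):
    """Sum weight * volume over steps a-g for data set sizes m and d."""
    return sum(
        weight * (volumes[step][0] * m + volumes[step][1] * d)
        for step, weight in WEIGHTS.items()
    )


if __name__ == "__main__":
    m, d = 10**12, 10**9  # sizes taken from the example in point 1
    now = total_cost(NOW, m, d)
    print("anywhere gain:", now / total_cost(NO_SORT_ANYWHERE, m, d))
    print("local gain:   ", now / total_cost(NO_SORT_LOCAL, m, d))
```

With M = 10^12 and D = 10^9 this gives gains of roughly 6.4 ("anywhere") 
and 12.7 ("local").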

2. A variation of the situation described above is when, in addition to 
M ("database") and D ("query"), you have a third input data set U 
("incremental updates" or "deltas"). U is similar in size to D;  the 
record type of U is exactly the same as that of M.  The JOIN works 
similarly to the one described above, but it takes a record from D and 
one or more records from U and M.

On Jan 29, 2007, at 10:17 AM, Doug Cutting wrote:

> Arkady Borkovsky wrote:
>> Does this model assume that the size of the output of reduce is 
>> similar to the size of the input?
>> An important class of applications (mentioned in this thread before) 
>> uses two inputs:
>> -- M ("master file") -- very large, presorted and not changing from 
>> run to run,
>> -- D ("details file") -- smaller, different from run  to run, not 
>> necessarily presorted
>> and the output size is proportional to the size of D.
>> In this case the gain from "no-sort" may be much higher, as the 13 
>> "transfer and write" to DFS are applied to a smaller amount of data, 
>> while the 11 (b-d) sort-n-shuffle-related ones are saved on the larger data.
> Could a combiner be used in this hypothetical case?  If so, then the 
> b-d steps might be faster too.
> Doug
