hadoop-mapreduce-issues mailing list archives

From "Binglin Chang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (MAPREDUCE-2841) Task level native optimization
Date Thu, 18 Aug 2011 12:00:28 GMT

     [ https://issues.apache.org/jira/browse/MAPREDUCE-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Binglin Chang updated MAPREDUCE-2841:

    Status: Patch Available  (was: Open)

I just submitted a demo patch to show the basic ideas and current progress. 
This patch contains map task optimization using NativeMapOutputCollector.

NativeMapOutputCollector uses JNI to pass Java k/v pairs and 
the partition value to the native side. Because JNI has non-negligible 
overhead (about 1000ns per empty JNI call with 4-6 arguments), 
a k/v cache is added to pass k/v pairs to the native side in batches. 
I suspect using a DirectBuffer would be better.
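
The batching idea can be sketched roughly as follows. This is not the code in the patch: the class name KVBatchCache, the BatchSink interface (a stand-in for the real JNI entry point), and the record layout are all assumptions made for illustration. It uses a direct ByteBuffer, as speculated above, so that one call can hand many records to the native side.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: batches k/v pairs so one "native" call covers many records. */
public class KVBatchCache {
    /** Stand-in for the JNI entry point; the real signature in the patch may differ. */
    public interface BatchSink {
        void flush(ByteBuffer batch, int records);
    }

    private final ByteBuffer buf;
    private final BatchSink sink;
    private int records = 0;

    public KVBatchCache(int capacity, BatchSink sink) {
        // A direct buffer lives outside the Java heap, so native code can read it in place.
        this.buf = ByteBuffer.allocateDirect(capacity);
        this.sink = sink;
    }

    /** Appends [partition][keyLen][valueLen][key][value]; flushes first if it does not fit. */
    public void collect(int partition, byte[] key, byte[] value) {
        int need = 12 + key.length + value.length; // three 4-byte ints plus payload
        if (need > buf.capacity()) {
            throw new IllegalArgumentException("record larger than cache");
        }
        if (need > buf.remaining()) {
            flush();
        }
        buf.putInt(partition).putInt(key.length).putInt(value.length);
        buf.put(key).put(value);
        records++;
    }

    /** Hands the accumulated batch to the sink and resets the cache. */
    public void flush() {
        if (records == 0) {
            return;
        }
        buf.flip();
        sink.flush(buf, records);
        buf.clear();
        records = 0;
    }
}
```

With this layout the per-call JNI overhead is paid once per batch instead of once per record; the caller must invoke flush() one last time after the final collect().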

On the native side, k/v pairs are put into partitioned buffers,
which differs from Java's single-buffer approach. This makes the
sort much faster, because sorting one big array is much slower
than sorting many small arrays; small arrays also mean fewer cache
misses; and the partition number does not need to be compared
in the sort.
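
A minimal sketch of the partitioned approach, written in Java for illustration only (the patch does this in C++ on raw buffers). The class and method names are hypothetical. Keys are bucketed by partition first, then each bucket is sorted on its own with a plain binary compare, so the comparator never looks at the partition number.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of per-partition buckets sorted independently. */
public class PartitionedSort {
    /** Unsigned lexicographic byte comparison (binary string compare). */
    public static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    /** Buckets keys by partition, then sorts each bucket separately. */
    public static List<List<byte[]>> sortByPartition(byte[][] keys, int[] partitions,
                                                     int numPartitions) {
        List<List<byte[]>> buckets = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < keys.length; i++) {
            buckets.get(partitions[i]).add(keys[i]);
        }
        for (List<byte[]> bucket : buckets) {
            // The comparator only sees keys; the partition is implied by the bucket.
            bucket.sort(PartitionedSort::compareBytes);
        }
        return buckets;
    }
}
```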

There are two lightweight io buffers, ReadBuffer & AppendBuffer, 
which perform better than the decorator-based 
Java & Hadoop io streams. This greatly benefits IFile 
serialization, since many methods can be inlined. Compression 
is not supported yet, but it should not be 
complex to add a block-based compressor like snappy or 

This optimization ONLY targets x86-64 (little-endian)
and assumes unaligned 64-bit loads and stores are cheap, 
as google-snappy does.

About the patch:
I put the C++ code into src/c++/libnativetask, and use a Makefile
separate from build.xml and src/native, because many things are 
not decided yet and I don't want to mess up other components. 
# cd src/c++/libnativetask
# make
# copy libnativetask.so to $HADOOP_HOME/lib/native/Linux-amd64-64/
# set mapred.native.map.output.collector to true in jobconf

Again, this patch is just a demo; it is far from stable & complete, 
and has many known and unknown bugs.

Here are some test results:
1. running single task
   Intel Core i5, jdk6u24, on a MacBook Pro
     size  250000000 bytes 
     100 bytes per line
     [key 9bytes]\t[value 89bytes]\n
   partition number 100
   io.sort.mb 400MB no mid-spill
   Total time and analysis:
   || ||JAVA||Native||Speedup||
   |IFile SER|2.8|0.9|3.1|

2. Some results on partitioned sort & spill
   Input is the same as test 1
   ||partition |   1  |  10  |  20  |  40  |  80  | 200  | 400  | 800  | 1600 | 2000 |
   ||sort time | 2.21 | 1.47 | 1.27 | 1.11 | 0.9  | 0.71 | 0.65 | 0.58 | 0.51 | 0.5  |
   ||spill time| 0.49 | 0.43 | 0.41 | 0.39 | 0.36 | 0.33 | 0.32 | 0.32 | 0.29 | 0.28 |
   As illustrated, the partition number has a great impact on sort & spill performance.
   This is largely due to sort complexity and CPU cache effects.
   If the partition number is P and the record count is N, we get:
   T = P * (N/P) * log(N/P) = N * log(N/P), so the partition number matters.
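
To see how much of the measured speedup the formula alone would account for, the estimate T = N * log(N/P) can be evaluated for the partition counts in the table. This is only a back-of-the-envelope sketch: the class name is made up, N is taken as 250000000 / 100 = 2.5 million records from test 1, and log base 2 is an assumption (the base cancels out in the ratio anyway).

```java
/** Hypothetical helper that evaluates the cost estimate T = N * log2(N / P). */
public class SortCostModel {
    /** Comparison-count estimate for N records split evenly into P partitions. */
    public static double cost(long n, int p) {
        return n * (Math.log((double) n / p) / Math.log(2));
    }

    public static void main(String[] args) {
        long n = 250_000_000L / 100; // 2.5 million 100-byte records, as in test 1
        int[] partitions = {1, 10, 20, 40, 80, 200, 400, 800, 1600, 2000};
        double base = cost(n, 1);
        for (int p : partitions) {
            // Ratio of the single-partition estimate to the P-partition estimate.
            System.out.printf("P=%d predicted speedup over P=1: %.2f%n", p, base / cost(n, p));
        }
    }
}
```

For P = 2000 the estimate gives a ratio of roughly 2x, while the measured sort time improves from 2.21 to 0.5 (about 4.4x). The gap would be consistent with the CPU cache effect mentioned above, although this sketch cannot confirm that.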

3. Terasort, 10G input, 40 maps, 40 reduces, on a 9-node cluster
   io.sort.mb 500MB
   Results on jobhistory:
   ||           || Total || AverageMap || AverageShuffle || AverageReduce ||
   ||java       | 54s | 14s | 14s | 10s |
   ||native     | 39s | 7s  | 15s | 9s  |
   ||java-lzo   | 43s | 15s | 8s  | 8s  |
   ||native-lzo | ?   |     |     |     |
   speedup (java vs. native total time): 1.38
   Map output compression can reduce shuffle time significantly,
   so with faster compression and less impact from shuffle time,
   we can expect a better speedup once map output compression is finished. 
   I don't have large clusters to do more standard tests.

Current progress:
* NativeMapOutputCollector jni [done]
* io buffers [done]
* crc32/crc32c [done]
* integration of jobconfs [done]
* ifile writer [done]
* write ifile index file [done]
* ifile index reader [done]
* ifile reader [done]
* single-pass merge [done]
* handle big key/value
  In the current implementation, if the key/value length exceeds
  the buffer length, it will crash. I will add big 
  k/v handling soon.
* support block compression snappy
* multi-pass merge
* local io counters
* map-side combine
* parallel spill & merge
  Java has a spill thread to do collect/spill
  concurrently. Currently NativeMapOutputCollector only
  uses one thread, but parallel sort&spill&merge is possible,
  since k/v pairs are partitioned now. This can further 
  speed up sort&spill&merge by simply adding threads.
* reduce task optimization
* support no sort
* support grouping 
* support pipes API, streaming
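
For the "parallel spill & merge" item above, the sort part could look roughly like this. It is a sketch under assumptions, not planned patch code: it is written in Java rather than the patch's C++, the class name is hypothetical, and strings stand in for raw keys. Since partitions share no data, each one can be sorted by a separate worker without locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: sorts independent partitions on a fixed thread pool. */
public class ParallelPartitionSort {
    /** Sorts every partition in place, using up to the given number of threads. */
    public static void sortAll(List<List<String>> partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<String> part : partitions) {
                // Each task touches only its own partition, so no synchronization is needed.
                pending.add(pool.submit(() -> part.sort(null)));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for completion and surface any failure
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("partition sort failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```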

> Task level native optimization
> ------------------------------
>                 Key: MAPREDUCE-2841
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2841
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: task
>         Environment: x86-64 Linux
>            Reporter: Binglin Chang
>            Assignee: Binglin Chang
>         Attachments: MAPREDUCE-2841.v1.patch
> I have recently been working on native optimization for MapTask based on JNI. 
> The basic idea is to add a NativeMapOutputCollector to handle k/v pairs emitted by
> the mapper, so that sort, spill, and IFile serialization can all be done in native code.
> A preliminary test (on Xeon E5410, jdk6u24) showed promising results:
> 1. Sort is about 3x-10x as fast as Java (only binary string compare is supported)
> 2. IFile serialization speed is about 3x that of Java, about 500MB/s; if hardware CRC32C
> is used, things can get much faster (1G/s).
> 3. Merge code is not completed yet, so the test uses enough io.sort.mb to prevent mid-spill
> This leads to a total speedup of 2x~3x for the whole MapTask, if IdentityMapper (a mapper
> that does nothing) is used.
> There are limitations of course: currently only Text and BytesWritable are supported,
> and I have not thought through many things yet, such as how to support map-side combine.
> I had some discussion with somebody familiar with Hive, and it seems that these limitations
> won't be much of a problem for Hive to benefit from those optimizations, at least. Advice
> or discussion about improving compatibility is most welcome:) 
> Currently NativeMapOutputCollector has a static method called canEnable(), which checks
> whether the key/value type, comparator type, and combiner are all compatible; if so,
> MapTask can choose to enable NativeMapOutputCollector.
> This is only a preliminary test, and more work needs to be done. I expect better final
> results, and I believe similar optimizations can be adopted for reduce task and shuffle too. 

This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

