hadoop-mapreduce-issues mailing list archives

From "Mariappan Asokan (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAPREDUCE-2454) Allow external sorter plugin for MR
Date Thu, 28 Apr 2011 23:12:04 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-2454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13026766#comment-13026766 ]

Mariappan Asokan commented on MAPREDUCE-2454:

Hi Owen,
  Thank you for your comments.  Here are my thoughts.

On the Map side, the external sorter would also need the partition number, not just the key
and value.  I am not sure how RecordWriter can be used, since its write() method takes only a
key and a value.  In the current MapTask, sorting starts in MapOutputBuffer, which implements
MapOutputCollector.  I thought it natural for an external sorter to implement the
MapOutputCollector interface as well.  Perhaps, following Steve's suggestion, we can rewrite
MapOutputCollector as:

import java.io.Closeable;
import java.io.IOException;

public interface MapOutputCollector<K, V> extends Closeable {
  public void collect(K key, V value, int partition)
    throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException;
}

At present, "extends Closeable" is missing.
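To make the partition requirement concrete, here is a minimal, self-contained sketch of a toy collector implementing the proposed interface. BufferingSortCollector and its sort-on-flush behavior are hypothetical illustrations, not Hadoop code; a real external sorter would spill to disk or hand records to another process. The point is that the sort order is (partition, key), which a plain key/value RecordWriter cannot express.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// The interface as proposed above, now extending Closeable.
interface MapOutputCollector<K, V> extends Closeable {
  void collect(K key, V value, int partition)
      throws IOException, InterruptedException;
  void flush() throws IOException, InterruptedException;
}

// Hypothetical toy collector (not Hadoop code). It buffers records in memory
// and, on flush(), orders them by partition first and key second: the order
// a map task must produce so that each reducer reads one sorted run.
class BufferingSortCollector<K extends Comparable<K>, V>
    implements MapOutputCollector<K, V> {

  // One buffered record; the partition number travels with the key and value.
  final class Rec {
    final int partition;
    final K key;
    final V value;

    Rec(int partition, K key, V value) {
      this.partition = partition;
      this.key = key;
      this.value = value;
    }

    @Override
    public String toString() {
      return partition + ":" + key + "=" + value;
    }
  }

  private final List<Rec> buffer = new ArrayList<>();
  private final List<Rec> output = new ArrayList<>();
  private boolean closed;

  @Override
  public void collect(K key, V value, int partition) {
    if (closed) {
      throw new IllegalStateException("collector is closed");
    }
    buffer.add(new Rec(partition, key, value));
  }

  @Override
  public void flush() {
    buffer.sort((a, b) -> a.partition != b.partition
        ? Integer.compare(a.partition, b.partition)
        : a.key.compareTo(b.key));
    output.addAll(buffer);
    buffer.clear();
  }

  // From Closeable: flush what is left, then reject further collect() calls.
  @Override
  public void close() {
    flush();
    closed = true;
  }

  // Records flushed so far; each flush() appends one sorted run.
  List<Rec> output() {
    return output;
  }
}
```

Because the interface extends Closeable, a caller can also manage the collector with try-with-resources, so close() runs even if the task fails midway.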

A digression
If you treat the framework's sorter as a black box, it accepts key and value objects but
produces raw (serialized) key and value pairs.  There is an asymmetry.

An external sorter may not produce a RawKeyValueIterator, because it may use its own
serialization mechanism.  For example, if records are piped to GNU sort, the key and value may
be serialized as text with a TAB separator between them.  If such an external sorter would like
to use the CombinerRunner classes defined in Task.java, it cannot do so without incurring an
additional data move.  I was looking for an iterator that returns simple keys and values, but I
could not find an efficient one.  RecordReader looks functionally appropriate, but it is not
efficient when passed to ValuesIterator (defined in Task.java), which is used when running a
Combiner: the keys and values returned from RecordReader have to be copied.  Any such data
move hurts performance, especially when dealing with huge volumes of data.
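A small sketch of the GNU sort scenario, assuming records are handed to the external sort as TAB-separated text lines; Java's own list sort stands in for the external process. TabRecords and its methods are hypothetical names. The decode step shows where the extra copy comes from: each sorted line must be split into new key and value objects before a combiner can see it.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helpers (not Hadoop code) for a text-based external sort format.
final class TabRecords {
  private TabRecords() {}

  // Serialize one record the way it might be piped to an external sort:
  // key, TAB, value on a single line.
  static String encode(String key, String value) {
    if (key.indexOf('\t') >= 0 || key.indexOf('\n') >= 0
        || value.indexOf('\n') >= 0) {
      throw new IllegalArgumentException("field breaks the line format");
    }
    return key + '\t' + value;
  }

  // Stand-in for the external process. Assuming printable ASCII keys, sorting
  // whole lines by code unit matches a byte-wise sort on the key field,
  // because TAB sorts below every printable character.
  static List<String> externalSort(List<String> lines) {
    List<String> sorted = new ArrayList<>(lines);
    Collections.sort(sorted);
    return sorted;
  }

  // Reading the result back: each line is split and copied into new String
  // objects before a combiner could use it. This is the extra data move.
  static Map.Entry<String, String> decode(String line) {
    int tab = line.indexOf('\t');
    return new AbstractMap.SimpleImmutableEntry<>(
        line.substring(0, tab), line.substring(tab + 1));
  }
}
```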

I had to come up with a simple key/value iterator, as below:

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.util.Progress;

public interface KeyValueIterator<K, V> extends Closeable {
  /**
   * Get the current key.
   * @param key where the current key should be stored.  If this is null, a new
   * instance will be created and returned.
   * @return the current key.
   */
  K getCurrentKey(K key) throws IOException, InterruptedException;

  /**
   * Get the current value.
   * @param value where the current value should be stored.  If this is null, a
   * new instance will be created and returned.
   * @return the current value.
   */
  V getCurrentValue(V value) throws IOException, InterruptedException;

  /**
   * Advance to the next key/value pair (for getCurrentKey() and getCurrentValue()).
   * @return <code>true</code> if there exists a key/value, <code>false</code>
   * otherwise.
   */
  boolean nextKeyValue() throws IOException, InterruptedException;

  /**
   * Get the Progress object; this has a float (0.0 - 1.0) indicating the bytes
   * processed by the iterator so far.
   * @return progress object.
   */
  Progress getProgress();
}
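A minimal sketch of one way to satisfy this contract, assuming a simplified copy of the interface whose getProgress() returns a plain float instead of Hadoop's Progress object, so that it compiles without Hadoop. ListKeyValueIterator is a hypothetical name; StringBuilder serves as a mutable key/value type to show how reusing the caller's instance avoids allocating a new object per record. Progress here counts records rather than bytes.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Simplified copy of the proposed interface. Assumption: getProgress()
// returns a float instead of Hadoop's Progress object.
interface KeyValueIterator<K, V> extends Closeable {
  K getCurrentKey(K key) throws IOException, InterruptedException;
  V getCurrentValue(V value) throws IOException, InterruptedException;
  boolean nextKeyValue() throws IOException, InterruptedException;
  float getProgress();
}

// Hypothetical implementation over an in-memory list of sorted pairs.
class ListKeyValueIterator
    implements KeyValueIterator<StringBuilder, StringBuilder> {
  private final List<String[]> pairs;
  private int pos = -1;  // index of the current pair; -1 before the first call

  ListKeyValueIterator(List<String[]> pairs) {
    this.pairs = pairs;
  }

  @Override
  public boolean nextKeyValue() {
    if (pos < pairs.size()) {
      pos++;
    }
    return pos < pairs.size();
  }

  @Override
  public StringBuilder getCurrentKey(StringBuilder key) {
    return fill(key, pairs.get(pos)[0]);
  }

  @Override
  public StringBuilder getCurrentValue(StringBuilder value) {
    return fill(value, pairs.get(pos)[1]);
  }

  // Reuse the caller's instance when one is given; allocate only if it is null.
  private static StringBuilder fill(StringBuilder target, String data) {
    StringBuilder out = (target == null) ? new StringBuilder() : target;
    out.setLength(0);
    out.append(data);
    return out;
  }

  // Fraction of records consumed, standing in for the bytes-processed measure.
  @Override
  public float getProgress() {
    if (pairs.isEmpty()) {
      return 1.0f;
    }
    return Math.min(pos + 1, pairs.size()) / (float) pairs.size();
  }

  @Override
  public void close() {}
}
```

A caller passes null on the first call and then keeps passing back the returned instance, so only one key object is ever allocated.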

I was able to wrap the framework's RawKeyValueIterator inside an implementation of KeyValueIterator
without any additional data move.  This ensures that anything outside the sorter sees only
the simple key/value iterator; the serialized representation stays internal to the sorter
black box.  The external sorter is also happy, as it does not incur any extra data move :-)
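The wrapping idea can be sketched without Hadoop as follows. RawIterator is a hypothetical, simplified stand-in for RawKeyValueIterator that exposes keys and values as ByteBuffer views (the real class returns DataInputBuffer); Decoder, TypedIterator, LongHolder and PackedRawIterator are also hypothetical names. Records are decoded straight from shared buffer views into caller-supplied objects, so no intermediate byte copy is made.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a raw iterator: views over serialized bytes.
interface RawIterator {
  boolean next();
  ByteBuffer getKey();
  ByteBuffer getValue();
}

// Deserializes directly from a raw buffer, reusing the target when given.
interface Decoder<T> {
  T decode(ByteBuffer src, T reuse);
}

// Adapter exposing typed keys and values over a RawIterator. duplicate()
// creates a new view sharing the same bytes, so nothing is copied.
class TypedIterator<K, V> {
  private final RawIterator raw;
  private final Decoder<K> keyDecoder;
  private final Decoder<V> valueDecoder;

  TypedIterator(RawIterator raw, Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
    this.raw = raw;
    this.keyDecoder = keyDecoder;
    this.valueDecoder = valueDecoder;
  }

  boolean nextKeyValue() { return raw.next(); }

  K getCurrentKey(K reuse) { return keyDecoder.decode(raw.getKey().duplicate(), reuse); }

  V getCurrentValue(V reuse) { return valueDecoder.decode(raw.getValue().duplicate(), reuse); }
}

// Example mutable record type and its decoder.
final class LongHolder {
  long value;
}

final class LongDecoder implements Decoder<LongHolder> {
  @Override
  public LongHolder decode(ByteBuffer src, LongHolder reuse) {
    LongHolder h = (reuse == null) ? new LongHolder() : reuse;
    h.value = src.getLong();
    return h;
  }
}

// Demo raw iterator: fixed-size records (8-byte key, 8-byte value) stored
// back to back in one buffer and handed out as slices, not copies.
final class PackedRawIterator implements RawIterator {
  private static final int RECORD = 2 * Long.BYTES;
  private final ByteBuffer data;
  private final int count;
  private int pos = -1;

  PackedRawIterator(long... keyValuePairs) {
    data = ByteBuffer.allocate(keyValuePairs.length * Long.BYTES);
    for (long x : keyValuePairs) {
      data.putLong(x);
    }
    data.flip();
    count = keyValuePairs.length / 2;
  }

  @Override
  public boolean next() {
    if (pos < count) pos++;
    return pos < count;
  }

  @Override
  public ByteBuffer getKey() { return view(pos * RECORD, Long.BYTES); }

  @Override
  public ByteBuffer getValue() { return view(pos * RECORD + Long.BYTES, Long.BYTES); }

  private ByteBuffer view(int offset, int length) {
    ByteBuffer b = data.duplicate();
    b.position(offset);
    b.limit(offset + length);
    return b.slice();
  }
}
```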

The abstract base class and MAPREDUCE-279
As I mentioned in my previous post, I created an abstract base class called MapOutputSorterAbstract
(I am attaching the source to MAPREDUCE-2454) in order to access package-protected class methods.
I would appreciate it if developers familiar with MAPREDUCE-279 could take a look at the class
and comment on whether it can live completely outside the framework.  In MapTask.java, I needed
to change the access of APPROX_HEADER_LENGTH from private to package-private.

My specific questions are:

Will TaskReporter, Counter, and MapOutputFile be accessible as public classes?  If so, how can
they be passed to an external sorter from MapTask.java?

Will CombinerRunner, as defined in Task.java, be made available?  (I had to change its access to public.)

The classes SpillRecord and IndexRecord should also be made public.  Since IndexRecord is
not an inner class of SpillRecord, I created another file, IndexRecord.java, and moved the code there.
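For readers unfamiliar with these two classes, here is a simplified sketch of what they hold. The field names follow the framework's IndexRecord as I understand it (startOffset, rawLength, partLength), but this is an illustration, not the framework's code: checksums and file I/O are omitted, and both classes are package-private only so the sketch compiles as one file (in the framework each would be a public class in its own file).

```java
import java.nio.LongBuffer;

// Location of one partition's segment inside a map-side spill file.
class IndexRecord {
  long startOffset;  // byte offset where the partition's segment begins
  long rawLength;    // uncompressed length of the segment in bytes
  long partLength;   // length actually stored on disk in bytes

  IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}

// Per-spill index: one entry per reduce partition, packed as three longs
// (24 bytes), so the whole index can be written out as a flat array.
class SpillRecord {
  private static final int LONGS_PER_ENTRY = 3;
  private final LongBuffer entries;

  SpillRecord(int numPartitions) {
    entries = LongBuffer.allocate(numPartitions * LONGS_PER_ENTRY);
  }

  int size() {
    return entries.capacity() / LONGS_PER_ENTRY;
  }

  IndexRecord getIndex(int partition) {
    int i = partition * LONGS_PER_ENTRY;
    return new IndexRecord(entries.get(i), entries.get(i + 1), entries.get(i + 2));
  }

  void putIndex(IndexRecord rec, int partition) {
    int i = partition * LONGS_PER_ENTRY;
    entries.put(i, rec.startOffset);
    entries.put(i + 1, rec.rawLength);
    entries.put(i + 2, rec.partLength);
  }
}
```

An external sorter that writes its own spill files would need to produce exactly this kind of index so the framework's shuffle can locate each reducer's segment.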

On the Reduce side, I was trying to come up with an interface that can be implemented by both
the framework and an external sorter.  It was not easy to decouple the shuffle from the Merger,
since the shuffle drives the merge, not the other way around.  Since I wanted to reuse the
framework's shuffle code, I ended up using a few ugly if's so that data is shuffled either to
the framework's Merger or to an external sorter.
If the shuffle code could somehow be invoked from outside the core packages through public
interfaces, the external sorter on the Reduce side could just implement a simple key/value
iterator.  I think this might require some inversion of control and a code rewrite in some
sensitive areas.
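One way the inversion of control could look, as a hypothetical sketch only: the shuffle pushes each fetched map-output segment into a ShuffleSink and asks it for the merged stream at the end, so the shuffle loop needs no "framework or external" branches. ShuffleSink, MergingSink, ExternalSortSink and Shuffle are invented names, and keys are plain Strings for brevity. MergingSink mimics the framework's k-way merge of already-sorted segments; ExternalSortSink stands in for handing everything to an outside sorter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical callback the shuffle would drive.
interface ShuffleSink {
  void accept(int mapId, List<String> sortedSegment);
  Iterator<String> finish();
}

// Framework-style sink: k-way merge of already-sorted segments.
class MergingSink implements ShuffleSink {
  private final List<List<String>> segments = new ArrayList<>();

  public void accept(int mapId, List<String> seg) { segments.add(seg); }

  public Iterator<String> finish() {
    // Heap entries are {segment index, position within that segment}.
    PriorityQueue<int[]> heap = new PriorityQueue<int[]>(
        (a, b) -> segments.get(a[0]).get(a[1]).compareTo(segments.get(b[0]).get(b[1])));
    for (int s = 0; s < segments.size(); s++) {
      if (!segments.get(s).isEmpty()) heap.add(new int[] {s, 0});
    }
    List<String> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      List<String> seg = segments.get(top[0]);
      out.add(seg.get(top[1]));
      if (top[1] + 1 < seg.size()) heap.add(new int[] {top[0], top[1] + 1});
    }
    return out.iterator();
  }
}

// External-style sink: gathers everything and sorts it in one pass.
class ExternalSortSink implements ShuffleSink {
  private final List<String> all = new ArrayList<>();

  public void accept(int mapId, List<String> seg) { all.addAll(seg); }

  public Iterator<String> finish() {
    Collections.sort(all);
    return all.iterator();
  }
}

// The shuffle loop itself: no "if external ... else ..." branches.
final class Shuffle {
  private Shuffle() {}

  static Iterator<String> run(List<List<String>> mapOutputs, ShuffleSink sink) {
    for (int m = 0; m < mapOutputs.size(); m++) {
      sink.accept(m, mapOutputs.get(m));
    }
    return sink.finish();
  }
}
```

The design choice is that the sink owns the merge, so the framework's Merger becomes just one implementation among others.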

On ServiceLoader:
I thought of taking a simple approach: users can configure a job with the names of the external
sorter classes in configuration parameters like mapred.map.externalsort.class and
mapred.reduce.externalsort.class, and the framework uses a simple Java class loader to load the
class.  This is very similar to configuring a Mapper class, for example.  Am I missing something?
Is there a strong reason to use ServiceLoader?
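A minimal sketch of the configuration-based approach, assuming a plain Map stands in for the job configuration. The property name mapred.map.externalsort.class comes from the proposal above; ExternalSorter, FrameworkSorter, GnuSortPlugin and SorterLoader are hypothetical names. Inside Hadoop itself, Configuration.getClass together with ReflectionUtils.newInstance would be the natural way to do the same thing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plugin interface for a map-side sorter.
interface ExternalSorter {
  String name();
}

// Built-in default used when the job does not name a sorter.
class FrameworkSorter implements ExternalSorter {
  public String name() { return "framework"; }
}

// Example plugin a user might ship in the job jar.
class GnuSortPlugin implements ExternalSorter {
  public String name() { return "gnu-sort"; }
}

final class SorterLoader {
  private SorterLoader() {}

  // Reads a class name from the configuration, loads it, checks that it
  // implements ExternalSorter, and calls its no-argument constructor.
  static ExternalSorter load(Map<String, String> conf, String property) {
    String className = conf.get(property);
    if (className == null) {
      return new FrameworkSorter();
    }
    try {
      Class<?> cls = Class.forName(className);
      return cls.asSubclass(ExternalSorter.class)
          .getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("cannot load " + className, e);
    }
  }
}
```

By contrast, java.util.ServiceLoader discovers every implementation listed in META-INF/services files on the classpath, which fits "find all available providers" better than "use the one class this job names".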

Please give me your feedback.

If developers do not get the full picture of what I was playing with, I can try to make my
changes locally on top of the MAPREDUCE-279 branch and post a patch file.

Thanks everyone for your patience.

-- Asokan

> Allow external sorter plugin for MR
> -----------------------------------
>                 Key: MAPREDUCE-2454
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2454
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>            Reporter: Mariappan Asokan
>            Priority: Minor
>         Attachments: MapOutputSorter.java, MapOutputSorterAbstract.java, ReduceInputSorter.java
>
> Define interfaces and some abstract classes in the Hadoop framework to facilitate external
> sorter plugins both on the Map and Reduce sides.

This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
