hadoop-mapreduce-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mariappan Asokan (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAPREDUCE-2454) Allow external sorter plugin for MR
Date Sun, 01 May 2011 00:51:05 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-2454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13027411#comment-13027411
] 

Mariappan Asokan commented on MAPREDUCE-2454:
---------------------------------------------

I thought more on the implementation.  Here is what I came up with the steps
involved.  I can create one Jira per each step. If there is any objection from
anyone, I would like to hear about it before I jump in.  In the following, any
reference to 'framework code' implies current code in Map/Reduce in Hadoop
taken at the branch MAPREDUCE-279.
# Modify the framework code so that RawKeyValueIterator is visible only within
the sort/merge related code.  All others will see the new KeyValueIterator as
defined below:
{code:title=KeyValueIterator.java|borderStyle=solid}
public interface KeyValueIterator<K, V> extends Closeable {
  /**
   * Get the current key.
   * @param key where the current key should be stored.  If this is null, a new
   * instance will be created and returned.
   * @return current key
   * @exception IOException in case of error.
   * @exception InterruptedException in case of interruption.
   */
  K getCurrentKey(K key) throws IOException, InterruptedException;
  
  /**
   * Get the current key.
   * @return current key
   * @exception IOException in case of error.
   * @exception InterruptedException in case of interruption.
   */
  K getCurrentKey() throws IOException, InterruptedException;
  
  /**
   * Get the current value.
   * @param value where the current value should be stored.  If this is null, a
   * new instance will be created and returned.
   * @return current value.
   * @exception IOException in case of error.
   * @exception InterruptedException in case of interruption.
   */
  V getCurrentValue(V value) throws IOException, InterruptedException;
  
  /**
   * Get the current value.
   * @return current value.
   * @exception IOException in case of error.
   * @exception InterruptedException in case of interruption.
   */
  V getCurrentValue() throws IOException, InterruptedException;
  
  /**
   * Set up to get the current key and value (for getKey() and getValue()).
   * @return <code>true</code> if there exists a key/value, <code>false</code>
   * otherwise. 
   * @throws IOException in case of error.
   * @exception InterruptedException in case of interruption.
   */
  boolean nextKeyValue() throws IOException, InterruptedException;
  
  /**
   * Get the Progress object; this has a float (0.0 - 1.0) indicating the bytes
   * processed by the iterator so far.
   * @return progress object.
   */
  Progress getProgress();
}
{code}
This will enable any external sorter implementation to reuse existing code in
Task.java to run the combiner and the code in ReduceTask.java to run the
reducer.
# Modify the framework code so that an external sorter can be plugged on the
Map side.
# Modify the shuffle code on the Reduce side so that a shuffle can be started
by any code outside MR(perhaps in a separate thread.)  A callback interface
will be passed to shuffle.  Tentatively, the callback will look like this and
can be refined.
{code:title=ShuffleCallback.java|borderStyle=solid}
public interface ShuffleCallback<K, V> extends Closeable
  /**
   * To reserve space for the specified mapper output.
   * @param mapId mapper id.
   * @param requestedSize number of bytes in space to be reserved.
   * @param fetcher id of fetcher that will fetch the map output.
   * @exception IOException in case of error.
   */
  public MapOutput<K,V> reserve(TaskAttemptID mapId, long requestedSize,
                                int fetcher)
    throws IOException;

  /**
   * To shuffle the data from local mappers.
   * @param localMapFiles array of map output files to be sorted.
   * @return total number of bytes read from the mapper outputs.
   * @exception IOException if there is any IO error while reading.
   * @exception InterruptedException if there is an interruption.
   * @exception InterruptedException in case of interruption.
   * @exception ReduceInputSorterException any other exception that occurs while
   * sorting.
   */
  public long shuffle(Path localMapFiles[])
    throws IOException, InterruptedException, ReduceInputSorterException;

  /**
   * To shuffle the raw data coming from a non-local mapper.  Multiple threads
   * can call this method with input from different mappers.
   * @param inputFromMapper the raw input stream from the mapper. If map output
   * is compressed, the sorter is responsible for decompressing.
   * @param mapTaskId map task id corresponding to the stream.
   * @param compressedLength The size of the compressed data.
   * @return number of bytes read from the mapper stream.
   * @exception IOException if there is any IO error while reading.
   * @exception InterruptedException if there is an interruption.
   * @exception ReduceInputSorterException any other exception that occurs in
   * the sorter.
   */
  public long shuffle(InputStream inputFromMapper, String mapTaskId,
                      long compressedLength)
    throws IOException, InterruptedException, ReduceInputSorterException;

  /**
   * To commit shuffled data from a non-local mapper.  Usually, this method is
   * called right after shuffle() from the same thread once it is
   * decided to commit.
   * @param mapTaskId map task id corresponding to the shuffled data.
   * @exception IOException if there is any IO error while discarding.
   * @exception InterruptedException in case of interruption.
   * @exception ReduceInputSorterException any other exception that occurs in
   * the sorter.
   */
  public void commit(String mapTaskId)
    throws IOException, InterruptedException, ReduceInputSorterException;

  /**
   * To discard shuffled data from a non-local mapper. Usually, this method is
   * called right after shuffle() from the same thread once it is
   * decided to discard.
   * @param mapTaskId map task id corresponding to the shuffled data.
   * @exception IOException if there is any IO error while discarding.
   * @exception InterruptedException in case of interruption.
   * @exception ReduceInputSorterException any other exception that occurs in
   * the sorter.
   */
  public void discard(String mapTaskId)
    throws IOException, InterruptedException, ReduceInputSorterException;

  /**
   * To indicate end of input from all non-local mappers.  This should be called
   * after all non-local mapper outputs are committed.
   * @exception IOException if there is any IO error.
   * @exception InterruptedException if there is an interruption.
   * @exception ReduceInputSorterException any other exception that occurs in
   * the sorter.
   */
  public void close()
    throws IOException, InterruptedException, ReduceInputSorterException;
{code}
# Modify ReduceTask.java so that it will invoke the framework's merge or the
external sorter.
# The external sorter interface on the Map side will look like:
{code:title=MapOutputSorter.java|borderStyle=solid}
public interface MapOutputSorter<K, V> 
  extends MapOutputCollector<K, V> {
  /**
   * To initialize the sorter.
   * @param job job configuration.
   * @param mapTask map task invoking this sorter.
   * @param inputSplitSize size of input split processed by this sorter.
   * @param mapOutputFile map output file
   * @param reporter reporter to report sorter progress.
   * @exception IOException if there is any error during initialization.
   * @exception ClassNotFoundException if a class to be loaded is not found.
   * @exception UnsupportedOperationException thrown by the sorter if it
   * cannot support certain options in the job.  For example, a sorter may
   * support only a certain subset of key types.  The default sorter in the
   * framework will be used as a fallback when this exception is thrown.
   */
  public void initialize(JobConf job, MapTask mapTask, long inputSplitSize,
                         MapOutputFile mapOutputFile, TaskReporter reporter)
    throws IOException, ClassNotFoundException, UnsupportedOperationException;
}
{code}
The MapOutputCollector interface defined in MapTask.java will be made public
and will look like below:
{code:title=MapOutputCollector.java|borderStyle=solid}
public interface MapOutputCollector<K, V> extends Closeable{
  public void collect(K key, V value, int partition
                      ) throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException, 
                             ClassNotFoundException;
  public void close() throws IOException, InterruptedException;
}
{code}
On the Reduce side, the external sorter interface will look like:
{code:title=ReduceInputSorter.java|borderStyle=solid}
public interface ReduceInputSorter<K, V> extends KeyValueIterator<K, V> {
  /**
   * Initialize the sorter.
   * @param job job configuration.
   * @param reduceTask reduce task invoking this sorter.
   * @param reporter reporter to report sorter progress.
   * @exception IOException if there is any error during initialization.
   * @exception ClassNotFoundException if a class to be loaded is not found.
   * @exception UnsupportedOperationException thrown by the sorter if it
   * cannot support certain options in the job.  For example, a sorter may
   * support only a certain subset of key types.  The default sorter in the
   * framework will be used as a fallback when this exception is thrown.
   */
  public void initialize(JobConf job, ReduceTask reduceTask,
                         TaskReporter reporter)
    throws IOException, ClassNotFoundException, UnsupportedOperationException;
{code}
# Some abstract base classes of the above two may be provided in the framework
to facilitate external sorter implementations.
# Provide a proof-of-concept implementation of an external sorter both on the
Map and Reduce sides using GNU sort command as the external sorter.
*All the changes mentioned above should not result in any performance
degradation of framework code when no external sorter is plugged in.*



> Allow external sorter plugin for MR
> -----------------------------------
>
>                 Key: MAPREDUCE-2454
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2454
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>            Reporter: Mariappan Asokan
>            Priority: Minor
>         Attachments: KeyValueIterator.java, MapOutputSorter.java, MapOutputSorterAbstract.java,
ReduceInputSorter.java
>
>
> Define interfaces and some abstract classes in the Hadoop framework to facilitate external
sorter plugins both on the Map and Reduce sides.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message