hadoop-common-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Alejandro Abdelnur (JIRA)" <j...@apache.org>
Subject [jira] Created: (HADOOP-811) Patch to support multi-threaded MapRunnable
Date Mon, 11 Dec 2006 15:47:21 GMT
Patch to support multi-threaded MapRunnable
-------------------------------------------

                 Key: HADOOP-811
                 URL: http://issues.apache.org/jira/browse/HADOOP-811
             Project: Hadoop
          Issue Type: New Feature
          Components: mapred
    Affects Versions: 0.9.1
         Environment: all
            Reporter: Alejandro Abdelnur


The MapRunner calls Mapper.map in a serialized fashion.

This is suitable for CPU/memory bound operations.

However, when doing IO bound operations this serialization affects the throughput significantly.

In order to support IO bound operations more efficiently I've implemented a multithreaded
MapRunnable.

Following is the implementation of this MapRunnable, MultithreadedMapRunner.

I've only had to modify on method in the existing code (in the Task class) to avoid data corruption
in the reporter.

Index: Task.java
===================================================================
--- Task.java   (revision 485492)
+++ Task.java   (working copy)
@@ -153,9 +153,11 @@
   public Reporter getReporter(final TaskUmbilicalProtocol umbilical,
                               final Progress progress) throws IOException {
     return new Reporter() {
-        public void setStatus(String status) throws IOException {
-          progress.setStatus(status);
-          progress();
+        public void setStatus(String status) throws IOException {
+          synchronized (this) {
+            progress.setStatus(status);
+            progress();
+          }
         }
         public void progress() throws IOException {
             reportProgress(umbilical);

-----------------------------------------------------------
MultithreadedMapRunner.java

package org.apache.hadoop.mapred;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
 * <p>
 * It can be used instead of the default implementation,
 * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
 * bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the <b>mapred.map.runner.class</b> property) and
 * the number of thread the thread-pool can use (using the
 * <b>mapred.map.multithreadedrunner.threads</b> property).
 * <p>
 *
 * @author Alejandro Abdelnur
 */
public class MultithreadedMapRunner implements MapRunnable {
  private static final Log LOG =
      LogFactory.getLog(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper mapper;
  private ExecutorService executorService;
  private IOException ioException;

  public void configure(JobConf job) {
    int numberOfThreads =
        job.getInt("mapred.map.multithreadedrunner.threads", 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring job " + job.getJobName() +
          " to use " + numberOfThreads + " threads" );
    }

    this.job = job;
    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
                                                      job);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel.
    executorService = Executors.newFixedThreadPool(numberOfThreads);
  }

  public void run(RecordReader input, OutputCollector output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      WritableComparable key = input.createKey();
      Writable value = input.createValue();

      while (input.next(key, value)) {

        // Run Mapper.map execution asynchronously in a separate thread.
        // If threads are not available from the thread-pool this method
        // will block until there is a thread available.
        executorService.execute(
            new MTMapperRunable(key, value, output, reporter));

        // Checking if a Mapper.map within a Runnable has generated an
        // IOException. If so we rethrow it to force an abort of the Map
        // operation thus keeping the semantics of the default
        // implementation.
        if (ioException != null) {
          throw ioException;
        }

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
            + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                + job.getJobName());
          }

          // Checking if a Mapper.map within a Runnable has generated an
          // IOException. If so we rethrow it to force an abort of the Map
          // operation thus keeping the semantics of the default
          // implementation.
          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress.
          if (ioException != null) {
            throw ioException;
          }
        }

        // Checking if a Mapper.map within a Runnable has generated an
        // IOException. If so we rethrow it to force an abort of the Map
        // operation thus keeping the semantics of the default
        // implementation.
        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returing true. And edge case but it
        // could happen.
        if (ioException != null) {
          throw ioException;
        }
      }
      catch (IOException ioEx) {
        // Forcing a shutdown of all thread of the threadpool and rethrowing
        // the IOException
        executorService.shutdownNow();
        throw ioEx;
      }
      catch (InterruptedException iEx) {
        throw new IOException(iEx.getMessage());
      }

    } finally {
        mapper.close();
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MTMapperRunable implements Runnable {
    private WritableComparable key;
    private Writable value;
    private OutputCollector output;
    private Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     * <p>
     *
     * @param key
     * @param value
     * @param output
     * @param reporter
     */
    public MTMapperRunable(WritableComparable key, Writable value,
        OutputCollector output, Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread.
     *
     */
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
      }
      catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      }
    }
  }

}




-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message