hadoop-common-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Doug Cutting (JIRA)" <j...@apache.org>
Subject [jira] Assigned: (HADOOP-811) Patch to support multi-threaded MapRunnable
Date Fri, 15 Dec 2006 20:17:25 GMT
     [ http://issues.apache.org/jira/browse/HADOOP-811?page=all ]

Doug Cutting reassigned HADOOP-811:
-----------------------------------

    Assignee: Doug Cutting

> Patch to support multi-threaded MapRunnable
> -------------------------------------------
>
>                 Key: HADOOP-811
>                 URL: http://issues.apache.org/jira/browse/HADOOP-811
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>    Affects Versions: 0.10.0
>         Environment: all
>            Reporter: Alejandro Abdelnur
>         Assigned To: Doug Cutting
>         Attachments: diff.txt, diff.txt, MultithreadedMapRunner.java, MultithreadedMapRunner.java
>
>
> The MapRunner calls Mapper.map in a serialized fashion.
> This is suitable for CPU/memory bound operations.
> However, when doing IO bound operations this serialization affects the throughput significantly.
> In order to support IO bound operations more efficiently I've implemented a multithreaded
MapRunnable.
> Following is the implementation of this MapRunnable, MultithreadedMapRunner.
> I've only had to modify on method in the existing code (in the Task class) to avoid data
corruption in the reporter.
> Index: Task.java
> ===================================================================
> --- Task.java   (revision 485492)
> +++ Task.java   (working copy)
> @@ -153,9 +153,11 @@
>    public Reporter getReporter(final TaskUmbilicalProtocol umbilical,
>                                final Progress progress) throws IOException {
>      return new Reporter() {
> -        public void setStatus(String status) throws IOException {
> -          progress.setStatus(status);
> -          progress();
> +        public void setStatus(String status) throws IOException {
> +          synchronized (this) {
> +            progress.setStatus(status);
> +            progress();
> +          }
>          }
>          public void progress() throws IOException {
>              reportProgress(umbilical);
> -----------------------------------------------------------
> MultithreadedMapRunner.java
> package org.apache.hadoop.mapred;
> import org.apache.hadoop.util.ReflectionUtils;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.io.Writable;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import java.io.IOException;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> /**
>  * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
>  * <p>
>  * It can be used instead of the default implementation,
>  * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
>  * bound in order to improve throughput.
>  * <p>
>  * Map implementations using this MapRunnable must be thread-safe.
>  * <p>
>  * The Map-Reduce job has to be configured to use this MapRunnable class (using
>  * the <b>mapred.map.runner.class</b> property) and
>  * the number of thread the thread-pool can use (using the
>  * <b>mapred.map.multithreadedrunner.threads</b> property).
>  * <p>
>  *
>  * @author Alejandro Abdelnur
>  */
> public class MultithreadedMapRunner implements MapRunnable {
>   private static final Log LOG =
>       LogFactory.getLog(MultithreadedMapRunner.class.getName());
>   private JobConf job;
>   private Mapper mapper;
>   private ExecutorService executorService;
>   private IOException ioException;
>   public void configure(JobConf job) {
>     int numberOfThreads =
>         job.getInt("mapred.map.multithreadedrunner.threads", 10);
>     if (LOG.isDebugEnabled()) {
>       LOG.debug("Configuring job " + job.getJobName() +
>           " to use " + numberOfThreads + " threads" );
>     }
>     this.job = job;
>     this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
>                                                       job);
>     // Creating a threadpool of the configured size to execute the Mapper
>     // map method in parallel.
>     executorService = Executors.newFixedThreadPool(numberOfThreads);
>   }
>   public void run(RecordReader input, OutputCollector output,
>                   Reporter reporter)
>     throws IOException {
>     try {
>       // allocate key & value instances these objects will not be reused
>       // because execution of Mapper.map is not serialized.
>       WritableComparable key = input.createKey();
>       Writable value = input.createValue();
>       while (input.next(key, value)) {
>         // Run Mapper.map execution asynchronously in a separate thread.
>         // If threads are not available from the thread-pool this method
>         // will block until there is a thread available.
>         executorService.execute(
>             new MTMapperRunable(key, value, output, reporter));
>         // Checking if a Mapper.map within a Runnable has generated an
>         // IOException. If so we rethrow it to force an abort of the Map
>         // operation thus keeping the semantics of the default
>         // implementation.
>         if (ioException != null) {
>           throw ioException;
>         }
>         // Allocate new key & value instances as mapper is running in parallel
>         key = input.createKey();
>         value = input.createValue();
>       }
>       if (LOG.isDebugEnabled()) {
>         LOG.debug("Finished dispatching all Mappper.map calls, job "
>             + job.getJobName());
>       }
>       // Graceful shutdown of the Threadpool, it will let all scheduled
>       // Runnables to end.
>       executorService.shutdown();
>       try {
>         // Now waiting for all Runnables to end.
>         while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
>           if (LOG.isDebugEnabled()) {
>             LOG.debug("Awaiting all running Mappper.map calls to finish, job "
>                 + job.getJobName());
>           }
>           // Checking if a Mapper.map within a Runnable has generated an
>           // IOException. If so we rethrow it to force an abort of the Map
>           // operation thus keeping the semantics of the default
>           // implementation.
>           // NOTE: while Mapper.map dispatching has concluded there are still
>           // map calls in progress.
>           if (ioException != null) {
>             throw ioException;
>           }
>         }
>         // Checking if a Mapper.map within a Runnable has generated an
>         // IOException. If so we rethrow it to force an abort of the Map
>         // operation thus keeping the semantics of the default
>         // implementation.
>         // NOTE: it could be that a map call has had an exception after the
>         // call for awaitTermination() returing true. And edge case but it
>         // could happen.
>         if (ioException != null) {
>           throw ioException;
>         }
>       }
>       catch (IOException ioEx) {
>         // Forcing a shutdown of all thread of the threadpool and rethrowing
>         // the IOException
>         executorService.shutdownNow();
>         throw ioEx;
>       }
>       catch (InterruptedException iEx) {
>         throw new IOException(iEx.getMessage());
>       }
>     } finally {
>         mapper.close();
>     }
>   }
>   /**
>    * Runnable to execute a single Mapper.map call from a forked thread.
>    */
>   private class MTMapperRunable implements Runnable {
>     private WritableComparable key;
>     private Writable value;
>     private OutputCollector output;
>     private Reporter reporter;
>     /**
>      * Collecting all required parameters to execute a Mapper.map call.
>      * <p>
>      *
>      * @param key
>      * @param value
>      * @param output
>      * @param reporter
>      */
>     public MTMapperRunable(WritableComparable key, Writable value,
>         OutputCollector output, Reporter reporter) {
>       this.key = key;
>       this.value = value;
>       this.output = output;
>       this.reporter = reporter;
>     }
>     /**
>      * Executes a Mapper.map call with the given Mapper and parameters.
>      * <p>
>      * This method is called from the thread-pool thread.
>      *
>      */
>     public void run() {
>       try {
>         // map pair to output
>         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
>       }
>       catch (IOException ex) {
>         // If there is an IOException during the call it is set in an instance
>         // variable of the MultithreadedMapRunner from where it will be
>         // rethrown.
>         synchronized (MultithreadedMapRunner.this) {
>           if (MultithreadedMapRunner.this.ioException == null) {
>             MultithreadedMapRunner.this.ioException = ex;
>           }
>         }
>       }
>     }
>   }
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message