hadoop-mapreduce-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r904706 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/LocalJobRunner.java src/java/org/apache/hadoop/mapreduce/Cluster.java src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java
Date Sat, 30 Jan 2010 01:22:54 GMT
Author: tomwhite
Date: Sat Jan 30 01:22:54 2010
New Revision: 904706

URL: http://svn.apache.org/viewvc?rev=904706&view=rev
Log:
MAPREDUCE-1367. LocalJobRunner should support parallel mapper execution. Contributed by Aaron Kimball.
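
For context, a minimal sketch of how a client could opt in to parallel local map execution once this patch is applied. The configuration key mapreduce.local.map.tasks.maximum and the LocalJobRunner.setLocalMaxRunningMaps() helper are taken from the patch below; the surrounding driver code and class name are illustrative only, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.Job;

// Illustrative driver sketch; not part of this commit.
public class LocalParallelMapsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Run against the local runner (see the Cluster.java change in this
    // revision, which reads mapreduce.jobtracker.address).
    conf.set("mapreduce.jobtracker.address", "local");

    Job job = new Job(conf);

    // Either set the new property directly...
    job.getConfiguration().setInt("mapreduce.local.map.tasks.maximum", 4);
    // ...or use the helper added to LocalJobRunner by this patch.
    LocalJobRunner.setLocalMaxRunningMaps(job, 4);

    // Mapper/reducer classes and input/output paths would be configured
    // here as in any ordinary MapReduce driver before submission.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}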

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=904706&r1=904705&r2=904706&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Jan 30 01:22:54 2010
@@ -142,6 +142,9 @@
     MAPREDUCE-1356. Allow user-specified hive table name in sqoop.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-1367. LocalJobRunner should support parallel mapper execution.
+    (Aaron Kimball via tomwhite)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=904706&r1=904705&r2=904706&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Sat Jan 30 01:22:54 2010
@@ -22,9 +22,15 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,17 +61,23 @@
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
 
+  /** The maximum number of map tasks to run in parallel in LocalJobRunner */
+  public static final String LOCAL_MAX_MAPS =
+    "mapreduce.local.map.tasks.maximum";
+
   private FileSystem fs;
   private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
   private JobConf conf;
-  private int map_tasks = 0;
+  private AtomicInteger map_tasks = new AtomicInteger(0);
   private int reduce_tasks = 0;
   final Random rand = new Random();
   
   private JobTrackerInstrumentation myMetrics = null;
 
   private static final String jobDir =  "localRunner/";
-  
+
+  private static final Counters EMPTY_COUNTERS = new Counters();
+
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
@@ -83,8 +95,14 @@
     private JobID id;
     private JobConf job;
 
+    private int numMapTasks;
+    private float [] partialMapProgress;
+    private Counters [] mapCounters;
+    private Counters reduceCounters;
+
     private JobStatus status;
-    private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
+    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
+        new ArrayList<TaskAttemptID>());
 
     private JobProfile profile;
     private FileSystem localFs;
@@ -92,13 +110,6 @@
     
     private TrackerDistributedCacheManager trackerDistributerdCacheManager;
     private TaskDistributedCacheManager taskDistributedCacheManager;
-    
-    // Counters summed over all the map/reduce tasks which
-    // have successfully completed
-    private Counters completedTaskCounters = new Counters();
-    
-    // Current counters, including incomplete task(s)
-    private Counters currentCounters = new Counters();
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
@@ -147,7 +158,7 @@
         out.close();
       }
       this.job = new JobConf(localJobFile);
-      
+
       // Job (the current object) is a Thread, so we wrap its class loader.
       if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
         setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
@@ -168,18 +179,147 @@
     JobProfile getProfile() {
       return profile;
     }
-    
+
+    /**
+     * A Runnable instance that handles a map task to be run by an executor.
+     */
+    protected class MapTaskRunnable implements Runnable {
+      private final int taskId;
+      private final TaskSplitMetaInfo info;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
+
+      public volatile Throwable storedException;
+
+      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
+          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+        this.info = info;
+        this.taskId = taskId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.jobId = jobId;
+        this.localConf = new JobConf(job);
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
+              jobId, TaskType.MAP, taskId), 0);
+          LOG.info("Starting task: " + mapId);
+          mapIds.add(mapId);
+          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
+            info.getSplitIndex(), 1);
+          map.setUser(UserGroupInformation.getCurrentUser().
+              getShortUserName());
+          TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+          MapOutputFile mapOutput = new MapOutputFile();
+          mapOutput.setConf(localConf);
+          mapOutputFiles.put(mapId, mapOutput);
+
+          map.setJobFile(localJobFile.toString());
+          localConf.setUser(map.getUser());
+          map.localizeConfiguration(localConf);
+          map.setConf(localConf);
+          try {
+            map_tasks.getAndIncrement();
+            myMetrics.launchMap(mapId);
+            map.run(localConf, Job.this);
+            myMetrics.completeMap(mapId);
+          } finally {
+            map_tasks.getAndDecrement();
+          }
+
+          LOG.info("Finishing task: " + mapId);
+        } catch (Throwable e) {
+          this.storedException = e;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate map tasks for use by the executor
+     * service.
+     * @param taskInfo Info about the map task splits
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per map task.
+     */
+    protected List<MapTaskRunnable> getMapTaskRunnables(
+        TaskSplitMetaInfo [] taskInfo, JobID jobId,
+        Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+
+      int numTasks = 0;
+      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+      for (TaskSplitMetaInfo task : taskInfo) {
+        list.add(new MapTaskRunnable(task, numTasks++, jobId,
+            mapOutputFiles));
+      }
+
+      return list;
+    }
+
+    /**
+     * Initialize the counters that will hold partial-progress from
+     * the various task attempts.
+     * @param numMaps the number of map tasks in this job.
+     */
+    private synchronized void initCounters(int numMaps) {
+      // Initialize state trackers for all map tasks.
+      this.partialMapProgress = new float[numMaps];
+      this.mapCounters = new Counters[numMaps];
+      for (int i = 0; i < numMaps; i++) {
+        this.mapCounters[i] = EMPTY_COUNTERS;
+      }
+
+      this.reduceCounters = EMPTY_COUNTERS;
+    }
+
+    /**
+     * Creates the executor service used to run map tasks.
+     *
+     * @param numMapTasks the total number of map tasks to be run
+     * @return an ExecutorService instance that handles map tasks
+     */
+    protected ExecutorService createMapExecutor(int numMapTasks) {
+
+      // Determine the size of the thread pool to use
+      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
+      if (maxMapThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
+      }
+      this.numMapTasks = numMapTasks;
+      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
+      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
+
+      initCounters(this.numMapTasks);
+
+      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Max local threads: " + maxMapThreads);
+      LOG.debug("Map tasks to process: " + this.numMapTasks);
+
+      // Create a new executor service to drain the work queue.
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
+
+      return executor;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
       JobContext jContext = new JobContextImpl(job, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
+
       try {
         TaskSplitMetaInfo[] taskSplitMetaInfos = 
           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
-        
-    
+
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {
           // we only allow 0 or 1 reducer in local mode
@@ -190,39 +330,42 @@
         status.setSetupProgress(1.0f);
 
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
-            new HashMap<TaskAttemptID, MapOutputFile>();
-        for (int i = 0; i < taskSplitMetaInfos.length; i++) {
-          if (!this.isInterrupted()) {
-            TaskAttemptID mapId = new TaskAttemptID(
-                new TaskID(jobId, TaskType.MAP, i),0);  
-            mapIds.add(mapId);
-            MapTask map = new MapTask(systemJobFile.toString(),  
-                                      mapId, i,
-                                      taskSplitMetaInfos[i].getSplitIndex(), 1);
-            map.setUser(UserGroupInformation.getCurrentUser().
-                getShortUserName());
-            JobConf localConf = new JobConf(job);
-            TaskRunner.setupChildMapredLocalDirs(map, localConf);
+            Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
 
-            MapOutputFile mapOutput = new MapOutputFile();
-            mapOutput.setConf(localConf);
-            mapOutputFiles.put(mapId, mapOutput);
-
-            map.setJobFile(localJobFile.toString());
-            localConf.setUser(map.getUser());
-            map.localizeConfiguration(localConf);
-            map.setConf(localConf);
-            map_tasks += 1;
-            myMetrics.launchMap(mapId);
-            map.run(localConf, this);
-            myMetrics.completeMap(mapId);
-            map_tasks -= 1;
-            updateCounters(map);
-          } else {
-            throw new InterruptedException();
+        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
+            jobId, mapOutputFiles);
+        ExecutorService mapService = createMapExecutor(taskRunnables.size());
+
+        // Start populating the executor with work units.
+        // They may begin running immediately (in other threads).
+        for (Runnable r : taskRunnables) {
+          mapService.submit(r);
+        }
+
+        try {
+          mapService.shutdown(); // Instructs queue to drain.
+
+          // Wait for tasks to finish; do not use a time-based timeout.
+          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+          LOG.info("Waiting for map tasks");
+          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException ie) {
+          // Cancel all threads.
+          mapService.shutdownNow();
+          throw ie;
+        }
+
+        LOG.info("Map task executor complete.");
+
+        // After waiting for the map tasks to complete, if any of these
+        // have thrown an exception, rethrow it now in the main thread context.
+        for (MapTaskRunnable r : taskRunnables) {
+          if (r.storedException != null) {
+            throw new Exception(r.storedException);
           }
         }
-        TaskAttemptID reduceId = 
+
+        TaskAttemptID reduceId =
           new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
         try {
           if (numReduceTasks > 0) {
@@ -231,6 +374,7 @@
             reduce.setUser(UserGroupInformation.getCurrentUser().
                 getShortUserName());
             JobConf localConf = new JobConf(job);
+            localConf.set("mapreduce.jobtracker.address", "local");
             TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
@@ -262,7 +406,6 @@
               reduce.run(localConf, this);
               myMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
-              updateCounters(reduce);
             } else {
               throw new InterruptedException();
             }
@@ -318,23 +461,47 @@
 
     public JvmTask getTask(JvmContext context) { return null; }
     
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
-    throws IOException, InterruptedException {
+    public synchronized boolean statusUpdate(TaskAttemptID taskId,
+        TaskStatus taskStatus) throws IOException, InterruptedException {
       LOG.info(taskStatus.getStateString());
-      float taskIndex = mapIds.indexOf(taskId);
+      int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
-        float numTasks = mapIds.size();
-        status.setMapProgress(taskIndex/numTasks + taskStatus.getProgress()/numTasks);
+        float numTasks = (float) this.numMapTasks;
+
+        partialMapProgress[taskIndex] = taskStatus.getProgress();
+        mapCounters[taskIndex] = taskStatus.getCounters();
+
+        float partialProgress = 0.0f;
+        for (float f : partialMapProgress) {
+          partialProgress += f;
+        }
+        status.setMapProgress(partialProgress / numTasks);
       } else {
+        reduceCounters = taskStatus.getCounters();
         status.setReduceProgress(taskStatus.getProgress());
       }
-      currentCounters = Counters.sum(completedTaskCounters, taskStatus.getCounters());
-      
+
       // ignore phase
-      
       return true;
     }
 
+    /** Return the current values of the counters for this job,
+     * including tasks that are in progress.
+     */
+    public synchronized Counters getCurrentCounters() {
+      if (null == mapCounters) {
+        // Counters not yet initialized for job.
+        return EMPTY_COUNTERS;
+      }
+
+      Counters current = EMPTY_COUNTERS;
+      for (Counters c : mapCounters) {
+        current = Counters.sum(current, c);
+      }
+      current = Counters.sum(current, reduceCounters);
+      return current;
+    }
+
     /**
      * Task is reporting that it is in commit_pending
      * and it is waiting for the commit Response
@@ -345,15 +512,6 @@
       statusUpdate(taskid, taskStatus);
     }
 
-    /**
-     * Updates counters corresponding to completed tasks.
-     * @param task A map or reduce task which has just been 
-     * successfully completed
-     */ 
-    private void updateCounters(Task task) {
-      completedTaskCounters.incrAllCounters(task.getCounters());
-    }
-
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
@@ -463,7 +621,8 @@
   public org.apache.hadoop.mapreduce.Counters getJobCounters(
       org.apache.hadoop.mapreduce.JobID id) {
     Job job = jobs.get(JobID.downgrade(id));
-    return new org.apache.hadoop.mapreduce.Counters(job.currentCounters);
+
+    return new org.apache.hadoop.mapreduce.Counters(job.getCurrentCounters());
   }
 
   public String getFilesystemName() throws IOException {
@@ -471,8 +630,9 @@
   }
   
   public ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(map_tasks, reduce_tasks, map_tasks, reduce_tasks,
-      0, 0, 1, 1, jobs.size(), 1, 0, 0);
+    int numMapTasks = map_tasks.get();
+    return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
+        reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
   public State getJobTrackerState() throws IOException, InterruptedException {
@@ -574,4 +734,24 @@
       getQueueAclsForCurrentUser() throws IOException{
     return null;
   }
+
+  /**
+   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxMaps the maximum number of map tasks to allow.
+   */
+  public static void setLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxMaps) {
+    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
+  }
+
+  /**
+   * @return the max number of map tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=904706&r1=904705&r2=904706&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java Sat Jan 30 01:22:54 2010
@@ -76,7 +76,7 @@
 
   private ClientProtocol createClient(Configuration conf) throws IOException {
     ClientProtocol client;
-    String tracker = conf.get("mapred.job.tracker", "local");
+    String tracker = conf.get("mapreduce.jobtracker.address", "local");
     if ("local".equals(tracker)) {
       conf.setInt("mapreduce.job.maps", 1);
       client = new LocalJobRunner(conf);

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=904706&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java Sat Jan 30 01:22:54 2010
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+
+/**
+ * Stress tests for the LocalJobRunner
+ */
+public class TestLocalRunner extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);
+
+  private static class StressMapper
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    // Different map tasks operate at different speeds.
+    // We define behavior for 6 threads.
+    private int threadId;
+
+    // Used to ensure that the compiler doesn't optimize away
+    // some code.
+    public long exposedState;
+
+    protected void setup(Context context) {
+      // Get the thread num from the file number.
+      FileSplit split = (FileSplit) context.getInputSplit();
+      Path filePath = split.getPath();
+      String name = filePath.getName();
+      this.threadId = Integer.valueOf(name);
+
+      LOG.info("Thread " + threadId + " : "
+          + context.getInputSplit());
+    }
+
+    /** Map method with different behavior based on the thread id */
+    public void map(LongWritable key, Text val, Context c)
+        throws IOException, InterruptedException {
+
+      switch(threadId) {
+      case 0:
+        // Write a single value and be done.
+        c.write(new LongWritable(0), val);
+        break;
+      case 1:
+      case 2:
+        // Write many values quickly.
+        for (int i = 0; i < 500; i++) {
+          c.write(new LongWritable(0), val);
+        }
+        break;
+      case 3:
+        // Write many values, using thread sleeps to delay this.
+        for (int i = 0; i < 50; i++) {
+          for (int j = 0; j < 10; j++) {
+            c.write(new LongWritable(0), val);
+          }
+          Thread.sleep(1);
+        }
+        break;
+      case 4:
+        // Write many values, using busy-loops to delay this.
+        for (int i = 0; i < 500; i++) {
+          for (int j = 0; j < 10000; j++) {
+            this.exposedState++;
+          }
+          c.write(new LongWritable(0), val);
+        }
+        break;
+      case 5:
+        // Write many values, using very slow busy-loops to delay this.
+        for (int i = 0; i < 500; i++) {
+          for (int j = 0; j < 100000; j++) {
+            this.exposedState++;
+          }
+          c.write(new LongWritable(0), val);
+        }
+        break;
+      default:
+        // Write a single value and be done.
+        c.write(new LongWritable(0), val);
+        break;
+      }
+    }
+
+    protected void cleanup(Context context) {
+      // Output this here, to ensure that the incrementing done in map()
+      // cannot be optimized away.
+      LOG.debug("Busy loop counter: " + this.exposedState);
+    }
+  }
+
+  private static class CountingReducer
+      extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
+
+    public void reduce(LongWritable key, Iterable<Text> vals, Context context)
+        throws IOException, InterruptedException {
+      long out = 0;
+      for (Text val : vals) {
+        out++;
+      }
+
+      context.write(key, new LongWritable(out));
+    }
+  }
+
+  /**
+   * Create a single input file in the input directory.
+   * @param dirPath the directory in which the file resides
+   * @param id the file id number
+   * @param numRecords how many records to write to each file.
+   */
+  private void createInputFile(Path dirPath, int id, int numRecords)
+      throws IOException {
+    final String MESSAGE = "This is a line in a file: ";
+
+    Path filePath = new Path(dirPath, "" + id);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+    for (int i = 0; i < numRecords; i++) {
+      w.write(MESSAGE + id + " " + i + "\n");
+    }
+
+    w.close();
+  }
+
+  // This is the total number of map output records we expect to generate,
+  // based on input file sizes (see createMultiMapsInput()) and the behavior
+  // of the different StressMapper threads.
+  private final static int TOTAL_RECORDS = 50000
+      + (500 * 500)
+      + (500 * 500)
+      + (20 * 500)
+      + (5000 * 500)
+      + (500 * 500);
+
+  private final String INPUT_DIR = "multiMapInput";
+  private final String OUTPUT_DIR = "multiMapOutput";
+
+  private Path getInputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(INPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), INPUT_DIR);
+    }
+  }
+
+  private Path getOutputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(OUTPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), OUTPUT_DIR);
+    }
+  }
+
+  /**
+   * Create the inputs for the MultiMaps test.
+   * @return the path to the input directory.
+   */
+  private Path createMultiMapsInput() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path inputPath = getInputPath();
+
+    // Clear the input directory if it exists, first.
+    if (fs.exists(inputPath)) {
+      fs.delete(inputPath, true);
+    }
+
+    // Create input files, with sizes calibrated based on
+    // the amount of work done in each mapper.
+    createInputFile(inputPath, 0, 50000);
+    createInputFile(inputPath, 1, 500);
+    createInputFile(inputPath, 2, 500);
+    createInputFile(inputPath, 3, 20);
+    createInputFile(inputPath, 4, 5000);
+    createInputFile(inputPath, 5, 500);
+
+    return inputPath;
+  }
+
+  /**
+   * Verify that we got the correct amount of output.
+   */
+  private void verifyOutput(Path outputPath) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    Path outputFile = new Path(outputPath, "part-r-00000");
+    InputStream is = fs.open(outputFile);
+    BufferedReader r = new BufferedReader(new InputStreamReader(is));
+
+    // Should get a single line of the form "0\t(count)"
+    String line = r.readLine().trim();
+    assertTrue("Line does not have correct key", line.startsWith("0\t"));
+    int count = Integer.valueOf(line.substring(2));
+    assertEquals("Incorrect count generated!", TOTAL_RECORDS, count);
+
+    r.close();
+
+  }
+
+  /**
+   * Run a test with several mappers in parallel, operating at different
+   * speeds. Verify that the correct amount of output is created.
+   */
+  @Test
+  public void testMultiMaps() throws Exception {
+    Job job = new Job();
+
+    Path inputPath = createMultiMapsInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(StressMapper.class);
+    job.setReducerClass(CountingReducer.class);
+    job.setNumReduceTasks(1);
+    LocalJobRunner.setLocalMaxRunningMaps(job, 6);
+    job.getConfiguration().set("io.sort.record.pct", "0.50");
+    job.getConfiguration().set("io.sort.mb", "25");
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    verifyOutput(outputPath);
+  }
+
+  /**
+   * Run a test with a misconfigured number of mappers.
+   * Expect failure.
+   */
+  @Test
+  public void testInvalidMultiMapParallelism() throws Exception {
+    Job job = new Job();
+
+    Path inputPath = createMultiMapsInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(StressMapper.class);
+    job.setReducerClass(CountingReducer.class);
+    job.setNumReduceTasks(1);
+    LocalJobRunner.setLocalMaxRunningMaps(job, -6);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    boolean success = job.waitForCompletion(true);
+    assertFalse("Job succeeded somehow", success);
+  }
+
+  /** An IF that creates no splits */
+  private static class EmptyInputFormat extends InputFormat<Object, Object> {
+    public List<InputSplit> getSplits(JobContext context) {
+      return new ArrayList<InputSplit>();
+    }
+
+    public RecordReader<Object, Object> createRecordReader(InputSplit split,
+        TaskAttemptContext context) {
+      return new EmptyRecordReader();
+    }
+  }
+
+  private static class EmptyRecordReader extends RecordReader<Object, Object> {
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+    }
+
+    public Object getCurrentKey() {
+      return new Object();
+    }
+
+    public Object getCurrentValue() {
+      return new Object();
+    }
+
+    public float getProgress() {
+      return 0.0f;
+    }
+
+    public void close() {
+    }
+
+    public boolean nextKeyValue() {
+      return false;
+    }
+  }
+
+  /** Test case for zero mappers */
+  public void testEmptyMaps() throws Exception {
+    Job job = new Job();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setInputFormatClass(EmptyInputFormat.class);
+    job.setNumReduceTasks(1);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    boolean success = job.waitForCompletion(true);
+    assertTrue("Empty job should work", success);
+  }
+}
+


