hadoop-mapreduce-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r790797 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java
Date: Fri, 03 Jul 2009 06:10:44 GMT
Author: sharad
Date: Fri Jul  3 06:10:43 2009
New Revision: 790797

URL: http://svn.apache.org/viewvc?rev=790797&view=rev
Log:
MAPREDUCE-625. Modify TestTaskLimits to improve execution time. Contributed by Jothi Padmanabhan.
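
In brief: the old test stood up a MiniDFSCluster and ran a full PiEstimator job
for every limit setting; this change extracts the limit check into a
package-private JobInProgress.checkTaskLimits(), which the test can now call
directly. A condensed view of the new test path (names as in the diff below):

    JobConf jc = mr.createJobConf();    // mr: a MiniMRCluster; no DFS cluster needed
    jc.setNumMapTasks(8);
    jc.setNumReduceTasks(0);
    JobInProgress jip = new JobInProgress(new JobID(), jc, jt);  // jt: the JobTracker
    jip.checkTaskLimits();              // throws IOException when over the limit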

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=790797&r1=790796&r2=790797&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul  3 06:10:43 2009
@@ -58,6 +58,9 @@
     MAPREDUCE-686. Move TestSpeculativeExecution.Fake* into a separate class
     so that it can be used by other tests. (Jothi Padmanabhan via sharad)
 
+    MAPREDUCE-625. Modify TestTaskLimits to improve execution time.
+    (Jothi Padmanabhan via sharad)
+
   BUG FIXES
     HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=790797&r1=790796&r2=790797&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jul  3 06:10:43 2009
@@ -536,9 +536,8 @@
 
     LOG.info("Initializing " + jobId);
 
-    // log job info
-    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
-                                    this.startTime, hasRestarted());
+    logToJobHistory();
+    
     // log the job priority
     setPriority(this.priority);
     
@@ -547,39 +546,16 @@
     //
     String jobFile = profile.getJobFile();
 
-    DataInputStream splitFile =
-      fs.open(new Path(conf.get("mapred.job.split.file")));
-    JobClient.RawSplit[] splits;
-    try {
-      splits = JobClient.readSplitFile(splitFile);
-    } finally {
-      splitFile.close();
-    }
+    JobClient.RawSplit[] splits = createSplits();
     numMapTasks = splits.length;
 
-
-    // if the number of splits is larger than a configured value
-    // then fail the job.
-    int maxTasks = jobtracker.getMaxTasksPerJob();
-    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
-      throw new IOException(
-                "The number of tasks for this job " + 
-                (numMapTasks + numReduceTasks) +
-                " exceeds the configured limit " + maxTasks);
-    }
+    checkTaskLimits();
 
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
+
+    createMapTasks(jobFile, splits);
     
-    maps = new TaskInProgress[numMapTasks];
-    for(int i=0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getDataLength();
-      maps[i] = new TaskInProgress(jobId, jobFile, 
-                                   splits[i], 
-                                   jobtracker, conf, this, i, numSlotsPerMap);
-    }
-    LOG.info("Input size for job " + jobId + " = " + inputLength
-        + ". Number of splits = " + splits.length);
     if (numMapTasks > 0) { 
       nonRunningMapCache = createCache(splits, maxLevel);
     }
@@ -587,17 +563,8 @@
     // set the launch time
     this.launchTime = JobTracker.getClock().getTime();
 
-    //
-    // Create reduce tasks
-    //
-    this.reduces = new TaskInProgress[numReduceTasks];
-    for (int i = 0; i < numReduceTasks; i++) {
-      reduces[i] = new TaskInProgress(jobId, jobFile, 
-                                      numMapTasks, i, 
-                                      jobtracker, conf, this, numSlotsPerReduce);
-      nonRunningReduces.add(reduces[i]);
-    }
-
+    createReduceTasks(jobFile);
+    
     // Calculate the minimum number of maps to be complete before 
     // we should start scheduling reduces
     completedMapsForReduceSlowstart = 
@@ -627,7 +594,64 @@
     }
   }
 
-  private void initSetupCleanupTasks(String jobFile) {
+  void logToJobHistory() throws IOException {
+    // log job info
+    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
+        this.startTime, hasRestarted());
+  }
+
+  JobClient.RawSplit[] createSplits() throws IOException {
+    DataInputStream splitFile =
+      fs.open(new Path(conf.get("mapred.job.split.file")));
+    JobClient.RawSplit[] splits;
+    try {
+      splits = JobClient.readSplitFile(splitFile);
+    } finally {
+      splitFile.close();
+    }
+    return splits;
+  }
+
+  /**
+   * If the number of tasks is greater than the configured value,
+   * throw an exception that will fail job initialization.
+   */
+  void checkTaskLimits() throws IOException {
+    int maxTasks = jobtracker.getMaxTasksPerJob();
+    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
+      throw new IOException(
+                "The number of tasks for this job " + 
+                (numMapTasks + numReduceTasks) +
+                " exceeds the configured limit " + maxTasks);
+    }
+  }
+
+  synchronized void createMapTasks(String jobFile, JobClient.RawSplit[] splits) {
+    maps = new TaskInProgress[numMapTasks];
+    for(int i=0; i < numMapTasks; ++i) {
+      inputLength += splits[i].getDataLength();
+      maps[i] = new TaskInProgress(jobId, jobFile, 
+                                   splits[i], 
+                                   jobtracker, conf, this, 
+                                   i, numSlotsPerMap);
+    }
+    LOG.info("Input size for job " + jobId + " = " + inputLength
+        + ". Number of splits = " + splits.length);
+
+  }
+
+  synchronized void createReduceTasks(String jobFile) {
+    this.reduces = new TaskInProgress[numReduceTasks];
+    for (int i = 0; i < numReduceTasks; i++) {
+      reduces[i] = new TaskInProgress(jobId, jobFile, 
+                                      numMapTasks, i, 
+                                      jobtracker, conf, 
+                                      this, numSlotsPerReduce);
+      nonRunningReduces.add(reduces[i]);
+    }
+  }
+
+  synchronized void initSetupCleanupTasks(String jobFile) {
     if (!jobSetupCleanupNeeded) {
       // nothing to initialize
       return;
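
For reference, after this refactoring the body of initTasks() reduces to
roughly the following sequence (condensed from the hunks above; instrumentation,
caching, and the setup/cleanup steps are elided):

    logToJobHistory();
    setPriority(this.priority);
    JobClient.RawSplit[] splits = createSplits();
    numMapTasks = splits.length;
    checkTaskLimits();       // fail init early if map + reduce tasks exceed the limit
    createMapTasks(jobFile, splits);
    createReduceTasks(jobFile);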

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java?rev=790797&r1=790796&r2=790797&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java Fri Jul  3 06:10:43 2009
@@ -21,101 +21,46 @@
 import junit.framework.TestCase;
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.examples.PiEstimator;
-import org.apache.hadoop.fs.FileSystem;
-
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
-
 /**
- * A JUnit test to test configired task limits.
+ * A JUnit test to test configured task limits.
  */
 public class TestTaskLimits extends TestCase {
 
-  {     
-    ((Log4JLogger)JobInProgress.LOG).getLogger().setLevel(Level.ALL);
-  }     
-
-  private static final Log LOG =
-    LogFactory.getLog(TestMiniMRWithDFS.class.getName());
-  
-  static final int NUM_MAPS = 5;
-  static final int NUM_SAMPLES = 100;
-  
-  public static class TestResult {
-    public String output;
-    public RunningJob job;
-    TestResult(RunningJob job, String output) {
-      this.job = job;
-      this.output = output;
+  static void runTest(int maxTasks, int numMaps, int numReds, 
+                      boolean shouldFail) throws Exception {
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
+    conf.set("mapred.job.tracker.handler.count", "1");
+    MiniMRCluster mr = new MiniMRCluster(0, "file:///", 1, null, null, conf);
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobConf jc = mr.createJobConf();
+    jc.setNumMapTasks(numMaps);
+    jc.setNumReduceTasks(numReds);
+    JobInProgress jip = new JobInProgress(new JobID(), jc, jt);
+    boolean failed = false;
+    try {
+      jip.checkTaskLimits();
+    } catch (IOException e) {
+      failed = true;
     }
+    assertEquals(shouldFail, failed);
+    mr.shutdown();
   }
   
-  static void runPI(MiniMRCluster mr, JobConf jobconf) 
-      throws IOException, InterruptedException, ClassNotFoundException  {
-    LOG.info("runPI");
-    double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
-    double error = Math.abs(Math.PI - estimate);
-    System.out.println("PI estimation " + error);
+  public void testBeyondLimits() throws Exception {
+    // Max tasks is 4, Requested is 8, shouldFail = true
+    runTest(4, 8, 0, true);
   }
-
-  /**
-   * Run the pi test with a specifix value of 
-   * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
-   */
-  private boolean runOneTest(int maxTasks) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    boolean success = false;
-    try {
-      final int taskTrackers = 2;
-
-      Configuration conf = new Configuration();
-      conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
-      dfs = new MiniDFSCluster(conf, 4, true, null);
-      fileSys = dfs.getFileSystem();
-      JobConf jconf = new JobConf(conf);
-      mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1,
-                             null, null, null, jconf);
-      
-      JobConf jc = mr.createJobConf();
-      try {
-        runPI(mr, jc);
-        success = true;
-      } catch (IOException e) {
-        success = false;
-      }
-    } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown(); }
-    }
-    return success;
+  
+  public void testTaskWithinLimits() throws Exception {
+    // Max tasks is 4, requested is 4, shouldFail = false
+    runTest(4, 4, 0, false);
   }
 
-  public void testTaskLimits() 
-      throws IOException, InterruptedException, ClassNotFoundException {
 
-    System.out.println("Job 1 running with max set to 2");
-    boolean status = runOneTest(2);
-    assertTrue(status == false);
-    System.out.println("Job 1 failed as expected.");
-
-    // verify that checking this limit works well. The job
-    // needs 5 mappers and we set the limit to 7.
-    System.out.println("Job 2 running with max set to 7.");
-    status = runOneTest(7);
-    assertTrue(status == true);
-    System.out.println("Job 2 succeeded as expected.");
-
-    System.out.println("Job 3 running with max disabled.");
-    status = runOneTest(-1);
-    assertTrue(status == true);
-    System.out.println("Job 3 succeeded as expected.");
+  public void testTaskWithoutLimits() throws Exception {
+    // No task limit, requested is 16, shouldFail = false
+    runTest(-1, 8, 8, false);
   }
+
 }
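
The three new test methods map directly onto
runTest(maxTasks, numMaps, numReds, shouldFail):

    runTest(4, 8, 0, true);    // testBeyondLimits: 8 tasks > limit of 4, init must fail
    runTest(4, 4, 0, false);   // testTaskWithinLimits: exactly at the limit, allowed
    runTest(-1, 8, 8, false);  // testTaskWithoutLimits: limit disabled, always allowed

Since each case only constructs a JobInProgress and calls checkTaskLimits(),
no job is actually run, which is where the execution-time win comes from.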


