hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r785392 [5/5] - in /hadoop/core/branches/HADOOP-4687/mapred: conf/ lib/ src/c++/ src/c++/task-controller/ src/contrib/dynamic-scheduler/ src/contrib/sqoop/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ s...
Date Tue, 16 Jun 2009 20:54:28 GMT
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Tue Jun 16 20:54:24 2009
@@ -23,17 +23,22 @@
 import java.io.IOException;
 import java.util.Random;
 import java.util.Iterator;
+import java.util.StringTokenizer;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.TestProcfsBasedProcessTree;
 
 import org.apache.commons.logging.Log;
@@ -49,11 +54,11 @@
             .getLog(TestKillSubProcesses.class);
 
   private static String TEST_ROOT_DIR = new File(System.getProperty(
-      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+      "test.build.data", "/tmp"), "killjob").toURI().toString().replace(' ', '+');
 
   private static JobClient jobClient = null;
 
-  private static MiniMRCluster mr = null;
+  static MiniMRCluster mr = null;
   private static Path scriptDir = null;
   private static String scriptDirName = null;
   private static String pid = null;
@@ -70,7 +75,7 @@
     conf.setJobName("testkilljobsubprocesses");
     conf.setMapperClass(KillingMapperWithChildren.class);
     
-    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    scriptDir = new Path(TEST_ROOT_DIR , "script");
     RunningJob job = runJobAndSetProcessHandle(jt, conf);
 
     // kill the job now
@@ -181,9 +186,8 @@
           }
         }
         LOG.info("pid of map task is " + pid);
-
-        // Checking if the map task is alive
-        assertTrue(ProcessTree.isAlive(pid));
+        //Checking if the map task is alive
+        assertTrue("Map is no more alive", isAlive(pid));
         LOG.info("The map task is alive before Job completion, as expected.");
       }
     }
@@ -216,7 +220,7 @@
                  " is " + childPid);
         assertTrue("Unexpected: The subprocess at level " + i +
                    " in the subtree is not alive before Job completion",
-                   ProcessTree.isAlive(childPid));
+                   isAlive(childPid));
       }
     }
     return job;
@@ -250,10 +254,10 @@
                  " is " + childPid);
         assertTrue("Unexpected: The subprocess at level " + i +
                    " in the subtree is alive after Job completion",
-                   !ProcessTree.isAlive(childPid));
+                   !isAlive(childPid));
       }
     }
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.getLocal(mr.createJobConf());
     if(fs.exists(scriptDir)) {
       fs.delete(scriptDir, true);
     }
@@ -261,10 +265,23 @@
   
   private static RunningJob runJob(JobConf conf) throws IOException {
 
-    final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
-    final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+    final Path inDir;
+    final Path outDir;
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileSystem tempFs = FileSystem.get(conf);
+    //Check if test is run with hdfs or local file system.
+    //if local filesystem then prepend TEST_ROOT_DIR, otherwise
+    //killjob folder would be created in workspace root.
+    if (!tempFs.getUri().toASCIIString().equals(
+        fs.getUri().toASCIIString())) {
+      inDir = new Path("killjob/input");
+      outDir = new Path("killjob/output");
+    } else {
+      inDir = new Path(TEST_ROOT_DIR, "input");
+      outDir = new Path(TEST_ROOT_DIR, "output");
+    }
 
-    FileSystem fs = FileSystem.get(conf);
+    
     if(fs.exists(scriptDir)) {
       fs.delete(scriptDir, true);
     }
@@ -290,9 +307,7 @@
       // run the TCs
       conf = mr.createJobConf();
       JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-      runKillingJobAndValidate(jt, conf);
-      runFailingJobAndValidate(jt, conf);
-      runSuccessfulJobAndValidate(jt, conf);
+      runTests(conf, jt);
     } finally {
       if (mr != null) {
         mr.shutdown();
@@ -300,12 +315,25 @@
     }
   }
 
+  void runTests(JobConf conf, JobTracker jt) throws IOException {
+    FileSystem fs = FileSystem.getLocal(mr.createJobConf());
+    Path rootDir = new Path(TEST_ROOT_DIR);
+    if(!fs.exists(rootDir)) {
+      fs.mkdirs(rootDir);
+    }
+    fs.setPermission(rootDir, 
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    runKillingJobAndValidate(jt, conf);
+    runFailingJobAndValidate(jt, conf);
+    runSuccessfulJobAndValidate(jt, conf);
+  }
+
   /**
    * Creates signal file
    */
   private static void signalTask(String signalFile, JobConf conf) {
     try {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.getLocal(conf);
       fs.createNewFile(new Path(signalFile));
     } catch(IOException e) {
       LOG.warn("Unable to create signal file. " + e);
@@ -317,10 +345,12 @@
    */
   private static void runChildren(JobConf conf) throws IOException {
     if (ProcessTree.isSetsidAvailable) {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.getLocal(conf);
       TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
-      scriptDir = new Path(TEST_ROOT_DIR + "/script");  
-    
+      scriptDir = new Path(TEST_ROOT_DIR + "/script");
+      if(fs.exists(scriptDir)){
+        fs.delete(scriptDir, true);
+      }
       // create shell script
       Random rm = new Random();
       Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
@@ -329,6 +359,7 @@
       String script =
         "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
         "echo hello\n" +
+        "trap 'echo got SIGTERM' 15 \n" +
         "if [ $1 != 0 ]\nthen\n" +
         " sh " + shellScript + " $(($1-1))\n" +
         "else\n" +
@@ -447,4 +478,46 @@
       throw new RuntimeException("failing map");
     }
   }
+  
+  /**
+   * Check for presence of the process with the pid passed is alive or not
+   * currently.
+   * 
+   * @param pid pid of the process
+   * @return if a process is alive or not.
+   */
+  private static boolean isAlive(String pid) throws IOException {
+    String commandString ="ps -o pid,command -e";
+    String args[] = new String[] {"bash", "-c" , commandString};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(args);
+    try {
+      shExec.execute(); 
+    }catch(ExitCodeException e) {
+      return false;
+    } catch (IOException e) {
+      LOG.warn("IOExecption thrown while checking if process is alive" + 
+          StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    String output = shExec.getOutput();
+
+    //Parse the command output and check for pid, ignore the commands
+    //which has ps or grep in it.
+    StringTokenizer strTok = new StringTokenizer(output, "\n");
+    boolean found = false;
+    while(strTok.hasMoreTokens()) {
+      StringTokenizer pidToken = new StringTokenizer(strTok.nextToken(), 
+          " ");
+      String pidStr = pidToken.nextToken();
+      String commandStr = pidToken.nextToken();
+      if(pid.equals(pidStr) && !(commandStr.contains("ps") 
+          || commandStr.contains("grep"))) {
+        found = true;
+        break;
+      }
+    }
+    return found; 
+  }
+  
 }

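The isAlive() helper added to TestKillSubProcesses above shells out to "ps -o pid,command -e" and scans each output line for the target pid, skipping entries whose command column mentions ps or grep. A minimal standalone sketch of just that parsing step; the sample ps output, pid values and class name below are illustrative and not part of the commit:

    import java.util.StringTokenizer;

    public class PsOutputParseSketch {

      // Returns true if pid appears in "ps -o pid,command -e" style output and
      // the matching command is not the ps/grep invocation itself -- the same
      // filter TestKillSubProcesses.isAlive() applies above.
      static boolean pidPresent(String psOutput, String pid) {
        StringTokenizer lines = new StringTokenizer(psOutput, "\n");
        while (lines.hasMoreTokens()) {
          StringTokenizer fields = new StringTokenizer(lines.nextToken(), " ");
          if (!fields.hasMoreTokens()) {
            continue;                       // blank line
          }
          String pidStr = fields.nextToken();
          if (!fields.hasMoreTokens()) {
            continue;                       // no command column
          }
          String commandStr = fields.nextToken();
          if (pid.equals(pidStr)
              && !(commandStr.contains("ps") || commandStr.contains("grep"))) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        String sample = "  PID COMMAND\n 4242 sh\n 4300 ps\n";
        System.out.println(pidPresent(sample, "4242"));  // true
        System.out.println(pidPresent(sample, "4300"));  // false: command is ps
      }
    }

The guard clauses around nextToken() are an addition of the sketch; the test's own version assumes every line carries at least a pid and a command column.
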
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Tue Jun 16 20:54:24 2009
@@ -30,7 +30,6 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -97,15 +96,17 @@
     // Run Sort
     Sort sort = new Sort();
     assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
-    Counters counters = sort.getResult().getCounters();
-    long mapInput = counters.findCounter(TaskCounter.MAP_INPUT_BYTES
-    ).getValue();
+    org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters();
+    long mapInput = counters.findCounter(
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP,
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ).
+      getValue();
     long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
                                          "HDFS_BYTES_READ").getValue();
     // the hdfs read should be between 100% and 110% of the map input bytes
     assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
                (hdfsRead < (mapInput * 1.1)) &&
-               (hdfsRead > mapInput));  
+               (hdfsRead >= mapInput));  
   }
   
   private static void runSortValidator(JobConf job, 

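In the TestMiniMRDFSSort change above, the map-input byte count now comes from FileInputFormat's BYTES_READ counter in the new org.apache.hadoop.mapreduce API rather than the removed TaskCounter.MAP_INPUT_BYTES lookup. A hedged helper-style sketch of that lookup, usable against any completed org.apache.hadoop.mapreduce.Job; the class and method names are illustrative:

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class MapInputBytesSketch {

      // Reads the bytes-read counter of a completed job through the new API,
      // the way the updated sort test does it above.
      static long mapInputBytes(Job job) throws Exception {
        Counters counters = job.getCounters();
        return counters.findCounter(FileInputFormat.COUNTER_GROUP,
                                    FileInputFormat.BYTES_READ).getValue();
      }
    }
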
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Jun 16 20:54:24 2009
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -375,4 +376,83 @@
     
     stopCluster();
   }
-}
\ No newline at end of file
+
+  /**
+   * Check if excluded hosts are decommissioned across restart  
+   */
+  public void testMRExcludeHostsAcrossRestarts() throws IOException {
+    // start a cluster with 2 hosts and empty exclude-hosts file
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+    File file = new File("hosts.exclude");
+    file.delete();
+    startCluster(2, 1, 0, conf);
+    String hostToDecommission = getHostname(1);
+    conf = mr.createJobConf(new JobConf(conf));
+
+    // submit a job
+    Path inDir = new Path("input");
+    Path outDir = new Path("output");
+    Path signalFilename = new Path("share");
+    JobConf newConf = new JobConf(conf);
+    UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1, 
+        "restart-decommission", signalFilename.toString(), 
+        signalFilename.toString());
+    
+    JobClient jobClient = new JobClient(newConf);
+    RunningJob job = jobClient.submitJob(newConf);
+    JobID id = job.getID();
+    
+    // wait for 50%
+    while (job.mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    // change the exclude-hosts file to include one host
+    FileOutputStream out = new FileOutputStream(file);
+    LOG.info("Writing excluded nodes to log file " + file.toString());
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write( hostToDecommission + "\n"); // decommission first host
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      out.close();
+    }
+    file.deleteOnExit();
+
+    // restart the jobtracker
+    mr.stopJobTracker();
+    mr.startJobTracker();
+
+    // Wait for the JT to be ready
+    UtilsForTests.waitForJobTracker(jobClient);
+
+    jt = mr.getJobTrackerRunner().getJobTracker();
+    UtilsForTests.signalTasks(dfs, dfs.getFileSystem(), 
+        signalFilename.toString(), signalFilename.toString(), 1);
+
+    assertTrue("Decommissioning of tracker has no effect restarted job", 
+        jt.getJob(job.getID()).failedMapTasks > 0);
+    
+    // check the cluster status and tracker size
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Excluded node count is incorrect", 
+                 1, jt.getClusterStatus(false).getNumExcludedNodes());
+    
+    // check if the host is disallowed
+    for (TaskTrackerStatus status : jt.taskTrackers()) {
+      assertFalse("Tracker from decommissioned host still exist", 
+                  status.getHost().equals(hostToDecommission));
+    }
+
+    // wait for the job
+    job.waitForCompletion();
+
+    stopCluster();
+  }
+}

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Jun 16 20:54:24 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
@@ -310,7 +311,7 @@
     fs.delete(rFile,false);
     
     // start the jobtracker
-    LOG.info("Stopping jobtracker with system files deleted");
+    LOG.info("Starting jobtracker with system files deleted");
     mr.startJobTracker();
     
     UtilsForTests.waitForJobTracker(jc);
@@ -394,8 +395,58 @@
     LOG.info("Starting jobtracker with fs errors");
     mr.startJobTracker();
     JobTrackerRunner runner = mr.getJobTrackerRunner();
-    assertFalse("Restart count for new job is incorrect", runner.isActive());
+    assertFalse("JobTracker is still alive", runner.isActive());
 
     mr.shutdown();
   } 
+
+  /**
+   * Test if the jobtracker waits for the info file to be created before 
+   * starting.
+   */
+  public void testJobTrackerInfoCreation() throws Exception {
+    LOG.info("Testing jobtracker.info file");
+    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+                      + (dfs.getFileSystem()).getUri().getPort();
+    // shut down the data nodes
+    dfs.shutdownDataNodes();
+
+    // start the jobtracker
+    JobConf conf = new JobConf();
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+
+    JobTracker jobtracker = new JobTracker(conf);
+
+    // now check if the update restart count works fine or not
+    boolean failed = false;
+    try {
+      jobtracker.recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      failed = true;
+    }
+    assertTrue("JobTracker created info files without datanodes!!!", failed);
+
+    Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
+    Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
+    FileSystem fs = dfs.getFileSystem();
+    assertFalse("Info file exists after update failure", 
+                fs.exists(restartFile));
+    assertFalse("Temporary restart-file exists after update failure", 
+                fs.exists(restartFile));
+
+    // start 1 data node
+    dfs.startDataNodes(conf, 1, true, null, null, null, null);
+    dfs.waitActive();
+
+    failed = false;
+    try {
+      jobtracker.recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      failed = true;
+    }
+    assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+  }
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Tue Jun 16 20:54:24 2009
@@ -91,7 +91,7 @@
     RawKeyValueIterator rawItr = 
       Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
-                   new Text.Comparator(), new NullProgress(),null,null);
+                   new Text.Comparator(), new NullProgress(), null, null, null);
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator<Text,Text>(rawItr,

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Tue Jun 16 20:54:24 2009
@@ -22,10 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ToolRunner;
@@ -46,7 +43,6 @@
 
   static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
   
-  private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
 
   /**
@@ -77,41 +73,42 @@
           getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
       long totalPhysicalMemoryOnTT =
           getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long virtualMemoryReservedOnTT =
-          getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long physicalMemoryReservedOnTT =
-          getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long mapSlotMemorySize =
+          getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
+      long reduceSlotMemorySize =
+          getConf()
+              .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
 
       long reportedTotalVirtualMemoryOnTT =
           status.getResourceStatus().getTotalVirtualMemory();
       long reportedTotalPhysicalMemoryOnTT =
           status.getResourceStatus().getTotalPhysicalMemory();
-      long reportedVirtualMemoryReservedOnTT =
-          status.getResourceStatus().getReservedTotalMemory();
-      long reportedPhysicalMemoryReservedOnTT =
-          status.getResourceStatus().getReservedPhysicalMemory();
+      long reportedMapSlotMemorySize =
+          status.getResourceStatus().getMapSlotMemorySizeOnTT();
+      long reportedReduceSlotMemorySize =
+          status.getResourceStatus().getReduceSlotMemorySizeOnTT();
 
       message =
           "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
-              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
-              + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
-              + ")";
+              + "mapSlotMemSize, reduceSlotMemorySize) = ("
+              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ","
+              + mapSlotMemorySize + "," + reduceSlotMemorySize + ")";
       message +=
           "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = ("
               + reportedTotalVirtualMemoryOnTT
               + ", "
               + reportedTotalPhysicalMemoryOnTT
-              + ", "
-              + reportedVirtualMemoryReservedOnTT
-              + ", "
-              + reportedPhysicalMemoryReservedOnTT + ")";
+              + ","
+              + reportedMapSlotMemorySize
+              + ","
+              + reportedReduceSlotMemorySize
+              + ")";
       LOG.info(message);
       if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
           || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
-          || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
-          || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
+          || mapSlotMemorySize != reportedMapSlotMemorySize
+          || reduceSlotMemorySize != reportedReduceSlotMemorySize) {
         hasPassed = false;
       }
       return super.assignTasks(status);
@@ -132,7 +129,7 @@
           TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
           DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
       setUpCluster(conf);
-      runSleepJob();
+      runSleepJob(miniMRCluster.createJobConf());
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -149,8 +146,9 @@
     JobConf conf = new JobConf();
     conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
     conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setLong("mapSlotMemorySize", 1 * 512L);
+    conf.setLong("reduceSlotMemorySize", 1 * 1024L);
+
     conf.setClass(
         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
@@ -158,15 +156,17 @@
         4 * 1024 * 1024 * 1024L);
     conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
         2 * 1024 * 1024 * 1024L);
+    conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        512L);
     conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-        512 * 1024 * 1024L);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
+    
     try {
       setUpCluster(conf);
-      runSleepJob();
+      JobConf jobConf = miniMRCluster.createJobConf();
+      jobConf.setMemoryForMapTask(1 * 1024L);
+      jobConf.setMemoryForReduceTask(2 * 1024L);
+      runSleepJob(jobConf);
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -189,17 +189,10 @@
     LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
     conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
     conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
-    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-        512 * 1024 * 1024L);
+
     try {
       setUpCluster(conf);
-      runSleepJob();
+      runSleepJob(miniMRCluster.createJobConf());
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -208,22 +201,15 @@
 
   private void setUpCluster(JobConf conf)
                                 throws Exception {
-    conf.setClass("mapred.jobtracker.taskScheduler", 
-        TestTTMemoryReporting.FakeTaskScheduler.class,
-        TaskScheduler.class);
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
-                      null, null, conf);    
+    conf.setClass("mapred.jobtracker.taskScheduler",
+        TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
+    conf.set("mapred.job.tracker.handler.count", "1");
+    miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
   }
   
-  private void runSleepJob() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set("mapred.job.tracker", "localhost:"
-                              + miniMRCluster.getJobTrackerPort());
+  private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "1",
-                      "-mt", "1000", "-rt", "1000" };
+                      "-mt", "10", "-rt", "10" };
     ToolRunner.run(conf, new SleepJob(), args);
   }
 
@@ -235,7 +221,8 @@
   }
   
   private void tearDownCluster() {
-    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
-    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
   }
-}
\ No newline at end of file
+}

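The TestTTMemoryReporting rework above replaces the old reservedVmem/reservedPmem settings with per-slot memory sizes: cluster-wide slot sizes via the JobTracker.MAPRED_CLUSTER_MAP/REDUCE_MEMORY_MB_PROPERTY keys, and per-job requests via JobConf.setMemoryForMapTask()/setMemoryForReduceTask(). The TestTaskTrackerMemoryManager changes that follow use the same properties. A hedged configuration sketch using only calls that appear in these diffs; it assumes the same org.apache.hadoop.mapred test package (MiniMRCluster and the JobTracker constants live there), and the values, which the tests treat as MB, are illustrative:

    package org.apache.hadoop.mapred;   // MiniMRCluster and the constants live here

    public class SlotMemoryConfigSketch {

      public static void main(String[] args) throws Exception {
        // Cluster side: slot sizes handed to the MiniMRCluster (illustrative
        // values, treated as MB by the tests above).
        JobConf clusterConf = new JobConf();
        clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512L);
        clusterConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
        MiniMRCluster cluster = new MiniMRCluster(1, "file:///", 1, null, null,
                                                  clusterConf);
        try {
          // Job side: per-task memory requests, as the updated tests now set them.
          JobConf jobConf = cluster.createJobConf();
          jobConf.setMemoryForMapTask(512L);
          jobConf.setMemoryForReduceTask(1024L);
          // ... submit a job with jobConf here ...
        } finally {
          cluster.shutdown();
        }
      }
    }
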
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Jun 16 20:54:24 2009
@@ -18,21 +18,24 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.TestProcfsBasedProcessTree;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.fs.FileSystem;
 
 import junit.framework.TestCase;
 
@@ -43,18 +46,22 @@
 
   private static final Log LOG =
       LogFactory.getLog(TestTaskTrackerMemoryManager.class);
-  private MiniDFSCluster miniDFSCluster;
+  private static String TEST_ROOT_DIR = new Path(System.getProperty(
+		    "test.build.data", "/tmp")).toString().replace(' ', '+');
+
   private MiniMRCluster miniMRCluster;
 
   private String taskOverLimitPatternString =
       "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
           + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
 
-  private void startCluster(JobConf conf) throws Exception {
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
+  private void startCluster(JobConf conf)
+      throws Exception {
+    conf.set("mapred.job.tracker.handler.count", "1");
+    conf.set("mapred.tasktracker.map.tasks.maximum", "1");
+    conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+    conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
+    miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
   }
 
   @Override
@@ -62,9 +69,6 @@
     if (miniMRCluster != null) {
       miniMRCluster.shutdown();
     }
-    if (miniDFSCluster != null) {
-      miniDFSCluster.shutdown();
-    }
   }
 
   private int runSleepJob(JobConf conf) throws Exception {
@@ -74,15 +78,6 @@
 
   private void runAndCheckSuccessfulJob(JobConf conf)
       throws IOException {
-    // Set up job.
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
-
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
     Matcher mat = null;
@@ -148,43 +143,12 @@
       return;
     }
 
-    JobConf conf = new JobConf();
     // Task-memory management disabled by default.
-    startCluster(conf);
-    long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    runAndCheckSuccessfulJob(conf);
-  }
-
-  /**
-   * Test for verifying that tasks with no limits, with the cumulative usage
-   * still under TT's limits, succeed.
-   * 
-   * @throws Exception
-   */
-  public void testTasksWithNoLimits()
-      throws Exception {
-    // Run the test only if memory management is enabled
-    if (!isProcfsBasedTreeAvailable()) {
-      return;
-    }
-
-    // Fairly large value for sleepJob to succeed
-    long ttLimit = 4 * 1024 * 1024 * 1024L;
-    // Start cluster with proper configuration.
-    JobConf fConf = new JobConf();
-
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        ttLimit);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
-    fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
-    startCluster(fConf);
-    JobConf conf = new JobConf();
+    startCluster(new JobConf());
+    long PER_TASK_LIMIT = 1L; // Doesn't matter how low.
+    JobConf conf = miniMRCluster.createJobConf();
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
   }
 
@@ -202,33 +166,25 @@
     }
 
     // Large so that sleepjob goes through and fits total TT usage
-    long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
-    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
+    long PER_TASK_LIMIT = 2 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024L);
     fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
-    startCluster(fConf);
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    startCluster(new JobConf());
+
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
-
   }
 
   /**
-   * Test for verifying that tasks that go beyond limits, though the cumulative
-   * usage is under TT's limits, get killed.
+   * Test for verifying that tasks that go beyond limits get killed.
    * 
    * @throws Exception
    */
@@ -240,43 +196,32 @@
       return;
     }
 
-    long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
-    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
-    // total usage
+    long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks.
+
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, String
-            .valueOf(PER_TASK_LIMIT)));
+            .valueOf(PER_TASK_LIMIT*1024*1024L)));
     Matcher mat = null;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
 
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    fConf.setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024);
     startCluster(fConf);
 
     // Set up job.
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    conf.setMaxMapAttempts(1);
+    conf.setMaxReduceAttempts(1);
 
     // Start the job.
     int ret = 0;
@@ -334,48 +279,39 @@
     }
 
     // Large enough for SleepJob Tasks.
-    long PER_TASK_LIMIT = 100000000000L;
-    // Very Limited TT. All tasks will be killed.
-    long TASK_TRACKER_LIMIT = 100L;
-    Pattern taskOverLimitPattern =
-        Pattern.compile(String.format(taskOverLimitPatternString, String
-            .valueOf(PER_TASK_LIMIT)));
-    Pattern trackerOverLimitPattern =
-        Pattern
-            .compile("Killing one of the least progress tasks - .*, as "
-                + "the cumulative memory usage of all the tasks on the TaskTracker"
-                + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
-    Matcher mat = null;
+    long PER_TASK_LIMIT = 100 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        1L);
     fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L);
+
+    // Because of the above, the total tt limit is 2mb
+    long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
+
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
 
     startCluster(fConf);
 
+    Pattern taskOverLimitPattern =
+      Pattern.compile(String.format(taskOverLimitPatternString, String
+          .valueOf(PER_TASK_LIMIT)));
+
+    Pattern trackerOverLimitPattern =
+      Pattern
+          .compile("Killing one of the least progress tasks - .*, as "
+              + "the cumulative memory usage of all the tasks on the TaskTracker"
+              + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+    Matcher mat = null;
+
     // Set up job.
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
 
     JobClient jClient = new JobClient(conf);
     SleepJob sleepJob = new SleepJob();
@@ -385,10 +321,12 @@
     job.submit();
     boolean TTOverFlowMsgPresent = false;
     while (true) {
-      // Set-up tasks are the first to be launched.
-      TaskReport[] setUpReports = jClient.getSetupTaskReports(
-                                    (org.apache.hadoop.mapred.JobID)job.getID());
-      for (TaskReport tr : setUpReports) {
+      List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      for (TaskReport tr : allTaskReports) {
         String[] diag = tr.getDiagnostics();
         for (String str : diag) {
           mat = taskOverLimitPattern.matcher(str);
@@ -414,4 +352,90 @@
     // Test succeeded, kill the job.
     job.killJob();
   }
+  
+  /**
+   * Test to verify the check for whether a process tree is over limit or not.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  public void testProcessTreeLimits() throws IOException {
+    
+    // set up a dummy proc file system
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+    String[] pids = { "100", "200", "300", "400", "500", "600", "700" };
+    try {
+      TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir);
+      
+      // create pid dirs.
+      TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids);
+      
+      // create process infos.
+      TestProcfsBasedProcessTree.ProcessStatInfo[] procs =
+          new TestProcfsBasedProcessTree.ProcessStatInfo[7];
+
+      // assume pids 100, 500 are in 1 tree 
+      // 200,300,400 are in another
+      // 600,700 are in a third
+      procs[0] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"100", "proc1", "1", "100", "100", "100000"});
+      procs[1] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"200", "proc2", "1", "200", "200", "200000"});
+      procs[2] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"300", "proc3", "200", "200", "200", "300000"});
+      procs[3] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"400", "proc4", "200", "200", "200", "400000"});
+      procs[4] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"500", "proc5", "100", "100", "100", "1500000"});
+      procs[5] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"600", "proc6", "1", "600", "600", "100000"});
+      procs[6] = new TestProcfsBasedProcessTree.ProcessStatInfo(
+          new String[] {"700", "proc7", "600", "600", "600", "100000"});
+      // write stat files.
+      TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs);
+
+      // vmem limit
+      long limit = 700000;
+      
+      // Create TaskMemoryMonitorThread
+      TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L,
+                                                                5000L);
+      // create process trees
+      // tree rooted at 100 is over limit immediately, as it is
+      // twice over the mem limit.
+      ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree(
+                                          "100", true, 100L, 
+                                          procfsRootDir.getAbsolutePath());
+      pTree.getProcessTree();
+      assertTrue("tree rooted at 100 should be over limit " +
+                    "after first iteration.",
+                  test.isProcessTreeOverLimit(pTree, "dummyId", limit));
+      
+      // the tree rooted at 200 is initially below limit.
+      pTree = new ProcfsBasedProcessTree("200", true, 100L,
+                                          procfsRootDir.getAbsolutePath());
+      pTree.getProcessTree();
+      assertFalse("tree rooted at 200 shouldn't be over limit " +
+                    "after one iteration.",
+                  test.isProcessTreeOverLimit(pTree, "dummyId", limit));
+      // second iteration - now the tree has been over limit twice,
+      // hence it should be declared over limit.
+      pTree.getProcessTree();
+      assertTrue("tree rooted at 200 should be over limit after 2 iterations",
+                  test.isProcessTreeOverLimit(pTree, "dummyId", limit));
+      
+      // the tree rooted at 600 is never over limit.
+      pTree = new ProcfsBasedProcessTree("600", true, 100L,
+                                            procfsRootDir.getAbsolutePath());
+      pTree.getProcessTree();
+      assertFalse("tree rooted at 600 should never be over limit.",
+                    test.isProcessTreeOverLimit(pTree, "dummyId", limit));
+      
+      // another iteration does not make any difference.
+      pTree.getProcessTree();
+      assertFalse("tree rooted at 600 should never be over limit.",
+                    test.isProcessTreeOverLimit(pTree, "dummyId", limit));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
 }

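The over-limit checks in TestTaskTrackerMemoryManager above match tasktracker diagnostics against taskOverLimitPatternString with the per-task limit substituted in bytes. A self-contained sketch of that matching step; the sample diagnostic line is invented to mimic the format the pattern expects and is not taken from a real tasktracker log:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class OverLimitPatternSketch {

      public static void main(String[] args) {
        String taskOverLimitPatternString =
            "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
                + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";

        long perTaskLimitBytes = 1L * 1024 * 1024;   // 1 MB, as in the updated test
        Pattern taskOverLimitPattern = Pattern.compile(
            String.format(taskOverLimitPatternString,
                          String.valueOf(perTaskLimitBytes)));

        // Invented diagnostic line that mimics the format the pattern expects.
        String diag = "TaskTree [pid=4242,tipID=task_200906162054_0001_m_000000]"
            + " is running beyond memory-limits. Current usage : 2097152bytes."
            + " Limit : 1048576bytes. Killing task.";

        Matcher mat = taskOverLimitPattern.matcher(diag);
        System.out.println(mat.find());   // true: the task would be reported as killed
      }
    }
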
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jun 16 20:54:24 2009
@@ -548,6 +548,12 @@
   // Start a job and return its RunningJob object
   static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
                     throws IOException {
+    return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks());
+  }
+
+  // Start a job and return its RunningJob object
+  static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
+                           int numReds) throws IOException {
 
     FileSystem fs = FileSystem.get(conf);
     if (fs.exists(outDir)) {
@@ -558,9 +564,11 @@
     }
     String input = "The quick brown fox\n" + "has many silly\n"
         + "red fox sox\n";
-    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
-    file.writeBytes(input);
-    file.close();
+    for (int i = 0; i < numMaps; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      file.writeBytes(input);
+      file.close();
+    }    
 
     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputKeyClass(LongWritable.class);
@@ -568,8 +576,8 @@
 
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
-    conf.setNumMapTasks(conf.getNumMapTasks());
-    conf.setNumReduceTasks(conf.getNumReduceTasks());
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
 
     JobClient jobClient = new JobClient(conf);
     RunningJob job = jobClient.submitJob(conf);

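The new UtilsForTests.runJob overload above writes one part-N input file per requested map task and sets the map and reduce counts explicitly. A hedged usage sketch, assuming a caller in the same org.apache.hadoop.mapred test package running against the default local job runner; the paths, counts and class name are illustrative:

    package org.apache.hadoop.mapred;   // UtilsForTests.runJob is package-level

    import org.apache.hadoop.fs.Path;

    public class RunJobOverloadSketch {

      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();   // defaults to the local job runner
        Path inDir = new Path("build/test/runjob-sketch/input");
        Path outDir = new Path("build/test/runjob-sketch/output");
        // Four part-N input files / map tasks, one reduce, via the new overload.
        RunningJob job = UtilsForTests.runJob(conf, inDir, outDir, 4, 1);
        job.waitForCompletion();
      }
    }
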
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Tue Jun 16 20:54:24 2009
@@ -38,12 +38,16 @@
 
 public class TestKeyFieldBasedComparator extends HadoopTestCase {
   JobConf conf;
-  String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.1 4444 011 011 234";
-  String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.0 4444.1";
+  JobConf localConf;
+  
+  String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.0 4444.1 011 011 234";
+  String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.1 4444";
 
   public TestKeyFieldBasedComparator() throws IOException {
     super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
     conf = createJobConf();
+    localConf = createJobConf();
+    localConf.set("map.output.key.field.separator", " ");
   }
   public void configure(String keySpec, int expect) throws Exception {
     Path testdir = new Path("build/test/test.mapred.spill");
@@ -123,9 +127,24 @@
     configure("-k2.4,2.4n", 2);
     configure("-k7,7", 1);
     configure("-k7,7n", 2);
-    configure("-k8,8n", 2);
-    configure("-k9,9n", 1);
+    configure("-k8,8n", 1);
+    configure("-k9,9", 2);
     configure("-k11,11",2);
     configure("-k10,10",2);
+    
+    localTestWithoutMRJob("-k9,9", 1);
+  }
+  
+  byte[] line1_bytes = line1.getBytes();
+  byte[] line2_bytes = line2.getBytes();
+
+  public void localTestWithoutMRJob(String keySpec, int expect) throws Exception {
+    KeyFieldBasedComparator<Void, Void> keyFieldCmp = new KeyFieldBasedComparator<Void, Void>();
+    localConf.setKeyFieldComparatorOptions(keySpec);
+    keyFieldCmp.configure(localConf);
+    int result = keyFieldCmp.compare(line1_bytes, 0, line1_bytes.length,
+        line2_bytes, 0, line2_bytes.length);
+    if ((expect >= 0 && result < 0) || (expect < 0 && result >= 0))
+      fail();
   }
 }

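The new localTestWithoutMRJob above drives KeyFieldBasedComparator directly on raw key bytes, with no MapReduce job involved. A hedged standalone sketch of that usage; the key spec, sample lines and class name are illustrative, while the separator property, setKeyFieldComparatorOptions() and configure() calls are the ones the test itself uses:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;

    public class KeyFieldCompareSketch {

      public static void main(String[] args) {
        JobConf localConf = new JobConf();
        localConf.set("map.output.key.field.separator", " ");
        localConf.setKeyFieldComparatorOptions("-k2,2n");   // numeric compare, field 2

        KeyFieldBasedComparator<Void, Void> keyFieldCmp =
            new KeyFieldBasedComparator<Void, Void>();
        keyFieldCmp.configure(localConf);

        byte[] line1 = "10 5 x".getBytes();
        byte[] line2 = "10 40 y".getBytes();
        int result = keyFieldCmp.compare(line1, 0, line1.length,
                                         line2, 0, line2.length);
        // A negative result means line1 sorts before line2 on the chosen key field.
        System.out.println(result);
      }
    }
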
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jun 16 20:54:24 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.MultiFileWordCount;
 import org.apache.hadoop.examples.SecondarySort;
 import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * A JUnit test to test min map-reduce cluster with local file system.
@@ -88,6 +90,7 @@
       Configuration conf = mr.createJobConf();
       runWordCount(conf);
       runSecondarySort(conf);
+      runMultiFileWordCount(conf);
     } finally {
       if (mr != null) { mr.shutdown(); }
     }
@@ -118,6 +121,9 @@
                  out);
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
+    long mapIn = ctrs.findCounter(FileInputFormat.COUNTER_GROUP, 
+                                  FileInputFormat.BYTES_READ).getValue();
+    assertTrue(mapIn != 0);    
     long combineIn = ctrs.findCounter(COUNTER_GROUP,
                                       "COMBINE_INPUT_RECORDS").getValue();
     long combineOut = ctrs.findCounter(COUNTER_GROUP, 
@@ -169,5 +175,21 @@
                  "------------------------------------------------\n" +
                  "10\t20\n10\t25\n10\t30\n", out);
   }
-  
+ 
+  public void runMultiFileWordCount(Configuration  conf) throws Exception  {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);    
+    writeFile("in/part1", "this is a test\nof " +
+              "multi file word count test\ntest\n");
+    writeFile("in/part2", "more test");
+
+    int ret = ToolRunner.run(conf, new MultiFileWordCount(), 
+                new String[] {TEST_ROOT_DIR + "/in", TEST_ROOT_DIR + "/out"});
+    assertTrue("MultiFileWordCount failed", ret == 0);
+    String out = readFile("out/part-r-00000");
+    System.out.println(out);
+    assertEquals("a\t1\ncount\t1\nfile\t1\nis\t1\n" +
+      "more\t1\nmulti\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n", out);
+  }
+
 }

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Tue Jun 16 20:54:24 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112
+/hadoop/core/trunk/src/webapps/job:776175-784663

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp Tue Jun 16 20:54:24 2009
@@ -267,6 +267,10 @@
           "<a href=\"jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" +
           flakyTaskTrackers + "</a><br>\n");
     }
+    if (job.getSchedulingInfo() != null) {
+      out.print("<b>Job Scheduling information: </b>" +
+          job.getSchedulingInfo().toString() +"\n");
+    }
     out.print("<hr>\n");
     out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
     out.print("<tr><th>Kind</th><th>% Complete</th><th>Num Tasks</th>" +

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp Tue Jun 16 20:54:24 2009
@@ -126,7 +126,12 @@
 <table border=2 cellpadding="5" cellspacing="2">
 <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> 
   <%
-   if (!ts[0].getIsMap() && !isCleanupOrSetup) {
+   if (ts[0].getIsMap()) {
+  %>
+<td>Map Phase Finished</td>
+  <%
+   }
+   else if(!isCleanupOrSetup) {
    %>
 <td>Shuffle Finished</td><td>Sort Finished</td>
   <%
@@ -181,7 +186,12 @@
         out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getStartTime(), 0) + "</td>");
-        if (!ts[i].getIsMap() && !isCleanupOrSetup) {
+        if (ts[i].getIsMap()) {
+          out.print("<td>"
+          + StringUtils.getFormattedTimeWithDiff(dateFormat, status
+          .getMapFinishTime(), status.getStartTime()) + "</td>");
+        }
+        else if (!isCleanupOrSetup) {
           out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getShuffleFinishTime(), status.getStartTime()) + "</td>");
@@ -268,7 +278,7 @@
 </center>
 
 <%
-      if (ts[0].getIsMap()) {
+      if (ts[0].getIsMap() && !isCleanupOrSetup) {
 %>
 <h3>Input Split Locations</h3>
 <table border=2 cellpadding="5" cellspacing="2">


