hadoop-mapreduce-commits mailing list archives

From vino...@apache.org
Subject svn commit: r1091316 [2/3] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-core/ mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache...
Date Tue, 12 Apr 2011 07:56:09 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,760 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A JUnit test to test ProcfsBasedProcessTree.
+ */
+public class TestProcfsBasedProcessTree {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestProcfsBasedProcessTree.class);
+  protected static File TEST_ROOT_DIR = new File("target",
+      TestProcfsBasedProcessTree.class.getName() + "-localDir");
+
+  private ShellCommandExecutor shexec = null;
+  private String pidFile, lowestDescendant;
+  private String shellScript;
+
+  private static final int N = 6; // controls the recursion depth of the RogueTask's shell script
+
+  private class RogueTaskThread extends Thread {
+    public void run() {
+      try {
+        Vector<String> args = new Vector<String>();
+        if(isSetsidAvailable()) {
+          args.add("setsid");
+        }
+        args.add("bash");
+        args.add("-c");
+        args.add(" echo $$ > " + pidFile + "; sh " +
+                          shellScript + " " + N + ";");
+        shexec = new ShellCommandExecutor(args.toArray(new String[0]));
+        shexec.execute();
+      } catch (ExitCodeException ee) {
+        LOG.info("Shell command exited with a non-zero exit code. This is" +
+                 " expected as we are killing the subprocesses of the" +
+                 " task intentionally. " + ee);
+      } catch (IOException ioe) {
+        LOG.info("Error executing shell command " + ioe);
+      } finally {
+        LOG.info("Exit code: " + shexec.getExitCode());
+      }
+    }
+  }
+
+  private String getRogueTaskPID() {
+    File f = new File(pidFile);
+    while (!f.exists()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    // read from pidFile
+    return getPidFromPidFile(pidFile);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    FileContext.getLocalFSFileContext().delete(
+        new Path(TEST_ROOT_DIR.getAbsolutePath()), true);
+  }
+
+  @Test
+  public void testProcessTree() throws Exception {
+
+    try {
+      if (!ProcfsBasedProcessTree.isAvailable()) {
+        System.out
+            .println("ProcfsBasedProcessTree is not available on this system. Not testing");
+        return;
+      }
+    } catch (Exception e) {
+      LOG.info(StringUtils.stringifyException(e));
+      return;
+    }
+    // create shell script
+    Random rm = new Random();
+    File tempFile =
+        new File(TEST_ROOT_DIR, getClass().getName() + "_shellScript_"
+            + rm.nextInt() + ".sh");
+    tempFile.deleteOnExit();
+    shellScript = TEST_ROOT_DIR + File.separator + tempFile.getName();
+
+    // create pid file
+    tempFile =
+        new File(TEST_ROOT_DIR, getClass().getName() + "_pidFile_"
+            + rm.nextInt() + ".pid");
+    tempFile.deleteOnExit();
+    pidFile = TEST_ROOT_DIR + File.separator + tempFile.getName();
+
+    lowestDescendant = TEST_ROOT_DIR + File.separator + "lowestDescendantPidFile";
+
+    // write to shell-script
+    try {
+      FileWriter fWriter = new FileWriter(shellScript);
+      fWriter.write(
+          "# rogue task\n" +
+          "sleep 1\n" +
+          "echo hello\n" +
+          "if [ $1 -ne 0 ]\n" +
+          "then\n" +
+          " sh " + shellScript + " $(($1-1))\n" +
+          "else\n" +
+          " echo $$ > " + lowestDescendant + "\n" +
+          " while true\n do\n" +
+          "  sleep 5\n" +
+          " done\n" +
+          "fi");
+      fWriter.close();
+    } catch (IOException ioe) {
+      LOG.info("Error: " + ioe);
+      return;
+    }
+
+    Thread t = new RogueTaskThread();
+    t.start();
+    String pid = getRogueTaskPID();
+    LOG.info("Root process pid: " + pid);
+    ProcfsBasedProcessTree p = createProcessTree(pid);
+    p = p.getProcessTree(); // initialize
+    LOG.info("ProcessTree: " + p.toString());
+
+    File leaf = new File(lowestDescendant);
+    // wait till the lowest descendant process of the rogue task starts execution
+    while (!leaf.exists()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    p = p.getProcessTree(); // reconstruct
+    LOG.info("ProcessTree: " + p.toString());
+
+    // Get the process-tree dump
+    String processTreeDump = p.getProcessTreeDump();
+
+    // destroy the process and all its subprocesses
+    destroyProcessTree(pid);
+
+    if (isSetsidAvailable()) { // whole processtree should be gone
+      Assert.assertFalse("Processes in the process group are still alive",
+          isAnyProcessInTreeAlive(p));
+    } else { // only the root process is guaranteed to be gone
+      Assert.assertFalse("Root process is still alive", isAlive(pid));
+    }
+
+    LOG.info("Process-tree dump follows: \n" + processTreeDump);
+    Assert.assertTrue("Process-tree dump doesn't start with a proper header",
+        processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+        "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+        "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+    for (int i = N; i >= 0; i--) {
+      String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" +
+          " [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i;
+      Pattern pat = Pattern.compile(cmdLineDump);
+      Matcher mat = pat.matcher(processTreeDump);
+      Assert.assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i
+          + "th process!", mat.find());
+    }
+
+    // The thread sometimes cannot be joined when forking with a large N.
+    try {
+      t.join(2000);
+      LOG.info("RogueTaskThread successfully joined.");
+    } catch (InterruptedException ie) {
+      LOG.info("Interrupted while joining RogueTaskThread.");
+    }
+
+    // ProcessTree is gone now. Any further calls should be sane.
+    p = p.getProcessTree();
+    Assert.assertFalse("Root process is still alive", isAlive(pid));
+    Assert.assertTrue("Cumulative vmem for the gone-process is "
+        + p.getCumulativeVmem() + " . It should be zero.", p
+        .getCumulativeVmem() == 0);
+    Assert.assertTrue(p.toString().equals("[ ]"));
+  }
+
+  protected ProcfsBasedProcessTree createProcessTree(String pid) {
+    return new ProcfsBasedProcessTree(pid,
+                               isSetsidAvailable());
+  }
+
+  protected ProcfsBasedProcessTree createProcessTree(String pid,
+      boolean setsidUsed, String procfsRootDir) {
+    return new ProcfsBasedProcessTree(pid, setsidUsed, procfsRootDir);
+  }
+
+  protected void destroyProcessTree(String pid) throws IOException {
+    sendSignal(pid, 9);
+  }
+
+  /**
+   * Get PID from a pid-file.
+   * 
+   * @param pidFileName
+   *          Name of the pid-file.
+   * @return the PID string read from the pid-file, or null if
+   *         pidFileName points to a non-existent file or if the read
+   *         from the file fails.
+   */
+  public static String getPidFromPidFile(String pidFileName) {
+    BufferedReader pidFile = null;
+    FileReader fReader = null;
+    String pid = null;
+
+    try {
+      fReader = new FileReader(pidFileName);
+      pidFile = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      LOG.debug("PidFile doesn't exist : " + pidFileName);
+      return pid;
+    }
+
+    try {
+      pid = pidFile.readLine();
+    } catch (IOException i) {
+      LOG.error("Failed to read from " + pidFileName);
+    } finally {
+      // closing the BufferedReader also closes the underlying FileReader
+      try {
+        if (pidFile != null) {
+          pidFile.close();
+        } else if (fReader != null) {
+          fReader.close();
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream for " + pidFileName);
+      }
+    }
+    return pid;
+  }
+  
+  public static class ProcessStatInfo {
+    // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624 
+    // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640 
+    // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626 
+    // 4294967295 0 0 17 1 0 0
+    String pid;
+    String name;
+    String ppid;
+    String pgrpId;
+    String session;
+    String vmem = "0";
+    String rssmemPage = "0";
+    String utime = "0";
+    String stime = "0";
+    
+    public ProcessStatInfo(String[] statEntries) {
+      pid = statEntries[0];
+      name = statEntries[1];
+      ppid = statEntries[2];
+      pgrpId = statEntries[3];
+      session = statEntries[4];
+      vmem = statEntries[5];
+      if (statEntries.length > 6) {
+        rssmemPage = statEntries[6];
+      }
+      if (statEntries.length > 7) {
+        utime = statEntries[7];
+        stime = statEntries[8];
+      }
+    }
+    
+    // construct a line that mimics the procfs stat file.
+    // all unused numerical entries are set to 0.
+    public String getStatLine() {
+      return String.format("%s (%s) S %s %s %s 0 0 0" +
+                      " 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" +
+                      " 0 0 0 0 0 0 0 0" +
+                      " 0 0 0 0 0", 
+                      pid, name, ppid, pgrpId, session,
+                      utime, stime, vmem, rssmemPage);
+    }
+  }
+  
+  /**
+   * A basic test that creates a few process directories and writes
+   * stat files. Verifies that the cpu time and memory are correctly
+   * computed.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  @Test
+  public void testCpuAndMemoryForProcessTree() throws IOException {
+
+    // test processes
+    String[] pids = { "100", "200", "300", "400" };
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+      
+      // create stat objects.
+      // assuming processes 100, 200, 300 are in tree and 400 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+      procInfos[0] = new ProcessStatInfo(new String[] 
+          {"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
+      procInfos[1] = new ProcessStatInfo(new String[] 
+          {"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
+      procInfos[2] = new ProcessStatInfo(new String[] 
+          {"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
+      procInfos[3] = new ProcessStatInfo(new String[] 
+          {"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"});
+      
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree =
+          createProcessTree("100", true, procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+      
+      // verify cumulative memory
+      Assert.assertEquals("Cumulative virtual memory does not match", 600000L,
+                   processTree.getCumulativeVmem());
+
+      // verify rss memory
+      long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                        600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      Assert.assertEquals("Cumulative rss memory does not match",
+                   cumuRssMem, processTree.getCumulativeRssmem());
+
+      // verify cumulative cpu time
+      long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+             7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+      Assert.assertEquals("Cumulative cpu time does not match",
+                   cumuCpuTime, processTree.getCumulativeCpuTime());
+
+      // test the cpu time again to see if it cumulates
+      procInfos[0] = new ProcessStatInfo(new String[]
+          {"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"});
+      procInfos[1] = new ProcessStatInfo(new String[]
+          {"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"});
+      writeStatFiles(procfsRootDir, pids, procInfos);
+
+      // build the process tree.
+      processTree.getProcessTree();
+
+      // verify cumulative cpu time again
+      cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+             9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+      Assert.assertEquals("Cumulative cpu time does not match",
+                   cumuCpuTime, processTree.getCumulativeCpuTime());
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+  
+  /**
+   * Tests that cumulative memory is computed only for
+   * processes older than a given age.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  @Test
+  public void testMemForOlderProcesses() throws IOException {
+    // initial list of processes
+    String[] pids = { "100", "200", "300", "400" };
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+      
+      // create stat objects.
+      // assuming 100, 200 and 400 are in tree, 300 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+      procInfos[0] = new ProcessStatInfo(new String[] 
+                        {"100", "proc1", "1", "100", "100", "100000", "100"});
+      procInfos[1] = new ProcessStatInfo(new String[] 
+                        {"200", "proc2", "100", "100", "100", "200000", "200"});
+      procInfos[2] = new ProcessStatInfo(new String[] 
+                        {"300", "proc3", "1", "300", "300", "300000", "300"});
+      procInfos[3] = new ProcessStatInfo(new String[] 
+                        {"400", "proc4", "100", "100", "100", "400000", "400"});
+      
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree =
+          createProcessTree("100", true, procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+      
+      // verify cumulative memory
+      Assert.assertEquals("Cumulative memory does not match",
+                   700000L, processTree.getCumulativeVmem());
+
+      // write one more process as child of 100.
+      String[] newPids = { "500" };
+      setupPidDirs(procfsRootDir, newPids);
+      
+      ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1];
+      newProcInfos[0] = new ProcessStatInfo(new String[]
+                      {"500", "proc5", "100", "100", "100", "500000", "500"});
+      writeStatFiles(procfsRootDir, newPids, newProcInfos);
+      
+      // check memory includes the new process.
+      processTree.getProcessTree();
+      Assert.assertEquals("Cumulative vmem does not include new process",
+                   1200000L, processTree.getCumulativeVmem());
+      long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                        1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      Assert.assertEquals("Cumulative rssmem does not include new process",
+                   cumuRssMem, processTree.getCumulativeRssmem());
+      
+      // however processes older than 1 iteration will retain the older value
+      Assert.assertEquals("Cumulative vmem shouldn't have included new process",
+                   700000L, processTree.getCumulativeVmem(1));
+      cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                   700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      Assert.assertEquals("Cumulative rssmem shouldn't have included new process",
+                   cumuRssMem, processTree.getCumulativeRssmem(1));
+
+      // one more process
+      newPids = new String[]{ "600" };
+      setupPidDirs(procfsRootDir, newPids);
+      
+      newProcInfos = new ProcessStatInfo[1];
+      newProcInfos[0] = new ProcessStatInfo(new String[]
+                      {"600", "proc6", "100", "100", "100", "600000", "600"});
+      writeStatFiles(procfsRootDir, newPids, newProcInfos);
+
+      // refresh process tree
+      processTree.getProcessTree();
+      
+      // processes older than 2 iterations should be same as before.
+      Assert.assertEquals("Cumulative vmem shouldn't have included new processes",
+                   700000L, processTree.getCumulativeVmem(2));
+      cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                   700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      Assert.assertEquals("Cumulative rssmem shouldn't have included new processes",
+                   cumuRssMem, processTree.getCumulativeRssmem(2));
+
+      // processes older than 1 iteration should not include new process,
+      // but include process 500
+      Assert.assertEquals("Cumulative vmem shouldn't have included new processes",
+                   1200000L, processTree.getCumulativeVmem(1));
+      cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                   1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      Assert.assertEquals("Cumulative rssmem shouldn't have included new processes",
+                   cumuRssMem, processTree.getCumulativeRssmem(1));
+
+      // no processes older than 3 iterations, this should be 0
+      Assert.assertEquals("Getting non-zero vmem for processes older than 3 iterations",
+                    0L, processTree.getCumulativeVmem(3));
+      Assert.assertEquals("Getting non-zero rssmem for processes older than 3 iterations",
+                    0L, processTree.getCumulativeRssmem(3));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+
+  /**
+   * Verifies ProcfsBasedProcessTree.checkPidPgrpidForMatch() in the case of
+   * constructProcessInfo() returning null, by not writing a stat file for
+   * the mock process.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  @Test
+  public void testDestroyProcessTree() throws IOException {
+    // test process
+    String pid = "100";
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree =
+          createProcessTree(pid, true, procfsRootDir.getAbsolutePath());
+
+      // Let us not create stat file for pid 100.
+      Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
+            Integer.valueOf(pid), procfsRootDir.getAbsolutePath()));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+  
+  /**
+   * Test the correctness of process-tree dump.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testProcessTreeDump()
+      throws IOException {
+
+    String[] pids = { "100", "200", "300", "400", "500", "600" };
+
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+
+      int numProcesses = pids.length;
+      // Processes 200, 300, 400 and 500 are descendants of 100. 600 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses];
+      procInfos[0] = new ProcessStatInfo(new String[] {
+          "100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
+      procInfos[1] = new ProcessStatInfo(new String[] {
+          "200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
+      procInfos[2] = new ProcessStatInfo(new String[] {
+          "300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
+      procInfos[3] = new ProcessStatInfo(new String[] {
+          "400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"});
+      procInfos[4] = new ProcessStatInfo(new String[] {
+          "500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"});
+      procInfos[5] = new ProcessStatInfo(new String[] {
+          "600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"});
+
+      String[] cmdLines = new String[numProcesses];
+      cmdLines[0] = "proc1 arg1 arg2";
+      cmdLines[1] = "proc2 arg3 arg4";
+      cmdLines[2] = "proc3 arg5 arg6";
+      cmdLines[3] = "proc4 arg7 arg8";
+      cmdLines[4] = "proc5 arg9 arg10";
+      cmdLines[5] = "proc6 arg11 arg12";
+
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      writeCmdLineFiles(procfsRootDir, pids, cmdLines);
+
+      ProcfsBasedProcessTree processTree = createProcessTree(
+          "100", true, procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+
+      // Get the process-tree dump
+      String processTreeDump = processTree.getProcessTreeDump();
+
+      LOG.info("Process-tree dump follows: \n" + processTreeDump);
+      Assert.assertTrue("Process-tree dump doesn't start with a proper header",
+          processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+          "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+          "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+      for (int i = 0; i < 5; i++) {
+        ProcessStatInfo p = procInfos[i];
+        Assert.assertTrue(
+            "Process-tree dump doesn't contain the cmdLineDump of process "
+                + p.pid, processTreeDump.contains("\t|- " + p.pid + " "
+                + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name
+                + ") " + p.utime + " " + p.stime + " " + p.vmem + " "
+                + p.rssmemPage + " " + cmdLines[i]));
+      }
+
+      // 600 should not be in the dump
+      ProcessStatInfo p = procInfos[5];
+      Assert.assertFalse(
+          "Process-tree dump shouldn't contain the cmdLineDump of process "
+              + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid
+              + " " + p.pgrpId + " " + p.session + " (" + p.name + ") "
+              + p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5]));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+
+  protected static boolean isSetsidAvailable() {
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+    }
+    return setsidSupported;
+  }
+
+  /**
+   * Is the root-process alive?
+   * Used only in tests.
+   * @return true if the root-process is alive, false otherwise.
+   */
+  private static boolean isAlive(String pid) {
+    try {
+      final String sigpid = isSetsidAvailable() ? "-" + pid : pid;
+      try {
+        sendSignal(sigpid, 0);
+      } catch (ExitCodeException e) {
+        return false;
+      }
+      return true;
+    } catch (IOException ignored) {
+    }
+    return false;
+  }
+
+  private static void sendSignal(String pid, int signal) throws IOException {
+    String[] arg = { "kill", "-" + signal, pid };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(arg);
+    shexec.execute();
+  }
+
+  /**
+   * Is any of the subprocesses in the process-tree alive?
+   * Used only in tests.
+   * @return true if any of the processes in the process-tree is
+   *           alive, false otherwise.
+   */
+  private static boolean isAnyProcessInTreeAlive(
+      ProcfsBasedProcessTree processTree) {
+    for (Integer pId : processTree.getCurrentProcessIDs()) {
+      if (isAlive(pId.toString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Create a directory to mimic the procfs file system's root.
+   * @param procfsRootDir root directory to create.
+   * @throws IOException if the procfs root directory could not be deleted
+   */
+  public static void setupProcfsRootDir(File procfsRootDir) 
+                                        throws IOException { 
+    // cleanup any existing process root dir.
+    if (procfsRootDir.exists()) {
+      Assert.assertTrue(FileUtil.fullyDelete(procfsRootDir));  
+    }
+
+    // create afresh
+    Assert.assertTrue(procfsRootDir.mkdirs());
+  }
+
+  /**
+   * Create PID directories under the specified procfs root directory
+   * @param procfsRootDir root directory of procfs file system
+   * @param pids the PID directories to create.
+   * @throws IOException If PID dirs could not be created
+   */
+  public static void setupPidDirs(File procfsRootDir, String[] pids) 
+                      throws IOException {
+    for (String pid : pids) {
+      File pidDir = new File(procfsRootDir, pid);
+      pidDir.mkdir();
+      if (!pidDir.exists()) {
+        throw new IOException("couldn't make process directory under " +
+            "fake procfs");
+      } else {
+        LOG.info("created pid dir " + pidDir);
+      }
+    }
+  }
+  
+  /**
+   * Write stat files under the specified pid directories with data
+   * setup in the corresponding ProcessStatInfo objects
+   * @param procfsRootDir root directory of procfs file system
+   * @param pids the PID directories under which to create the stat file
+   * @param procs corresponding ProcessStatInfo objects whose data should be
+   *              written to the stat files.
+   * @throws IOException if stat files could not be written
+   */
+  public static void writeStatFiles(File procfsRootDir, String[] pids, 
+                              ProcessStatInfo[] procs) throws IOException {
+    for (int i=0; i<pids.length; i++) {
+      File statFile =
+          new File(new File(procfsRootDir, pids[i]),
+              ProcfsBasedProcessTree.PROCFS_STAT_FILE);
+      BufferedWriter bw = null;
+      try {
+        FileWriter fw = new FileWriter(statFile);
+        bw = new BufferedWriter(fw);
+        bw.write(procs[i].getStatLine());
+        LOG.info("wrote stat file for " + pids[i] + 
+                  " with contents: " + procs[i].getStatLine());
+      } finally {
+        // not handling exception - will throw an error and fail the test.
+        if (bw != null) {
+          bw.close();
+        }
+      }
+    }
+  }
+
+  private static void writeCmdLineFiles(File procfsRootDir, String[] pids,
+      String[] cmdLines)
+      throws IOException {
+    for (int i = 0; i < pids.length; i++) {
+      File statFile =
+          new File(new File(procfsRootDir, pids[i]),
+              ProcfsBasedProcessTree.PROCFS_CMDLINE_FILE);
+      BufferedWriter bw = null;
+      try {
+        bw = new BufferedWriter(new FileWriter(statFile));
+        bw.write(cmdLines[i]);
+        LOG.info("wrote command-line file for " + pids[i] + " with contents: "
+            + cmdLines[i]);
+      } finally {
+        // not handling exception - will throw an error and fail the test.
+        if (bw != null) {
+          bw.close();
+        }
+      }
+    }
+  }
+}

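The test above drives ProcfsBasedProcessTree against a fake procfs laid out as <root>/<pid>/stat, with each stat line produced by ProcessStatInfo.getStatLine(). As a minimal standalone sketch of that fixture format (field positions follow the format string in getStatLine(); the FakeProcfsDemo class and the temp-dir layout are illustrative, not part of this patch):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FakeProcfsDemo {
  public static void main(String[] args) throws IOException {
    // Lay out a fake procfs root the way the test does: <root>/<pid>/stat.
    Path root = Files.createTempDirectory("fake-proc");
    Path pidDir = Files.createDirectories(root.resolve("100"));

    // Mirrors ProcessStatInfo.getStatLine() for the pid-100 entry above:
    // pid (name) S ppid pgrp session ... utime stime ... vmem rss ...
    String statLine = "100 (proc1) S 1 100 100 0 0 0"
        + " 0 0 0 0 1000 200 0 0 0 0 0 0 0 100000 100 0 0"
        + " 0 0 0 0 0 0 0 0 0 0 0 0 0";
    Files.write(pidDir.resolve("stat"), statLine.getBytes("UTF-8"));

    // Read it back the way a procfs parser would: the command name is the
    // parenthesised token; everything after ") " is whitespace-separated.
    String line = new String(Files.readAllBytes(pidDir.resolve("stat")), "UTF-8");
    String pid = line.substring(0, line.indexOf(' '));
    String name = line.substring(line.indexOf('(') + 1, line.indexOf(')'));
    String[] rest = line.substring(line.indexOf(')') + 2).split(" ");
    // rest[1] = ppid, rest[11] = utime, rest[12] = stime,
    // rest[20] = vmem (bytes), rest[21] = rss (pages)
    System.out.println(pid + " (" + name + ") ppid=" + rest[1]
        + " utime=" + rest[11] + " stime=" + rest[12]
        + " vmem=" + rest[20] + " rss=" + rest[21]);
  }
}

Run standalone, this prints the per-process fields that testCpuAndMemoryForProcessTree() sets up for pid 100: ppid=1, utime=1000, stime=200, vmem=100000, rss=100.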
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Tue Apr 12 07:56:07 2011
@@ -151,7 +151,7 @@ public class TestRPC {
       status = recordFactory.newRecordInstance(ContainerStatus.class);
       status.setState(ContainerState.RUNNING);
       status.setContainerId(container.getContainerId());
-      status.setExitStatus(0);
+      status.setExitStatus(String.valueOf(0));
       return response;
     }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java Tue Apr 12 07:56:07 2011
@@ -21,8 +21,8 @@ package org.apache.hadoop;
 /**
  * A clock class - can be mocked out for testing.
  */
-class Clock {
-  long getTime() {
+public class Clock {
+  public long getTime() {
     return System.currentTimeMillis();
   }
 }
\ No newline at end of file

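Making Clock public with a public getTime() enables the mocking that the class comment mentions. A minimal sketch of such a mock (the ManualClock name is illustrative, not part of this patch):

import org.apache.hadoop.Clock;

// A settable clock for tests; time advances only when told to.
public class ManualClock extends Clock {
  private long time = 0L;

  @Override
  public long getTime() {
    return time;
  }

  // Move the fake time forward by the given number of milliseconds.
  public void tick(long millis) {
    time += millis;
  }
}

A test can hand a ManualClock to the component under test and call tick() to simulate the passage of time deterministically.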
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml Tue Apr 12 07:56:07 2011
@@ -71,8 +71,7 @@
     <value>0.0.0.0:45454</value>
   </property>
 
-<property><name>yarn.server.nodemanager.connect.rm</name><value>true</value></property> 
-
+  <!--  HealthChecker's properties -->
   <property>
     <name>yarn.server.nodemanager.healthchecker.script.path</name>
     <value></value>
@@ -130,6 +129,26 @@
     <description>Arguments to be passed to the health-check script run
     by the NodeManager</description>
   </property>
+  <!-- End of HealthChecker's properties -->
+
+  <!-- ContainerMonitor related properties -->
+
+  <property>
+    <name>yarn.server.nodemanager.containers-monitor.monitoring-interval</name>
+    <value>3000</value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.containers-monitor.resourcecalculatorplugin</name>
+    <value></value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.reserved-physical-memory.mb</name>
+    <value>-1</value>
+  </property>
+
+  <!-- End of ContainerMonitor related properties -->
 
 <!-- All MRAppMaster related configuration properties -->
 

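For reference, a NodeManager component would pick up the new monitor keys through a plain Configuration lookup. A minimal sketch, assuming yarn-default.xml is loaded as a resource on the classpath (the class name and the inline key strings are illustrative; the fallback defaults mirror the values above):

import org.apache.hadoop.conf.Configuration;

public class MonitorConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key names as declared in yarn-default.xml above; the second argument
    // is the fallback used when the key is absent.
    long intervalMs = conf.getLong(
        "yarn.server.nodemanager.containers-monitor.monitoring-interval", 3000L);
    long reservedPmemMB = conf.getLong(
        "yarn.server.nodemanager.reserved-physical-memory.mb", -1L);
    System.out.println("monitoring interval: " + intervalMs
        + " ms, reserved physical memory: " + reservedPmemMB + " MB");
  }
}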
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml Tue Apr 12 07:56:07 2011
@@ -1,5 +1,10 @@
 <?xml version="1.0"?>
 <project>
+
+  <properties>
+    <yarn.version>1.0-SNAPSHOT</yarn.version>
+  </properties>
+
   <parent>
     <artifactId>yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Tue Apr 12 07:56:07 2011
@@ -50,7 +50,9 @@ public class NMConfig {
 
   public static final String DEFAULT_NM_LOG_DIR = "/tmp/logs";
 
-  public static final String NM_RESOURCE = NM_PREFIX + "resource.memory.gb";
+  public static final int DEFAULT_NM_VMEM_GB = 8;
+
+  public static final String NM_VMEM_GB = NM_PREFIX + "resource.memory.gb";
 
   // TODO: Should this instead be dictated by RM?
   public static final String HEARTBEAT_INTERVAL = NM_PREFIX

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Apr 12 07:56:07 2011
@@ -94,7 +94,7 @@ public class NodeStatusUpdaterImpl exten
     this.heartBeatInterval =
         conf.getLong(NMConfig.HEARTBEAT_INTERVAL,
             NMConfig.DEFAULT_HEARTBEAT_INTERVAL);
-    int memory = conf.getInt(NMConfig.NM_RESOURCE, 8);
+    int memory = conf.getInt(NMConfig.NM_VMEM_GB, 8);
     this.totalResource = recordFactory.newRecordInstance(Resource.class);
     this.totalResource.setMemory(memory * 1024);
     super.init(conf);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Apr 12 07:56:07 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -130,7 +132,8 @@ public class ContainerManagerImpl extend
     auxiluaryServices.register(this);
     addService(auxiluaryServices);
 
-    ContainersMonitor containersMonitor = new ContainersMonitor();
+    ContainersMonitor containersMonitor =
+        new ContainersMonitorImpl(exec, dispatcher);
     addService(containersMonitor);
 
     dispatcher.register(ContainerEventType.class,
@@ -247,6 +250,9 @@ public class ContainerManagerImpl extend
       //    + containerID + " on this NodeManager!!");
     }
     dispatcher.getEventHandler().handle(
+        new ContainerDiagnosticsUpdateEvent(containerID,
+            "Container killed by the application."));
+    dispatcher.getEventHandler().handle(
         new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER));
 
     // TODO: Move this code to appropriate place once kill_container is
@@ -324,6 +330,9 @@ public class ContainerManagerImpl extend
       for (org.apache.hadoop.yarn.api.records.Container container : containersFinishedEvent
           .getContainersToCleanup()) {
         this.dispatcher.getEventHandler().handle(
+            new ContainerDiagnosticsUpdateEvent(container.getId(),
+                "Container Killed by ResourceManager"));
+        this.dispatcher.getEventHandler().handle(
             new ContainerEvent(container.getId(),
                 ContainerEventType.KILL_CONTAINER));
       }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Apr 12 07:56:07 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -318,6 +319,9 @@ public class ApplicationImpl implements 
       // application.
       for (ContainerId containerID : app.containers.keySet()) {
         app.dispatcher.getEventHandler().handle(
+            new ContainerDiagnosticsUpdateEvent(containerID,
+                "Container killed on application-finish."));
+        app.dispatcher.getEventHandler().handle(
             new ContainerEvent(containerID,
                 ContainerEventType.KILL_CONTAINER));
       }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerDiagnosticsUpdateEvent extends ContainerEvent {
+
+  private final String diagnosticsUpdate;
+
+  public ContainerDiagnosticsUpdateEvent(ContainerId cID, String update) {
+    super(cID, ContainerEventType.UPDATE_DIAGNOSTICS_MSG);
+    this.diagnosticsUpdate = update;
+  }
+
+  public String getDiagnosticsUpdate() {
+    return this.diagnosticsUpdate;
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java Tue Apr 12 07:56:07 2011
@@ -23,6 +23,7 @@ public enum ContainerEventType {
   // Producer: ContainerManager
   INIT_CONTAINER,
   KILL_CONTAINER,
+  UPDATE_DIAGNOSTICS_MSG,
   CONTAINER_DONE,
 
   // DownloadManager

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Apr 12 07:56:07 2011
@@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -49,6 +49,7 @@ public class ContainerImpl implements Co
   private final Dispatcher dispatcher;
   private final ContainerLaunchContext launchContext;
   private int exitCode;
+  private final StringBuilder diagnostics;
 
   private static final Log LOG = LogFactory.getLog(Container.class);
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -57,6 +58,7 @@ public class ContainerImpl implements Co
       ContainerLaunchContext launchContext) {
     this.dispatcher = dispatcher;
     this.launchContext = launchContext;
+    this.diagnostics = new StringBuilder();
 
     stateMachine = stateMachineFactory.make(this);
   }
@@ -64,6 +66,9 @@ public class ContainerImpl implements Co
   private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
     new ContainerDoneTransition();
 
+  private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
+      new ContainerDiagnosticsUpdateTransition();
+
   // State Machine for each container.
   private static StateMachineFactory
            <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
@@ -72,6 +77,9 @@ public class ContainerImpl implements Co
     // From NEW State
     .addTransition(ContainerState.NEW, ContainerState.LOCALIZING,
         ContainerEventType.INIT_CONTAINER)
+    .addTransition(ContainerState.NEW, ContainerState.NEW,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.NEW, ContainerState.DONE,
         ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION)
 
@@ -80,6 +88,9 @@ public class ContainerImpl implements Co
         ContainerState.LOCALIZED,
         ContainerEventType.CONTAINER_RESOURCES_LOCALIZED,
         new LocalizedTransition())
+    .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.LOCALIZING,
         ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
         ContainerEventType.KILL_CONTAINER,
@@ -91,6 +102,9 @@ public class ContainerImpl implements Co
     .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new ExitedWithFailureTransition())
+    .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
 
     // From RUNNING State
     .addTransition(ContainerState.RUNNING,
@@ -101,6 +115,9 @@ public class ContainerImpl implements Co
         ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new ExitedWithFailureTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
 
@@ -109,6 +126,10 @@ public class ContainerImpl implements Co
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
             CONTAINER_DONE_TRANSITION)
     .addTransition(ContainerState.EXITED_WITH_SUCCESS,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
                    ContainerState.EXITED_WITH_SUCCESS,
                    ContainerEventType.KILL_CONTAINER)
 
@@ -117,6 +138,10 @@ public class ContainerImpl implements Co
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
             CONTAINER_DONE_TRANSITION)
     .addTransition(ContainerState.EXITED_WITH_FAILURE,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.EXITED_WITH_FAILURE,
                    ContainerState.EXITED_WITH_FAILURE,
                    ContainerEventType.KILL_CONTAINER)
 
@@ -125,6 +150,9 @@ public class ContainerImpl implements Co
         ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new ContainerKilledTransition())
+    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.KILLING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER)
     .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
@@ -139,10 +167,17 @@ public class ContainerImpl implements Co
             ContainerState.DONE,
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
             CONTAINER_DONE_TRANSITION)
+    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
         ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION)
+    .addTransition(ContainerState.DONE, ContainerState.DONE,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
 
     // create the topology tables
     .installTopology();
@@ -203,7 +238,8 @@ public class ContainerImpl implements Co
     ContainerStatus containerStatus = recordFactory.newRecordInstance(ContainerStatus.class);
     containerStatus.setState(getCurrentState());
     containerStatus.setContainerId(this.launchContext.getContainerId());
-	containerStatus.setExitStatus(exitCode);
+    containerStatus.setDiagnostics(diagnostics.toString());
+    containerStatus.setExitStatus(String.valueOf(exitCode));
     return containerStatus;
   }
 
@@ -244,9 +280,12 @@ public class ContainerImpl implements Co
     public void transition(ContainerImpl container, ContainerEvent event) {
       // Inform the ContainersMonitor to start monitoring the container's
       // resource usage.
+      // TODO: Fix pmem limits below
+      long vmemBytes =
+          container.getLaunchContext().getResource().getMemory() * 1024 * 1024L;
       container.dispatcher.getEventHandler().handle(
-          new ContainersMonitorEvent(
-              ContainersMonitorEventType.START_MONITORING_CONTAINER));
+          new ContainerStartMonitoringEvent(container.getContainerID(),
+              vmemBytes, -1));
     }
   }
 
@@ -326,6 +365,20 @@ public class ContainerImpl implements Co
       // Inform the application
       container.dispatcher.getEventHandler().handle(
           new ApplicationContainerFinishedEvent(container.getContainerID()));
+      // Remove the container from the resource-monitor
+      container.dispatcher.getEventHandler().handle(
+          new ContainerStopMonitoringEvent(container.getContainerID()));
+    }
+  }
+
+  static class ContainerDiagnosticsUpdateTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      ContainerDiagnosticsUpdateEvent updateEvent =
+          (ContainerDiagnosticsUpdateEvent) event;
+      container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
+          .append("\n");
     }
   }
 

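The state-machine changes above follow one pattern: UPDATE_DIAGNOSTICS_MSG is registered as a self-transition in every state, NEW through DONE, so a diagnostics update arriving at any point in the container lifecycle simply appends to the container's diagnostics buffer instead of raising an InvalidStateTransitonException.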
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,25 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
+
+  private final long vmemLimit;
+  private final long pmemLimit;
+
+  public ContainerStartMonitoringEvent(ContainerId containerId,
+      long vmemLimit, long pmemLimit) {
+    super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
+    this.vmemLimit = vmemLimit;
+    this.pmemLimit = pmemLimit;
+  }
+
+  public long getVmemLimit() {
+    return this.vmemLimit;
+  }
+
+  public long getPmemLimit() {
+    return this.pmemLimit;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,11 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
+
+  public ContainerStopMonitoringEvent(ContainerId containerId) {
+    super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java Tue Apr 12 07:56:07 2011
@@ -19,18 +19,9 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
 
-public class ContainersMonitor extends AbstractService
-    implements EventHandler<ContainersMonitorEvent> {
-
-  public ContainersMonitor() {
-    super("containers-monitor");
-  }
-
-  @Override
-  public void handle(ContainersMonitorEvent monitorEvent) {
-    // TODO
-  }
+public interface ContainersMonitor extends Service,
+    EventHandler<ContainersMonitorEvent> {
 
 }

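Turning ContainersMonitor into an interface lets the NodeManager be wired against the real ContainersMonitorImpl (added below) or a lightweight stand-in in tests. A hypothetical no-op implementation, not part of this commit, sketched on the assumption that AbstractService's default lifecycle is sufficient:

    // Same package as ContainersMonitor; assumes
    // org.apache.hadoop.yarn.service.AbstractService is on the classpath.
    class NoOpContainersMonitor extends AbstractService
        implements ContainersMonitor {
      NoOpContainersMonitor() {
        super("no-op-containers-monitor");
      }
      @Override
      public void handle(ContainersMonitorEvent event) {
        // deliberately ignore all monitoring events
      }
    }
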
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java Tue Apr 12 07:56:07 2011
@@ -18,14 +18,22 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
 
 public class ContainersMonitorEvent extends
     AbstractEvent<ContainersMonitorEventType> {
 
-  public ContainersMonitorEvent(ContainersMonitorEventType eventType) {
+  private final ContainerId containerId;
+
+  public ContainersMonitorEvent(ContainerId containerId,
+      ContainersMonitorEventType eventType) {
     super(eventType);
+    this.containerId = containerId;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
   }
 
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,495 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+public class ContainersMonitorImpl extends AbstractService implements
+    ContainersMonitor {
+
+  final static Log LOG = LogFactory
+      .getLog(ContainersMonitorImpl.class);
+
+  private final static String MONITORING_INTERVAL_CONFIG_KEY =
+      NMConfig.NM_PREFIX + "containers-monitor.monitoring-interval";
+  public static final String RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY =
+      NMConfig.NM_PREFIX + "containers-monitor.resourcecalculatorplugin";
+  public static final String NM_RESERVED_PHYSICALMEMORY_MB =
+      NMConfig.NM_PREFIX + "reserved-physical-memory.mb";
+
+  private final static int MONITORING_INTERVAL_DEFAULT = 3000;
+  private long monitoringInterval;
+  private MonitoringThread monitoringThread;
+
+  List<ContainerId> containersToBeRemoved;
+  Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
+  Map<ContainerId, ProcessTreeInfo> trackingContainers =
+      new HashMap<ContainerId, ProcessTreeInfo>();
+
+  final ContainerExecutor containerExecutor;
+  private final Dispatcher eventDispatcher;
+  private ResourceCalculatorPlugin resourceCalculatorPlugin;
+
+  private long maxVmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+  private long maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+
+  /**
+   * A value which, when set for any of the memory-related configuration
+   * options, indicates that the option is turned off.
+   */
+  public static final long DISABLED_MEMORY_LIMIT = -1L;
+
+  private static final String MEMORY_USAGE_STRING =
+      "Memory usage of ProcessTree %s for container-id %s : Virtual %d bytes, "
+          +
+          "limit : %d bytes; Physical %d bytes, limit %d bytes";
+
+  public ContainersMonitorImpl(ContainerExecutor exec,
+      AsyncDispatcher dispatcher) {
+    super("containers-monitor");
+
+    this.containerExecutor = exec;
+    this.eventDispatcher = dispatcher;
+
+    this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
+    this.containersToBeRemoved = new ArrayList<ContainerId>();
+    this.monitoringThread = new MonitoringThread();
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.monitoringInterval =
+        conf.getLong(MONITORING_INTERVAL_CONFIG_KEY,
+            MONITORING_INTERVAL_DEFAULT);
+
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY, null,
+            ResourceCalculatorPlugin.class);
+    this.resourceCalculatorPlugin =
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+    LOG.info(" Using ResourceCalculatorPlugin : "
+        + this.resourceCalculatorPlugin);
+
+    long totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
+    if (this.resourceCalculatorPlugin != null) {
+      totalPhysicalMemoryOnNM =
+          this.resourceCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPhysicalMemoryOnNM <= 0) {
+        LOG.warn("NodeManager's totalPmem could not be calculated. "
+            + "Setting it to " + DISABLED_MEMORY_LIMIT);
+        totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
+      }
+    }
+
+    // ///////// Virtual memory configuration //////
+    this.maxVmemAllottedForContainers =
+        conf.getLong(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
+    this.maxVmemAllottedForContainers =
+        this.maxVmemAllottedForContainers * 1024 * 1024 * 1024L; //Normalize
+
+    if (this.maxVmemAllottedForContainers > totalPhysicalMemoryOnNM) {
+      LOG.info("totalMemoryAllottedForContainers > totalPhysicalMemoryOnNM."
+          + " Thrashing might happen.");
+    }
+
+    // ///////// Physical memory configuration //////
+    long reservedPmemOnNM =
+        conf.getLong(NM_RESERVED_PHYSICALMEMORY_MB, DISABLED_MEMORY_LIMIT);
+    reservedPmemOnNM =
+        reservedPmemOnNM == DISABLED_MEMORY_LIMIT
+            ? DISABLED_MEMORY_LIMIT
+            : reservedPmemOnNM * 1024 * 1024; // normalize to bytes
+
+    if (reservedPmemOnNM == DISABLED_MEMORY_LIMIT
+        || totalPhysicalMemoryOnNM == DISABLED_MEMORY_LIMIT) {
+      this.maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+    } else {
+      this.maxPmemAllottedForContainers =
+          totalPhysicalMemoryOnNM - reservedPmemOnNM;
+    }
+
+    super.init(conf);
+  }
+
+  /**
+   * Is the total physical memory check enabled?
+   * 
+   * @return true if total physical memory check is enabled.
+   */
+  boolean doCheckPhysicalMemory() {
+    return this.maxPmemAllottedForContainers != DISABLED_MEMORY_LIMIT;
+  }
+
+  /**
+   * Is the total virtual memory check enabled?
+   * 
+   * @return true if total virtual memory check is enabled.
+   */
+  boolean doCheckVirtualMemory() {
+    return this.maxVmemAllottedForContainers != DISABLED_MEMORY_LIMIT;
+  }
+
+  @Override
+  public synchronized void start() {
+    this.monitoringThread.start();
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    this.monitoringThread.interrupt();
+    try {
+      this.monitoringThread.join();
+    } catch (InterruptedException e) {
+      // interrupted during shutdown; proceed with stop
+    }
+    super.stop();
+  }
+
+  private static class ProcessTreeInfo {
+    private ContainerId containerId;
+    private String pid;
+    private ProcfsBasedProcessTree pTree;
+    private long vmemLimit;
+    private long pmemLimit;
+
+    public ProcessTreeInfo(ContainerId containerId, String pid,
+        ProcfsBasedProcessTree pTree, long vmemLimit, long pmemLimit) {
+      this.containerId = containerId;
+      this.pid = pid;
+      this.pTree = pTree;
+      this.vmemLimit = vmemLimit;
+      this.pmemLimit = pmemLimit;
+    }
+
+    public ContainerId getContainerId() {
+      return this.containerId;
+    }
+
+    public String getPID() {
+      return this.pid;
+    }
+
+    public void setPid(String pid) {
+      this.pid = pid;
+    }
+
+    public ProcfsBasedProcessTree getProcessTree() {
+      return this.pTree;
+    }
+
+    public void setProcessTree(ProcfsBasedProcessTree pTree) {
+      this.pTree = pTree;
+    }
+
+    public long getVmemLimit() {
+      return this.vmemLimit;
+    }
+
+    /**
+     * @return Physical memory limit for the process tree in bytes
+     */
+    public long getPmemLimit() {
+      return this.pmemLimit;
+    }
+  }
+
+
+  /**
+   * Check whether a container's process tree's current memory usage is over
+   * limit.
+   * 
+   * When a java process exec's a program, it could momentarily account for
+   * double the size of its memory, because the JVM does a fork()+exec()
+   * which at fork time creates a copy of the parent's memory. If the
+   * monitoring thread samples the memory used by the container tree at
+   * that same instant, it could assume the tree is over limit and kill it,
+   * through no fault of the process itself.
+   * 
+   * We counter this problem with a heuristic check: if a process tree
+   * exceeds twice the memory limit, it is killed immediately; if processes
+   * in the tree older than one monitoring interval exceed the memory limit
+   * at all, the tree is killed. Otherwise it is given the benefit of the
+   * doubt for one more iteration.
+   * 
+   * @param containerId
+   *          Container Id for the container tree
+   * @param currentMemUsage
+   *          Memory usage of a container tree
+   * @param curMemUsageOfAgedProcesses
+   *          Memory usage of processes older than an iteration in a container
+   *          tree
+   * @param vmemLimit
+   *          The limit specified for the container
+   * @return true if the memory usage is more than twice the specified limit,
+   *         or if processes in the tree, older than this thread's monitoring
+   *         interval, exceed the memory limit; false otherwise.
+   */
+  boolean isProcessTreeOverLimit(String containerId,
+                                  long currentMemUsage,
+                                  long curMemUsageOfAgedProcesses,
+                                  long vmemLimit) {
+    boolean isOverLimit = false;
+
+    if (currentMemUsage > (2 * vmemLimit)) {
+      LOG.warn("Process tree for container: " + containerId
+          + " running over twice " + "the configured limit. Limit=" + vmemLimit
+          + ", current usage = " + currentMemUsage);
+      isOverLimit = true;
+    } else if (curMemUsageOfAgedProcesses > vmemLimit) {
+      LOG.warn("Process tree for container: " + containerId
+          + " has processes older than 1 "
+          + "iteration running over the configured limit. Limit=" + vmemLimit
+          + ", current usage = " + curMemUsageOfAgedProcesses);
+      isOverLimit = true;
+    }
+
+    return isOverLimit;
+  }
+
+  // overload provided to make testing easier
+  boolean isProcessTreeOverLimit(ProcfsBasedProcessTree pTree,
+      String containerId, long limit) {
+    long currentMemUsage = pTree.getCumulativeVmem();
+    // as processes begin with an age 1, we want to see if there are processes
+    // more than 1 iteration old.
+    long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+    return isProcessTreeOverLimit(containerId, currentMemUsage,
+                                  curMemUsageOfAgedProcesses, limit);
+  }
+
+  private class MonitoringThread extends Thread {
+    public MonitoringThread() {
+    }
+
+    @Override
+    public void run() {
+
+      if (!(doCheckPhysicalMemory() || doCheckVirtualMemory())) {
+        LOG.info("Neither virutal-memory nor physical-memory monitoring is " +
+            "needed. Not running the monitor-thread");
+        return;
+      }
+
+      while (true) {
+
+        // Print the processTrees for debugging.
+        if (LOG.isDebugEnabled()) {
+          StringBuffer tmp = new StringBuffer("[ ");
+          for (ProcessTreeInfo p : trackingContainers.values()) {
+            tmp.append(p.getPID());
+            tmp.append(" ");
+          }
+          LOG.debug("Current ProcessTree list : "
+              + tmp.substring(0, tmp.length()) + "]");
+        }
+
+        // Add new containers
+        synchronized (containersToBeAdded) {
+          for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
+              .entrySet()) {
+            ContainerId containerId = entry.getKey();
+            ProcessTreeInfo processTreeInfo = entry.getValue();
+            LOG.info("Starting resource-monitoring for " + containerId);
+            trackingContainers.put(containerId, processTreeInfo);
+          }
+          containersToBeAdded.clear();
+        }
+
+        // Remove finished containers
+        synchronized (containersToBeRemoved) {
+          for (ContainerId containerId : containersToBeRemoved) {
+            trackingContainers.remove(containerId);
+            LOG.info("Stopping resource-monitoring for " + containerId);
+          }
+          containersToBeRemoved.clear();
+        }
+
+        // Now do the monitoring for the trackingContainers
+        // Check memory usage and kill any overflowing containers
+        long vmemStillInUsage = 0;
+        long pmemStillInUsage = 0;
+        for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
+            trackingContainers.entrySet().iterator(); it.hasNext();) {
+
+          Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
+          ContainerId containerId = entry.getKey();
+          ProcessTreeInfo ptInfo = entry.getValue();
+          try {
+            String pId = ptInfo.getPID();
+
+            // Initialize any uninitialized processTrees
+            if (pId == null) {
+              // get pid from ContainerId; pId stays null if the container is
+              // not spawned yet or its pid was removed from ContainerExecutor
+              pId = containerExecutor.getProcessId(ptInfo.getContainerId());
+              if (pId != null) {
+                LOG.debug("Tracking ProcessTree " + pId
+                    + " for the first time");
+
+                ProcfsBasedProcessTree pt =
+                    new ProcfsBasedProcessTree(pId,
+                        ContainerExecutor.isSetsidAvailable);
+                ptInfo.setPid(pId);
+                ptInfo.setProcessTree(pt);
+              }
+            }
+            // End of initializing any uninitialized processTrees
+
+            if (pId == null) {
+              continue; // processTree cannot be tracked
+            }
+
+            LOG.debug("Constructing ProcessTree for : PID = " + pId
+                + " ContainerId = " + containerId);
+            ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+            pTree = pTree.getProcessTree(); // get the updated process-tree
+            ptInfo.setProcessTree(pTree); // update ptInfo with the refreshed
+                                          // process-tree state
+            long currentVmemUsage = pTree.getCumulativeVmem();
+            long currentPmemUsage = pTree.getCumulativeRssmem();
+            // as processes begin with an age 1, we want to see if there
+            // are processes more than 1 iteration old.
+            long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+            long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
+            long vmemLimit = ptInfo.getVmemLimit();
+            long pmemLimit = ptInfo.getPmemLimit();
+            LOG.info(String.format(MEMORY_USAGE_STRING, pId,
+                containerId.toString(), currentVmemUsage, vmemLimit,
+                currentPmemUsage, pmemLimit));
+
+            boolean isMemoryOverLimit = false;
+            String msg = "";
+            if (doCheckVirtualMemory()
+                && isProcessTreeOverLimit(containerId.toString(),
+                    currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
+              // Container (the root process) is still alive and overflowing
+              // memory.
+              // Dump the process-tree and then clean it up.
+              msg =
+                  "Container [pid="
+                      + pId
+                      + ",containerID="
+                      + containerId
+                      + "] is running beyond memory-limits. Current usage : "
+                      + currentVmemUsage
+                      + "bytes. Limit : "
+                      + vmemLimit
+                      + "bytes. Killing container. "
+                      + "\nDump of the process-tree for " + containerId
+                      + " : \n" + pTree.getProcessTreeDump();
+              isMemoryOverLimit = true;
+            } else if (doCheckPhysicalMemory()
+                && isProcessTreeOverLimit(containerId.toString(),
+                    currentPmemUsage, curRssMemUsageOfAgedProcesses,
+                    pmemLimit)) {
+              // Container (the root process) is still alive and overflowing
+              // memory.
+              // Dump the process-tree and then clean it up.
+              msg =
+                  "Container [pid="
+                      + pId
+                      + ",tipID="
+                      + containerId
+                      + "] is running beyond physical memory-limits."
+                      + " Current usage : "
+                      + currentPmemUsage
+                      + "bytes. Limit : "
+                      + pmemLimit
+                      + "bytes. Killing container. \nDump of the process-tree for "
+                      + containerId + " : \n" + pTree.getProcessTreeDump();
+              isMemoryOverLimit = true;
+            }
+
+            if (isMemoryOverLimit) {
+              // Virtual or physical memory over limit. Fail the container
+              // and remove the corresponding process tree.
+              LOG.warn(msg);
+              // warn if the root process is not a process-group leader
+              if (!pTree.checkPidPgrpidForMatch()) {
+                LOG.error("Killed container process with PID " + pId
+                    + " but it is not a process group leader.");
+              }
+              eventDispatcher.getEventHandler().handle(
+                  new ContainerDiagnosticsUpdateEvent(containerId, msg));
+              // kill the container
+              eventDispatcher.getEventHandler().handle(
+                  new ContainerEvent(containerId,
+                      ContainerEventType.KILL_CONTAINER));
+              it.remove();
+              LOG.info("Removed ProcessTree with root " + pId);
+            } else {
+              // Account for the total memory in use by all containers that
+              // are still alive and within limits.
+              vmemStillInUsage += currentVmemUsage;
+              pmemStillInUsage += currentPmemUsage;
+            }
+          } catch (Exception e) {
+            // Log the exception and proceed to the next container.
+            LOG.warn("Uncaught exception in ContainerMemoryManager "
+                + "while managing memory of " + containerId, e);
+          }
+        }
+
+        try {
+          Thread.sleep(monitoringInterval);
+        } catch (InterruptedException e) {
+          LOG.warn(ContainersMonitorImpl.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void handle(ContainersMonitorEvent monitoringEvent) {
+
+    if (!(doCheckPhysicalMemory() || doCheckVirtualMemory())) {
+      return;
+    }
+
+    ContainerId containerId = monitoringEvent.getContainerId();
+    switch (monitoringEvent.getType()) {
+    case START_MONITORING_CONTAINER:
+      ContainerStartMonitoringEvent startEvent =
+          (ContainerStartMonitoringEvent) monitoringEvent;
+      synchronized (this.containersToBeAdded) {
+        ProcessTreeInfo processTreeInfo =
+            new ProcessTreeInfo(containerId, null, null,
+                startEvent.getVmemLimit(), startEvent.getPmemLimit());
+        this.containersToBeAdded.put(containerId, processTreeInfo);
+      }
+      break;
+    case STOP_MONITORING_CONTAINER:
+      synchronized (this.containersToBeRemoved) {
+        this.containersToBeRemoved.add(containerId);
+      }
+      break;
+    default:
+      // TODO: unexpected event type; ignore for now.
+    }
+  }
+}

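To make the over-limit heuristic concrete: with a virtual-memory limit of 2 GB, a hypothetical sequence of calls to the package-private isProcessTreeOverLimit (monitor being a ContainersMonitorImpl instance, so this sketch would live in the same package, e.g. in a test):

    long limit = 2L * 1024 * 1024 * 1024; // 2 GB

    // Total usage 5 GB is more than twice the limit: killed immediately.
    monitor.isProcessTreeOverLimit("c1",
        5L * 1024 * 1024 * 1024, 0, limit);                   // -> true

    // Processes older than one monitoring interval use 2.5 GB,
    // over the limit on their own: killed.
    monitor.isProcessTreeOverLimit("c2",
        3L * 1024 * 1024 * 1024, 2500L * 1024 * 1024, limit); // -> true

    // Total 3 GB is over the limit but under twice it, and aged usage
    // of 1.5 GB is within the limit: spared for one more iteration,
    // which is exactly the fork()+exec() spike the heuristic tolerates.
    monitor.isProcessTreeOverLimit("c3",
        3L * 1024 * 1024 * 1024, 1500L * 1024 * 1024, limit); // -> false
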
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Tue Apr 12 07:56:07 2011
@@ -124,14 +124,21 @@ public class DummyContainerManager exten
   }
 
   public static void waitForContainerState(ContainerManager containerManager,
-        ContainerId containerID, ContainerState finalState)
+      ContainerId containerID, ContainerState finalState)
+      throws InterruptedException, YarnRemoteException {
+    waitForContainerState(containerManager, containerID, finalState, 20);
+  }
+
+  public static void waitForContainerState(ContainerManager containerManager,
+        ContainerId containerID, ContainerState finalState, int timeOutMax)
         throws InterruptedException, YarnRemoteException {
       GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
       request.setContainerId(containerID);
       ContainerStatus containerStatus =
           containerManager.getContainerStatus(request).getStatus();
       int timeoutSecs = 0;
-      while (!containerStatus.getState().equals(finalState) && timeoutSecs++ < 20) {
+      while (!containerStatus.getState().equals(finalState)
+          && timeoutSecs++ < timeOutMax) {
         Thread.sleep(1000);
         LOG.info("Waiting for container to get into state " + finalState
             + ". Current state is " + containerStatus.getState());

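The new overload keeps the existing behaviour as a 20-attempt default; each attempt sleeps one second, so timeOutMax is effectively a timeout in seconds. A usage sketch, assuming a terminal state such as ContainerState.COMPLETE:

    // Wait up to the default 20 seconds for the container to complete...
    DummyContainerManager.waitForContainerState(containerManager,
        containerID, ContainerState.COMPLETE);

    // ...or allow a slower kill-and-cleanup path up to 60 seconds.
    DummyContainerManager.waitForContainerState(containerManager,
        containerID, ContainerState.COMPLETE, 60);
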
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Tue Apr 12 07:56:07 2011
@@ -18,10 +18,17 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,12 +51,28 @@ public class TestEventFlow {
   private static final Log LOG = LogFactory.getLog(TestEventFlow.class);
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
+  private static File localDir = new File("target",
+      TestEventFlow.class.getName() + "-localDir").getAbsoluteFile();
+  private static File logDir = new File("target",
+      TestEventFlow.class.getName() + "-logDir").getAbsoluteFile();
+
   @Test
   public void testSuccessfulContainerLaunch() throws InterruptedException,
-      YarnRemoteException {
+      IOException {
+
+    FileContext localFS = FileContext.getLocalFSFileContext();
+
+    localFS.delete(new Path(localDir.getAbsolutePath()), true);
+    localFS.delete(new Path(logDir.getAbsolutePath()), true);
+    localDir.mkdir();
+    logDir.mkdir();
+
     Context context = new NMContext();
 
     YarnConfiguration conf = new YarnConfiguration();
+    conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+    conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
+
     ContainerExecutor exec = new DefaultContainerExecutor();
     DeletionService del = new DeletionService(exec);
     Dispatcher dispatcher = new AsyncDispatcher();
@@ -70,7 +93,7 @@ public class TestEventFlow {
 
     DummyContainerManager containerManager =
         new DummyContainerManager(context, exec, del, nodeStatusUpdater);
-    containerManager.init(new Configuration());
+    containerManager.init(conf);
     containerManager.start();
 
     ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Apr 12 07:56:07 2011
@@ -205,7 +205,7 @@ public class TestNodeStatusUpdater {
     };
 
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(NMConfig.NM_RESOURCE, 5); // 5GB
+    conf.setInt(NMConfig.NM_VMEM_GB, 5); // 5GB
     conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
     conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
     conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());


