From: vinodkv@apache.org
To: mapreduce-commits@hadoop.apache.org
Date: Tue, 12 Apr 2011 07:56:09 -0000
Subject: svn commit: r1091316 [2/3] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-core/ mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,760 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A JUnit test to test ProcfsBasedProcessTree.
+ */
+public class TestProcfsBasedProcessTree {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestProcfsBasedProcessTree.class);
+  protected static File TEST_ROOT_DIR = new File("target",
+      TestProcfsBasedProcessTree.class.getName() + "-localDir");
+
+  private ShellCommandExecutor shexec = null;
+  private String pidFile, lowestDescendant;
+  private String shellScript;
+
+  private static final int N = 6; // Controls the RogueTask
+
+  private class RogueTaskThread extends Thread {
+    public void run() {
+      try {
+        Vector<String> args = new Vector<String>();
+        if (isSetsidAvailable()) {
+          args.add("setsid");
+        }
+        args.add("bash");
+        args.add("-c");
+        args.add(" echo $$ > " + pidFile + "; sh " +
+            shellScript + " " + N + ";");
+        shexec = new ShellCommandExecutor(args.toArray(new String[0]));
+        shexec.execute();
+      } catch (ExitCodeException ee) {
+        LOG.info("Shell command exited with a non-zero exit code. This is"
+            + " expected as we are killing the subprocesses of the"
+            + " task intentionally. " + ee);
+      } catch (IOException ioe) {
+        LOG.info("Error executing shell command " + ioe);
+      } finally {
+        LOG.info("Exit code: " + shexec.getExitCode());
+      }
+    }
+  }
+
+  private String getRogueTaskPID() {
+    File f = new File(pidFile);
+    while (!f.exists()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    // read from pidFile
+    return getPidFromPidFile(pidFile);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    FileContext.getLocalFSFileContext().delete(
+        new Path(TEST_ROOT_DIR.getAbsolutePath()), true);
+  }
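The rogue task and the test rendezvous through pidFile: the bash wrapper runs `echo $$ > pidFile` before recursing into the shell script, and getRogueTaskPID() simply polls until the file appears, then reads it. A minimal standalone sketch of that polling idiom (waitForPid is a hypothetical helper name, not part of this patch):

    // Sketch only: poll until a shell script has written its PID, then read it.
    // The patch does the same thing inline in getRogueTaskPID() and
    // getPidFromPidFile().
    static String waitForPid(File pidFile) throws IOException {
      while (!pidFile.exists()) {
        try {
          Thread.sleep(500);          // the script has not written the file yet
        } catch (InterruptedException ie) {
          return null;                // give up if interrupted
        }
      }
      BufferedReader reader = new BufferedReader(new FileReader(pidFile));
      try {
        return reader.readLine();     // the script wrote "echo $$" here
      } finally {
        reader.close();
      }
    }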
Not testing"); + return; + } + } catch (Exception e) { + LOG.info(StringUtils.stringifyException(e)); + return; + } + // create shell script + Random rm = new Random(); + File tempFile = + new File(TEST_ROOT_DIR, getClass().getName() + "_shellScript_" + + rm.nextInt() + ".sh"); + tempFile.deleteOnExit(); + shellScript = TEST_ROOT_DIR + File.separator + tempFile.getName(); + + // create pid file + tempFile = + new File(TEST_ROOT_DIR, getClass().getName() + "_pidFile_" + + rm.nextInt() + ".pid"); + tempFile.deleteOnExit(); + pidFile = TEST_ROOT_DIR + File.separator + tempFile.getName(); + + lowestDescendant = TEST_ROOT_DIR + File.separator + "lowestDescendantPidFile"; + + // write to shell-script + try { + FileWriter fWriter = new FileWriter(shellScript); + fWriter.write( + "# rogue task\n" + + "sleep 1\n" + + "echo hello\n" + + "if [ $1 -ne 0 ]\n" + + "then\n" + + " sh " + shellScript + " $(($1-1))\n" + + "else\n" + + " echo $$ > " + lowestDescendant + "\n" + + " while true\n do\n" + + " sleep 5\n" + + " done\n" + + "fi"); + fWriter.close(); + } catch (IOException ioe) { + LOG.info("Error: " + ioe); + return; + } + + Thread t = new RogueTaskThread(); + t.start(); + String pid = getRogueTaskPID(); + LOG.info("Root process pid: " + pid); + ProcfsBasedProcessTree p = createProcessTree(pid); + p = p.getProcessTree(); // initialize + LOG.info("ProcessTree: " + p.toString()); + + File leaf = new File(lowestDescendant); + //wait till lowest descendant process of Rougue Task starts execution + while (!leaf.exists()) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + break; + } + } + + p = p.getProcessTree(); // reconstruct + LOG.info("ProcessTree: " + p.toString()); + + // Get the process-tree dump + String processTreeDump = p.getProcessTreeDump(); + + // destroy the process and all its subprocesses + destroyProcessTree(pid); + + if (isSetsidAvailable()) { // whole processtree should be gone + Assert.assertFalse("Proceesses in process group live", + isAnyProcessInTreeAlive(p)); + } else {// process should be gone + Assert.assertFalse("ProcessTree must have been gone", isAlive(pid)); + } + + LOG.info("Process-tree dump follows: \n" + processTreeDump); + Assert.assertTrue("Process-tree dump doesn't start with a proper header", + processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " + + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " + + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n")); + for (int i = N; i >= 0; i--) { + String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" + + " [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i; + Pattern pat = Pattern.compile(cmdLineDump); + Matcher mat = pat.matcher(processTreeDump); + Assert.assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i + + "th process!", mat.find()); + } + + // Not able to join thread sometimes when forking with large N. + try { + t.join(2000); + LOG.info("RogueTaskThread successfully joined."); + } catch (InterruptedException ie) { + LOG.info("Interrupted while joining RogueTaskThread."); + } + + // ProcessTree is gone now. Any further calls should be sane. + p = p.getProcessTree(); + Assert.assertFalse("ProcessTree must have been gone", isAlive(pid)); + Assert.assertTrue("Cumulative vmem for the gone-process is " + + p.getCumulativeVmem() + " . 
It should be zero.", p + .getCumulativeVmem() == 0); + Assert.assertTrue(p.toString().equals("[ ]")); + } + + protected ProcfsBasedProcessTree createProcessTree(String pid) { + return new ProcfsBasedProcessTree(pid, + isSetsidAvailable()); + } + + protected ProcfsBasedProcessTree createProcessTree(String pid, + boolean setsidUsed, String procfsRootDir) { + return new ProcfsBasedProcessTree(pid, setsidUsed, procfsRootDir); + } + + protected void destroyProcessTree(String pid) throws IOException { + sendSignal(pid, 9); + } + + /** + * Get PID from a pid-file. + * + * @param pidFileName + * Name of the pid-file. + * @return the PID string read from the pid-file. Returns null if the + * pidFileName points to a non-existing file or if read fails from the + * file. + */ + public static String getPidFromPidFile(String pidFileName) { + BufferedReader pidFile = null; + FileReader fReader = null; + String pid = null; + + try { + fReader = new FileReader(pidFileName); + pidFile = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + LOG.debug("PidFile doesn't exist : " + pidFileName); + return pid; + } + + try { + pid = pidFile.readLine(); + } catch (IOException i) { + LOG.error("Failed to read from " + pidFileName); + } finally { + try { + if (fReader != null) { + fReader.close(); + } + try { + if (pidFile != null) { + pidFile.close(); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + pidFile); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + return pid; + } + + public static class ProcessStatInfo { + // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624 + // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640 + // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626 + // 4294967295 0 0 17 1 0 0 + String pid; + String name; + String ppid; + String pgrpId; + String session; + String vmem = "0"; + String rssmemPage = "0"; + String utime = "0"; + String stime = "0"; + + public ProcessStatInfo(String[] statEntries) { + pid = statEntries[0]; + name = statEntries[1]; + ppid = statEntries[2]; + pgrpId = statEntries[3]; + session = statEntries[4]; + vmem = statEntries[5]; + if (statEntries.length > 6) { + rssmemPage = statEntries[6]; + } + if (statEntries.length > 7) { + utime = statEntries[7]; + stime = statEntries[8]; + } + } + + // construct a line that mimics the procfs stat file. + // all unused numerical entries are set to 0. + public String getStatLine() { + return String.format("%s (%s) S %s %s %s 0 0 0" + + " 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" + + " 0 0 0 0 0 0 0 0" + + " 0 0 0 0 0", + pid, name, ppid, pgrpId, session, + utime, stime, vmem, rssmemPage); + } + } + + /** + * A basic test that creates a few process directories and writes + * stat files. Verifies that the cpu time and memory is correctly + * computed. + * @throws IOException if there was a problem setting up the + * fake procfs directories or files. + */ + @Test + public void testCpuAndMemoryForProcessTree() throws IOException { + + // test processes + String[] pids = { "100", "200", "300", "400" }; + // create the fake procfs root directory. + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + + try { + setupProcfsRootDir(procfsRootDir); + setupPidDirs(procfsRootDir, pids); + + // create stat objects. + // assuming processes 100, 200, 300 are in tree and 400 is not. 
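To see why the assertions below expect 600000 bytes, 600 pages and 7200 jiffies: tree membership follows the ppid column (proc2's ppid is 100 and proc3's is 200, while proc4 hangs off init), so only 100, 200 and 300 are rooted at 100. Summing their stat entries gives:

    vmem : 100000 + 200000 + 300000               = 600000 bytes
    rss  : 100 + 200 + 300                        = 600 pages
    cpu  : (1000+200) + (2000+400) + (3000+600)   = 7200 jiffies

After the second writeStatFiles() round, proc1 and proc2 report (2000+300) and (3000+500) jiffies while proc3 is unchanged at 3600, hence the 9400-jiffy figure further down.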
+ ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; + procInfos[0] = new ProcessStatInfo(new String[] + {"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"}); + procInfos[1] = new ProcessStatInfo(new String[] + {"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"}); + procInfos[2] = new ProcessStatInfo(new String[] + {"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"}); + procInfos[3] = new ProcessStatInfo(new String[] + {"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"}); + + writeStatFiles(procfsRootDir, pids, procInfos); + + // crank up the process tree class. + ProcfsBasedProcessTree processTree = + createProcessTree("100", true, procfsRootDir.getAbsolutePath()); + // build the process tree. + processTree.getProcessTree(); + + // verify cumulative memory + Assert.assertEquals("Cumulative virtual memory does not match", 600000L, + processTree.getCumulativeVmem()); + + // verify rss memory + long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ? + 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; + Assert.assertEquals("Cumulative rss memory does not match", + cumuRssMem, processTree.getCumulativeRssmem()); + + // verify cumulative cpu time + long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ? + 7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L; + Assert.assertEquals("Cumulative cpu time does not match", + cumuCpuTime, processTree.getCumulativeCpuTime()); + + // test the cpu time again to see if it cumulates + procInfos[0] = new ProcessStatInfo(new String[] + {"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"}); + procInfos[1] = new ProcessStatInfo(new String[] + {"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"}); + writeStatFiles(procfsRootDir, pids, procInfos); + + // build the process tree. + processTree.getProcessTree(); + + // verify cumulative cpu time again + cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ? + 9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L; + Assert.assertEquals("Cumulative cpu time does not match", + cumuCpuTime, processTree.getCumulativeCpuTime()); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } + + /** + * Tests that cumulative memory is computed only for + * processes older than a given age. + * @throws IOException if there was a problem setting up the + * fake procfs directories or files. + */ + @Test + public void testMemForOlderProcesses() throws IOException { + // initial list of processes + String[] pids = { "100", "200", "300", "400" }; + // create the fake procfs root directory. + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + + try { + setupProcfsRootDir(procfsRootDir); + setupPidDirs(procfsRootDir, pids); + + // create stat objects. + // assuming 100, 200 and 400 are in tree, 300 is not. + ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; + procInfos[0] = new ProcessStatInfo(new String[] + {"100", "proc1", "1", "100", "100", "100000", "100"}); + procInfos[1] = new ProcessStatInfo(new String[] + {"200", "proc2", "100", "100", "100", "200000", "200"}); + procInfos[2] = new ProcessStatInfo(new String[] + {"300", "proc3", "1", "300", "300", "300000", "300"}); + procInfos[3] = new ProcessStatInfo(new String[] + {"400", "proc4", "100", "100", "100", "400000", "400"}); + + writeStatFiles(procfsRootDir, pids, procInfos); + + // crank up the process tree class. 
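This test, like testCpuAndMemoryForProcessTree above and testDestroyProcessTree and testProcessTreeDump below, drives ProcfsBasedProcessTree at a fake procfs root rather than the live /proc: setupProcfsRootDir() creates target/.../proc, setupPidDirs() adds one directory per test PID, and writeStatFiles() drops a stat file shaped by getStatLine() into each. For the procInfos above, the layout the parser walks looks like (illustrative line prefixes only):

    proc/
      100/stat    "100 (proc1) S 1 100 100 0 0 0 ..."
      200/stat    "200 (proc2) S 100 100 100 0 0 0 ..."
      300/stat    "300 (proc3) S 1 300 300 0 0 0 ..."
      400/stat    "400 (proc4) S 100 100 100 0 0 0 ..."

Passing procfsRootDir.getAbsolutePath() through createProcessTree(pid, setsidUsed, procfsRootDir) is what redirects the parser away from the real /proc.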
+ ProcfsBasedProcessTree processTree = + createProcessTree("100", true, procfsRootDir.getAbsolutePath()); + // build the process tree. + processTree.getProcessTree(); + + // verify cumulative memory + Assert.assertEquals("Cumulative memory does not match", + 700000L, processTree.getCumulativeVmem()); + + // write one more process as child of 100. + String[] newPids = { "500" }; + setupPidDirs(procfsRootDir, newPids); + + ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1]; + newProcInfos[0] = new ProcessStatInfo(new String[] + {"500", "proc5", "100", "100", "100", "500000", "500"}); + writeStatFiles(procfsRootDir, newPids, newProcInfos); + + // check memory includes the new process. + processTree.getProcessTree(); + Assert.assertEquals("Cumulative vmem does not include new process", + 1200000L, processTree.getCumulativeVmem()); + long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ? + 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; + Assert.assertEquals("Cumulative rssmem does not include new process", + cumuRssMem, processTree.getCumulativeRssmem()); + + // however processes older than 1 iteration will retain the older value + Assert.assertEquals("Cumulative vmem shouldn't have included new process", + 700000L, processTree.getCumulativeVmem(1)); + cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ? + 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; + Assert.assertEquals("Cumulative rssmem shouldn't have included new process", + cumuRssMem, processTree.getCumulativeRssmem(1)); + + // one more process + newPids = new String[]{ "600" }; + setupPidDirs(procfsRootDir, newPids); + + newProcInfos = new ProcessStatInfo[1]; + newProcInfos[0] = new ProcessStatInfo(new String[] + {"600", "proc6", "100", "100", "100", "600000", "600"}); + writeStatFiles(procfsRootDir, newPids, newProcInfos); + + // refresh process tree + processTree.getProcessTree(); + + // processes older than 2 iterations should be same as before. + Assert.assertEquals("Cumulative vmem shouldn't have included new processes", + 700000L, processTree.getCumulativeVmem(2)); + cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ? + 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; + Assert.assertEquals("Cumulative rssmem shouldn't have included new processes", + cumuRssMem, processTree.getCumulativeRssmem(2)); + + // processes older than 1 iteration should not include new process, + // but include process 500 + Assert.assertEquals("Cumulative vmem shouldn't have included new processes", + 1200000L, processTree.getCumulativeVmem(1)); + cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ? + 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; + Assert.assertEquals("Cumulative rssmem shouldn't have included new processes", + cumuRssMem, processTree.getCumulativeRssmem(1)); + + // no processes older than 3 iterations, this should be 0 + Assert.assertEquals("Getting non-zero vmem for processes older than 3 iterations", + 0L, processTree.getCumulativeVmem(3)); + Assert.assertEquals("Getting non-zero rssmem for processes older than 3 iterations", + 0L, processTree.getCumulativeRssmem(3)); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } + + /** + * Verifies ProcfsBasedProcessTree.checkPidPgrpidForMatch() in case of + * 'constructProcessInfo() returning null' by not writing stat file for the + * mock process + * @throws IOException if there was a problem setting up the + * fake procfs directories or files. 
+ */ + @Test + public void testDestroyProcessTree() throws IOException { + // test process + String pid = "100"; + // create the fake procfs root directory. + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + + try { + setupProcfsRootDir(procfsRootDir); + + // crank up the process tree class. + ProcfsBasedProcessTree processTree = + createProcessTree(pid, true, procfsRootDir.getAbsolutePath()); + + // Let us not create stat file for pid 100. + Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch( + Integer.valueOf(pid), procfsRootDir.getAbsolutePath())); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } + + /** + * Test the correctness of process-tree dump. + * + * @throws IOException + */ + @Test + public void testProcessTreeDump() + throws IOException { + + String[] pids = { "100", "200", "300", "400", "500", "600" }; + + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + + try { + setupProcfsRootDir(procfsRootDir); + setupPidDirs(procfsRootDir, pids); + + int numProcesses = pids.length; + // Processes 200, 300, 400 and 500 are descendants of 100. 600 is not. + ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses]; + procInfos[0] = new ProcessStatInfo(new String[] { + "100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"}); + procInfos[1] = new ProcessStatInfo(new String[] { + "200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"}); + procInfos[2] = new ProcessStatInfo(new String[] { + "300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"}); + procInfos[3] = new ProcessStatInfo(new String[] { + "400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"}); + procInfos[4] = new ProcessStatInfo(new String[] { + "500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"}); + procInfos[5] = new ProcessStatInfo(new String[] { + "600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"}); + + String[] cmdLines = new String[numProcesses]; + cmdLines[0] = "proc1 arg1 arg2"; + cmdLines[1] = "proc2 arg3 arg4"; + cmdLines[2] = "proc3 arg5 arg6"; + cmdLines[3] = "proc4 arg7 arg8"; + cmdLines[4] = "proc5 arg9 arg10"; + cmdLines[5] = "proc6 arg11 arg12"; + + writeStatFiles(procfsRootDir, pids, procInfos); + writeCmdLineFiles(procfsRootDir, pids, cmdLines); + + ProcfsBasedProcessTree processTree = createProcessTree( + "100", true, procfsRootDir.getAbsolutePath()); + // build the process tree. 
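For reference, each dump entry checked below is assembled from a pid directory's stat and cmdline files in the header's column order; given procInfos[1] and cmdLines[1] above, the asserted line is

    \t|- 200 100 100 100 (proc2) 2000 400 200000 200 proc2 arg3 arg4

that is: PID, PPID, PGRPID, SESSID, CMD_NAME, user-mode time, system time, vmem bytes, rss pages, then the full command line.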
+ processTree.getProcessTree(); + + // Get the process-tree dump + String processTreeDump = processTree.getProcessTreeDump(); + + LOG.info("Process-tree dump follows: \n" + processTreeDump); + Assert.assertTrue("Process-tree dump doesn't start with a proper header", + processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " + + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " + + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n")); + for (int i = 0; i < 5; i++) { + ProcessStatInfo p = procInfos[i]; + Assert.assertTrue( + "Process-tree dump doesn't contain the cmdLineDump of process " + + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name + + ") " + p.utime + " " + p.stime + " " + p.vmem + " " + + p.rssmemPage + " " + cmdLines[i])); + } + + // 600 should not be in the dump + ProcessStatInfo p = procInfos[5]; + Assert.assertFalse( + "Process-tree dump shouldn't contain the cmdLineDump of process " + + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid + + " " + p.pgrpId + " " + p.session + " (" + p.name + ") " + + p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5])); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } + + protected static boolean isSetsidAvailable() { + ShellCommandExecutor shexec = null; + boolean setsidSupported = true; + try { + String[] args = {"setsid", "bash", "-c", "echo $$"}; + shexec = new ShellCommandExecutor(args); + shexec.execute(); + } catch (IOException ioe) { + LOG.warn("setsid is not available on this machine. So not using it."); + setsidSupported = false; + } finally { // handle the exit code + LOG.info("setsid exited with exit code " + shexec.getExitCode()); + } + return setsidSupported; + } + + /** + * Is the root-process alive? + * Used only in tests. + * @return true if the root-process is alive, false otherwise. + */ + private static boolean isAlive(String pid) { + try { + final String sigpid = isSetsidAvailable() ? "-" + pid : pid; + try { + sendSignal(sigpid, 0); + } catch (ExitCodeException e) { + return false; + } + return true; + } catch (IOException ignored) { + } + return false; + } + + private static void sendSignal(String pid, int signal) throws IOException { + ShellCommandExecutor shexec = null; + String[] arg = { "kill", "-" + signal, pid }; + shexec = new ShellCommandExecutor(arg); + shexec.execute(); + } + + /** + * Is any of the subprocesses in the process-tree alive? + * Used only in tests. + * @return true if any of the processes in the process-tree is + * alive, false otherwise. + */ + private static boolean isAnyProcessInTreeAlive( + ProcfsBasedProcessTree processTree) { + for (Integer pId : processTree.getCurrentProcessIDs()) { + if (isAlive(pId.toString())) { + return true; + } + } + return false; + } + + /** + * Create a directory to mimic the procfs file system's root. + * @param procfsRootDir root directory to create. + * @throws IOException if could not delete the procfs root directory + */ + public static void setupProcfsRootDir(File procfsRootDir) + throws IOException { + // cleanup any existing process root dir. + if (procfsRootDir.exists()) { + Assert.assertTrue(FileUtil.fullyDelete(procfsRootDir)); + } + + // create afresh + Assert.assertTrue(procfsRootDir.mkdirs()); + } + + /** + * Create PID directories under the specified procfs root directory + * @param procfsRootDir root directory of procfs file system + * @param pids the PID directories to create. 
+   * @throws IOException If PID dirs could not be created
+   */
+  public static void setupPidDirs(File procfsRootDir, String[] pids)
+      throws IOException {
+    for (String pid : pids) {
+      File pidDir = new File(procfsRootDir, pid);
+      pidDir.mkdir();
+      if (!pidDir.exists()) {
+        throw new IOException("couldn't make process directory under "
+            + "fake procfs");
+      } else {
+        LOG.info("created pid dir");
+      }
+    }
+  }
+
+  /**
+   * Write stat files under the specified pid directories with data
+   * setup in the corresponding ProcessStatInfo objects
+   * @param procfsRootDir root directory of procfs file system
+   * @param pids the PID directories under which to create the stat file
+   * @param procs corresponding ProcessStatInfo objects whose data should be
+   *          written to the stat files.
+   * @throws IOException if stat files could not be written
+   */
+  public static void writeStatFiles(File procfsRootDir, String[] pids,
+      ProcessStatInfo[] procs) throws IOException {
+    for (int i = 0; i < pids.length; i++) {
[remainder of this hunk lost in the archive where angle-bracketed text was stripped: the bodies of writeStatFiles and of writeCmdLineFiles, which testProcessTreeDump uses above]

Modified: yarn-default.xml (file header, URL and removed lines garbled in the archive; the hunk also apparently drops a yarn.server.nodemanager.connect.rm property (true) near the NM address 0.0.0.0:45454; the added properties are reconstructed below)
==============================================================================
@@ -130,6 +129,26 @@ Arguments to be passed to the health-check script run by the NodeManager
+  <property>
+    <name>yarn.server.nodemanager.containers-monitor.monitoring-interval</name>
+    <value>3000</value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.containers-monitor.resourcecalculatorplugin</name>
+    <value></value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.reserved-physical-memory.mb</name>
+    <value>-1</value>
+  </property>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml Tue Apr 12 07:56:07 2011
@@ -1,5 +1,10 @@
[hunk garbled in the archive; it apparently adds a <parent> block carrying version 1.0-SNAPSHOT for the existing yarn-server / org.apache.hadoop module]

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Tue Apr 12 07:56:07 2011
@@ -50,7 +50,9 @@ public class NMConfig {

   public static final String DEFAULT_NM_LOG_DIR = "/tmp/logs";

-  public static final String NM_RESOURCE = NM_PREFIX + "resource.memory.gb";
+  public static final int DEFAULT_NM_VMEM_GB = 8;
+
+  public static final String NM_VMEM_GB = NM_PREFIX + "resource.memory.gb";

   // TODO: Should this instead be dictated by RM?
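The renamed NM_VMEM_GB key keeps the old resource.memory.gb name but makes explicit that the value is virtual memory in gigabytes, with DEFAULT_NM_VMEM_GB = 8. The same key is normalized differently by its two consumers: NodeStatusUpdaterImpl (next hunk) converts to megabytes when registering the node's Resource with the ResourceManager, while ContainersMonitorImpl (later in this commit) converts to bytes for enforcement. A condensed sketch of the two reads, assuming an ordinary Configuration object:

    Configuration conf = new Configuration();
    int gb = conf.getInt(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
    int mbForScheduler = gb * 1024;                    // NodeStatusUpdaterImpl: setMemory(memory * 1024)
    long bytesForMonitor = gb * 1024L * 1024 * 1024;   // ContainersMonitorImpl: GB -> bytes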
public static final String HEARTBEAT_INTERVAL = NM_PREFIX Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Apr 12 07:56:07 2011 @@ -94,7 +94,7 @@ public class NodeStatusUpdaterImpl exten this.heartBeatInterval = conf.getLong(NMConfig.HEARTBEAT_INTERVAL, NMConfig.DEFAULT_HEARTBEAT_INTERVAL); - int memory = conf.getInt(NMConfig.NM_RESOURCE, 8); + int memory = conf.getInt(NMConfig.NM_VMEM_GB, 8); this.totalResource = recordFactory.newRecordInstance(Resource.class); this.totalResource.setMemory(memory * 1024); super.init(conf); Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Apr 12 07:56:07 2011 @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; @@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.CompositeService; import 
org.apache.hadoop.yarn.service.Service; @@ -130,7 +132,8 @@ public class ContainerManagerImpl extend auxiluaryServices.register(this); addService(auxiluaryServices); - ContainersMonitor containersMonitor = new ContainersMonitor(); + ContainersMonitor containersMonitor = + new ContainersMonitorImpl(exec, dispatcher); addService(containersMonitor); dispatcher.register(ContainerEventType.class, @@ -247,6 +250,9 @@ public class ContainerManagerImpl extend // + containerID + " on this NodeManager!!"); } dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(containerID, + "Container killed by the application.")); + dispatcher.getEventHandler().handle( new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER)); // TODO: Move this code to appropriate place once kill_container is @@ -324,6 +330,9 @@ public class ContainerManagerImpl extend for (org.apache.hadoop.yarn.api.records.Container container : containersFinishedEvent .getContainersToCleanup()) { this.dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(container.getId(), + "Container Killed by ResourceManager")); + this.dispatcher.getEventHandler().handle( new ContainerEvent(container.getId(), ContainerEventType.KILL_CONTAINER)); } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Apr 12 07:56:07 2011 @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -318,6 +319,9 @@ public class ApplicationImpl implements // application. 
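The hunk below applies the same kill-with-reason idiom that ContainerManagerImpl adopted above: before a KILL_CONTAINER event is dispatched, a ContainerDiagnosticsUpdateEvent is queued so the container's status records who killed it and why. Schematically (reason is a stand-in for the literal messages used in this commit):

    // Queue the explanation first, then the kill, on the same dispatcher;
    // the async dispatcher drains one queue in order, so the diagnostics
    // string is appended before the kill transition fires.
    dispatcher.getEventHandler().handle(
        new ContainerDiagnosticsUpdateEvent(containerID, reason));
    dispatcher.getEventHandler().handle(
        new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER));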
for (ContainerId containerID : app.containers.keySet()) { app.dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(containerID, + "Container killed on application-finish.")); + app.dispatcher.getEventHandler().handle( new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER)); } Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java?rev=1091316&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java Tue Apr 12 07:56:07 2011 @@ -0,0 +1,17 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class ContainerDiagnosticsUpdateEvent extends ContainerEvent { + + private final String diagnosticsUpdate; + + public ContainerDiagnosticsUpdateEvent(ContainerId cID, String update) { + super(cID, ContainerEventType.UPDATE_DIAGNOSTICS_MSG); + this.diagnosticsUpdate = update; + } + + public String getDiagnosticsUpdate() { + return this.diagnosticsUpdate; + } +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java?rev=1091316&r1=1091315&r2=1091316&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java Tue Apr 12 07:56:07 2011 @@ -23,6 +23,7 @@ public enum ContainerEventType { // Producer: ContainerManager INIT_CONTAINER, KILL_CONTAINER, + UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, // DownloadManager Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff ============================================================================== --- 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Apr 12 07:56:07 2011 @@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; @@ -49,6 +49,7 @@ public class ContainerImpl implements Co private final Dispatcher dispatcher; private final ContainerLaunchContext launchContext; private int exitCode; + private final StringBuilder diagnostics; private static final Log LOG = LogFactory.getLog(Container.class); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -57,6 +58,7 @@ public class ContainerImpl implements Co ContainerLaunchContext launchContext) { this.dispatcher = dispatcher; this.launchContext = launchContext; + this.diagnostics = new StringBuilder(); stateMachine = stateMachineFactory.make(this); } @@ -64,6 +66,9 @@ public class ContainerImpl implements Co private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION = new ContainerDoneTransition(); + private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = + new ContainerDiagnosticsUpdateTransition(); + // State Machine for each container. 
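The transition table that follows grows a self-loop on every state for the new UPDATE_DIAGNOSTICS_MSG event. Diagnostics can be queued by ContainerManagerImpl, ApplicationImpl or the monitor at any moment, and the YARN state machine throws InvalidStateTransitonException for an event with no registered arc, so the self-loops make the update legal everywhere without disturbing the state. Each added arc has the same shape (fragment of the builder chain, not standalone code):

    .addTransition(STATE, STATE,                     // stay in the same state
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)               // append the message text

where UPDATE_DIAGNOSTICS_TRANSITION simply appends the event's string to the container's diagnostics StringBuilder (see ContainerDiagnosticsUpdateTransition further down).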
private static StateMachineFactory @@ -72,6 +77,9 @@ public class ContainerImpl implements Co // From NEW State .addTransition(ContainerState.NEW, ContainerState.LOCALIZING, ContainerEventType.INIT_CONTAINER) + .addTransition(ContainerState.NEW, ContainerState.NEW, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION) @@ -80,6 +88,9 @@ public class ContainerImpl implements Co ContainerState.LOCALIZED, ContainerEventType.CONTAINER_RESOURCES_LOCALIZED, new LocalizedTransition()) + .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.LOCALIZING, ContainerState.CONTAINER_RESOURCES_CLEANINGUP, ContainerEventType.KILL_CONTAINER, @@ -91,6 +102,9 @@ public class ContainerImpl implements Co .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition()) + .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) // From RUNNING State .addTransition(ContainerState.RUNNING, @@ -101,6 +115,9 @@ public class ContainerImpl implements Co ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) @@ -109,6 +126,10 @@ public class ContainerImpl implements Co ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, CONTAINER_DONE_TRANSITION) .addTransition(ContainerState.EXITED_WITH_SUCCESS, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.KILL_CONTAINER) @@ -117,6 +138,10 @@ public class ContainerImpl implements Co ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, CONTAINER_DONE_TRANSITION) .addTransition(ContainerState.EXITED_WITH_FAILURE, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.KILL_CONTAINER) @@ -125,6 +150,9 @@ public class ContainerImpl implements Co ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new ContainerKilledTransition()) + .addTransition(ContainerState.KILLING, ContainerState.KILLING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.KILLING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER) .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS, @@ -139,10 +167,17 @@ public class ContainerImpl implements Co ContainerState.DONE, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, CONTAINER_DONE_TRANSITION) + .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, + ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) // From DONE .addTransition(ContainerState.DONE, 
ContainerState.DONE, ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION) + .addTransition(ContainerState.DONE, ContainerState.DONE, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) // create the topology tables .installTopology(); @@ -203,7 +238,8 @@ public class ContainerImpl implements Co ContainerStatus containerStatus = recordFactory.newRecordInstance(ContainerStatus.class); containerStatus.setState(getCurrentState()); containerStatus.setContainerId(this.launchContext.getContainerId()); - containerStatus.setExitStatus(exitCode); + containerStatus.setDiagnostics(diagnostics.toString()); + containerStatus.setExitStatus(String.valueOf(exitCode)); return containerStatus; } @@ -244,9 +280,12 @@ public class ContainerImpl implements Co public void transition(ContainerImpl container, ContainerEvent event) { // Inform the ContainersMonitor to start monitoring the container's // resource usage. + // TODO: Fix pmem limits below + long vmemBytes = + container.getLaunchContext().getResource().getMemory() * 1024 * 1024L; container.dispatcher.getEventHandler().handle( - new ContainersMonitorEvent( - ContainersMonitorEventType.START_MONITORING_CONTAINER)); + new ContainerStartMonitoringEvent(container.getContainerID(), + vmemBytes, -1)); } } @@ -326,6 +365,20 @@ public class ContainerImpl implements Co // Inform the application container.dispatcher.getEventHandler().handle( new ApplicationContainerFinishedEvent(container.getContainerID())); + // Remove the container from the resource-monitor + container.dispatcher.getEventHandler().handle( + new ContainerStopMonitoringEvent(container.getContainerID())); + } + } + + static class ContainerDiagnosticsUpdateTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerDiagnosticsUpdateEvent updateEvent = + (ContainerDiagnosticsUpdateEvent) event; + container.diagnostics.append(updateEvent.getDiagnosticsUpdate()) + .append("\n"); } } Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java?rev=1091316&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java Tue Apr 12 07:56:07 2011 @@ -0,0 +1,25 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class ContainerStartMonitoringEvent extends ContainersMonitorEvent { + + private final long vmemLimit; + private final long pmemLimit; + + public ContainerStartMonitoringEvent(ContainerId containerId, + long vmemLimit, long pmemLimit) { + super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER); + this.vmemLimit = vmemLimit; + this.pmemLimit = pmemLimit; + } + + public long getVmemLimit() { + return 
this.vmemLimit;
+  }
+
+  public long getPmemLimit() {
+    return this.pmemLimit;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,11 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
+
+  public ContainerStopMonitoringEvent(ContainerId containerId) {
+    super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java Tue Apr 12 07:56:07 2011
@@ -19,18 +19,9 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;

 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;

-public class ContainersMonitor extends AbstractService
-    implements EventHandler<ContainersMonitorEvent> {
-
-  public ContainersMonitor() {
-    super("containers-monitor");
-  }
-
-  @Override
-  public void handle(ContainersMonitorEvent monitorEvent) {
-    // TODO
-  }
+public interface ContainersMonitor extends Service,
+    EventHandler<ContainersMonitorEvent> {

 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java Tue Apr 12 07:56:07 2011
@@ -18,14 +18,22 @@

 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;

+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;

 public class ContainersMonitorEvent extends
     AbstractEvent<ContainersMonitorEventType> {

-  public ContainersMonitorEvent(ContainersMonitorEventType eventType) {
+  private final ContainerId containerId;
+
+  public ContainersMonitorEvent(ContainerId containerId,
+      ContainersMonitorEventType eventType) {
     super(eventType);
+    this.containerId = containerId;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
   }
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,495 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+public class ContainersMonitorImpl extends AbstractService implements
+    ContainersMonitor {
+
+  final static Log LOG = LogFactory
+      .getLog(ContainersMonitorImpl.class);
+
+  private final static String MONITORING_INTERVAL_CONFIG_KEY =
+      NMConfig.NM_PREFIX + "containers-monitor.monitoring-interval";
+  public static final String RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY =
+      NMConfig.NM_PREFIX + "containers-monitor.resourcecalculatorplugin";
+  public static final String NM_RESERVED_PHYSICALMEMORY_MB =
+      NMConfig.NM_PREFIX + "reserved-physical-memory.mb";
+
+  private final static int MONITORING_INTERVAL_DEFAULT = 3000;
+  private long monitoringInterval;
+  private MonitoringThread monitoringThread;
+
+  List<ContainerId> containersToBeRemoved;
+  Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
+  Map<ContainerId, ProcessTreeInfo> trackingContainers =
+      new HashMap<ContainerId, ProcessTreeInfo>();
+
+  final ContainerExecutor containerExecutor;
+  private final Dispatcher eventDispatcher;
+  private ResourceCalculatorPlugin resourceCalculatorPlugin;
+
+  private long maxVmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+  private long maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+
+  /**
+   * A value which if set for memory related configuration options, indicates
+   * that the options are turned off.
+   */
+  public static final long DISABLED_MEMORY_LIMIT = -1L;
+
+  private static final String MEMORY_USAGE_STRING =
+      "Memory usage of ProcessTree %s for container-id %s : Virtual %d bytes, " +
+      "limit : %d bytes; Physical %d bytes, limit %d bytes";
+
+  public ContainersMonitorImpl(ContainerExecutor exec,
+      AsyncDispatcher dispatcher) {
+    super("containers-monitor");
+
+    this.containerExecutor = exec;
+    this.eventDispatcher = dispatcher;
+
+    this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
+    this.containersToBeRemoved = new ArrayList<ContainerId>();
+    this.monitoringThread = new MonitoringThread();
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.monitoringInterval =
+        conf.getLong(MONITORING_INTERVAL_CONFIG_KEY,
+            MONITORING_INTERVAL_DEFAULT);
+
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY, null,
+            ResourceCalculatorPlugin.class);
+    this.resourceCalculatorPlugin =
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+    LOG.info(" Using ResourceCalculatorPlugin : "
+        + this.resourceCalculatorPlugin);
+
+    long totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
+    if (this.resourceCalculatorPlugin != null) {
+      totalPhysicalMemoryOnNM =
+          this.resourceCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPhysicalMemoryOnNM <= 0) {
+        LOG.warn("NodeManager's totalPmem could not be calculated. "
+            + "Setting it to " + DISABLED_MEMORY_LIMIT);
+        totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
+      }
+    }
+
+    // ///////// Virtual memory configuration //////
+    this.maxVmemAllottedForContainers =
+        conf.getLong(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
+    this.maxVmemAllottedForContainers =
+        this.maxVmemAllottedForContainers * 1024 * 1024 * 1024L; // Normalize
+
+    if (this.maxVmemAllottedForContainers > totalPhysicalMemoryOnNM) {
+      LOG.info("totalMemoryAllottedForContainers > totalPhysicalMemoryOnNM."
+          + " Thrashing might happen.");
+    }
+
+    // ///////// Physical memory configuration //////
+    long reservedPmemOnNM =
+        conf.getLong(NM_RESERVED_PHYSICALMEMORY_MB, DISABLED_MEMORY_LIMIT);
+    reservedPmemOnNM =
+        reservedPmemOnNM == DISABLED_MEMORY_LIMIT
+            ? DISABLED_MEMORY_LIMIT
+            : reservedPmemOnNM * 1024 * 1024; // normalize to bytes
+
+    if (reservedPmemOnNM == DISABLED_MEMORY_LIMIT
+        || totalPhysicalMemoryOnNM == DISABLED_MEMORY_LIMIT) {
+      this.maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+    } else {
+      this.maxPmemAllottedForContainers =
+          totalPhysicalMemoryOnNM - reservedPmemOnNM;
+    }
+
+    super.init(conf);
+  }
+
+  /**
+   * Is the total physical memory check enabled?
+   *
+   * @return true if total physical memory check is enabled.
+
+  /**
+   * Is the total physical memory check enabled?
+   *
+   * @return true if the total physical memory check is enabled.
+   */
+  boolean doCheckPhysicalMemory() {
+    return this.maxPmemAllottedForContainers != DISABLED_MEMORY_LIMIT;
+  }
+
+  /**
+   * Is the total virtual memory check enabled?
+   *
+   * @return true if the total virtual memory check is enabled.
+   */
+  boolean doCheckVirtualMemory() {
+    return this.maxVmemAllottedForContainers != DISABLED_MEMORY_LIMIT;
+  }
+
+  @Override
+  public synchronized void start() {
+    this.monitoringThread.start();
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    this.monitoringThread.interrupt();
+    try {
+      this.monitoringThread.join();
+    } catch (InterruptedException e) {
+      // ignore; the service is stopping anyway
+    }
+    super.stop();
+  }
+
+  private static class ProcessTreeInfo {
+    private ContainerId containerId;
+    private String pid;
+    private ProcfsBasedProcessTree pTree;
+    private long vmemLimit;
+    private long pmemLimit;
+
+    public ProcessTreeInfo(ContainerId containerId, String pid,
+        ProcfsBasedProcessTree pTree, long vmemLimit, long pmemLimit) {
+      this.containerId = containerId;
+      this.pid = pid;
+      this.pTree = pTree;
+      this.vmemLimit = vmemLimit;
+      this.pmemLimit = pmemLimit;
+    }
+
+    public ContainerId getContainerId() {
+      return this.containerId;
+    }
+
+    public String getPID() {
+      return this.pid;
+    }
+
+    public void setPid(String pid) {
+      this.pid = pid;
+    }
+
+    public ProcfsBasedProcessTree getProcessTree() {
+      return this.pTree;
+    }
+
+    public void setProcessTree(ProcfsBasedProcessTree pTree) {
+      this.pTree = pTree;
+    }
+
+    public long getVmemLimit() {
+      return this.vmemLimit;
+    }
+
+    /**
+     * @return Physical memory limit for the process tree in bytes
+     */
+    public long getPmemLimit() {
+      return this.pmemLimit;
+    }
+  }
+
+  /**
+   * Check whether a container's process tree's current memory usage is over
+   * limit.
+   *
+   * When a Java process exec's a program, it may momentarily account for
+   * double the size of its memory, because the JVM does a fork()+exec(),
+   * and fork creates a copy of the parent's address space. If the monitoring
+   * thread samples the container tree at that instant, it could conclude the
+   * tree is over limit and kill it through no fault of the process itself.
+   *
+   * We counter this problem with a heuristic check:
+   * - if a process tree's usage exceeds twice the memory limit, it is
+   *   killed immediately;
+   * - if the processes in the tree older than one monitoring interval
+   *   exceed the memory limit even once, the tree is killed.
+   * Otherwise the tree is given the benefit of the doubt for one more
+   * iteration.
+   *
+   * @param containerId
+   *          Container Id for the container tree
+   * @param currentMemUsage
+   *          Memory usage of a container tree
+   * @param curMemUsageOfAgedProcesses
+   *          Memory usage of processes older than an iteration in a container
+   *          tree
+   * @param vmemLimit
+   *          The limit specified for the container
+   * @return true if the memory usage is more than twice the specified limit,
+   *         or if processes in the tree, older than this thread's monitoring
+   *         interval, exceed the memory limit; false otherwise.
+   */
+  boolean isProcessTreeOverLimit(String containerId,
+      long currentMemUsage,
+      long curMemUsageOfAgedProcesses,
+      long vmemLimit) {
+    boolean isOverLimit = false;
+
+    if (currentMemUsage > (2 * vmemLimit)) {
+      LOG.warn("Process tree for container: " + containerId
+          + " running over twice the configured limit. Limit=" + vmemLimit
+          + ", current usage = " + currentMemUsage);
+      isOverLimit = true;
+    } else if (curMemUsageOfAgedProcesses > vmemLimit) {
+      LOG.warn("Process tree for container: " + containerId
+          + " has processes older than 1 iteration running over the"
+          + " configured limit. Limit=" + vmemLimit
+          + ", current usage = " + curMemUsageOfAgedProcesses);
+      isOverLimit = true;
+    }
+
+    return isOverLimit;
+  }
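To make the heuristic concrete, a worked example with invented numbers, written
in the style of the package-private test hook this class already provides. With
a 1 GB limit, a momentary fork()+exec() spike to 2.5 GB trips the first branch;
a tree at 1.5 GB whose aged processes sit at 0.8 GB is spared for one iteration;
once the aged processes alone cross 1 GB, the second branch fires:

    // Worked example (invented numbers); assumes a ContainersMonitorImpl
    // built as in the constructor above, from a test in the same package.
    long gb = 1024L * 1024 * 1024;
    ContainersMonitorImpl monitor = new ContainersMonitorImpl(
        new DefaultContainerExecutor(), new AsyncDispatcher());
    // 2.5 GB total against a 1 GB limit: more than twice over, kill now.
    Assert.assertTrue(monitor.isProcessTreeOverLimit("c1",
        (long) (2.5 * gb), 0L, 1 * gb));
    // 1.5 GB total but aged processes only at 0.8 GB: benefit of the doubt.
    Assert.assertFalse(monitor.isProcessTreeOverLimit("c2",
        (long) (1.5 * gb), (long) (0.8 * gb), 1 * gb));
    // Aged processes at 1.2 GB exceed the 1 GB limit: killed this iteration.
    Assert.assertTrue(monitor.isProcessTreeOverLimit("c3",
        (long) (1.5 * gb), (long) (1.2 * gb), 1 * gb));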
Limit=" + vmemLimit + + ", current usage = " + currentMemUsage); + isOverLimit = true; + } else if (curMemUsageOfAgedProcesses > vmemLimit) { + LOG.warn("Process tree for container: " + containerId + + " has processes older than 1 " + + "iteration running over the configured limit. Limit=" + vmemLimit + + ", current usage = " + curMemUsageOfAgedProcesses); + isOverLimit = true; + } + + return isOverLimit; + } + + // method provided just for easy testing purposes + boolean isProcessTreeOverLimit(ProcfsBasedProcessTree pTree, + String containerId, long limit) { + long currentMemUsage = pTree.getCumulativeVmem(); + // as processes begin with an age 1, we want to see if there are processes + // more than 1 iteration old. + long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1); + return isProcessTreeOverLimit(containerId, currentMemUsage, + curMemUsageOfAgedProcesses, limit); + } + + private class MonitoringThread extends Thread { + public MonitoringThread() { + + } + + @Override + public void run() { + + if (!(doCheckPhysicalMemory() || doCheckVirtualMemory())) { + LOG.info("Neither virutal-memory nor physical-memory monitoring is " + + "needed. Not running the monitor-thread"); + return; + } + + while (true) { + + // Print the processTrees for debugging. + if (LOG.isDebugEnabled()) { + StringBuffer tmp = new StringBuffer("[ "); + for (ProcessTreeInfo p : trackingContainers.values()) { + tmp.append(p.getPID()); + tmp.append(" "); + } + LOG.debug("Current ProcessTree list : " + + tmp.substring(0, tmp.length()) + "]"); + } + + // Add new containers + synchronized (containersToBeAdded) { + for (Entry entry : containersToBeAdded + .entrySet()) { + ContainerId containerId = entry.getKey(); + ProcessTreeInfo processTreeInfo = entry.getValue(); + LOG.info("Starting resource-monitoring for " + containerId); + trackingContainers.put(containerId, processTreeInfo); + } + containersToBeAdded.clear(); + } + + // Remove finished containers + synchronized (containersToBeRemoved) { + for (ContainerId containerId : containersToBeRemoved) { + trackingContainers.remove(containerId); + LOG.info("Stopping resource-monitoring for " + containerId); + } + containersToBeRemoved.clear(); + } + + // Now do the monitoring for the trackingContainers + // Check memory usage and kill any overflowing containers + long vmemStillInUsage = 0; + long pmemStillInUsage = 0; + for (Iterator> it = + trackingContainers.entrySet().iterator(); it.hasNext();) { + + Map.Entry entry = it.next(); + ContainerId containerId = entry.getKey(); + ProcessTreeInfo ptInfo = entry.getValue(); + try { + String pId = ptInfo.getPID(); + + // Initialize any uninitialized processTrees + if (pId == null) { + // get pid from ContainerId + pId = containerExecutor.getProcessId(ptInfo.getContainerId()); + if (pId != null) { + // pId will be null, either if the container is not spawned yet + // or if the container's pid is removed from ContainerExecutor + LOG.debug("Tracking ProcessTree " + pId + + " for the first time"); + + ProcfsBasedProcessTree pt = + new ProcfsBasedProcessTree(pId, + ContainerExecutor.isSetsidAvailable); + ptInfo.setPid(pId); + ptInfo.setProcessTree(pt); + } + } + // End of initializing any uninitialized processTrees + + if (pId == null) { + continue; // processTree cannot be tracked + } + + LOG.debug("Constructing ProcessTree for : PID = " + pId + + " ContainerId = " + containerId); + ProcfsBasedProcessTree pTree = ptInfo.getProcessTree(); + pTree = pTree.getProcessTree(); // get the updated process-tree + 
+
+        // Now do the monitoring for the trackingContainers.
+        // Check memory usage and kill any overflowing containers.
+        long vmemStillInUsage = 0;
+        long pmemStillInUsage = 0;
+        for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
+            trackingContainers.entrySet().iterator(); it.hasNext();) {
+
+          Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
+          ContainerId containerId = entry.getKey();
+          ProcessTreeInfo ptInfo = entry.getValue();
+          try {
+            String pId = ptInfo.getPID();
+
+            // Initialize any uninitialized processTrees. pId will be null
+            // either if the container is not spawned yet or if the
+            // container's pid has been removed from the ContainerExecutor.
+            if (pId == null) {
+              // get pid from ContainerId
+              pId = containerExecutor.getProcessId(ptInfo.getContainerId());
+              if (pId != null) {
+                LOG.debug("Tracking ProcessTree " + pId
+                    + " for the first time");
+
+                ProcfsBasedProcessTree pt =
+                    new ProcfsBasedProcessTree(pId,
+                        ContainerExecutor.isSetsidAvailable);
+                ptInfo.setPid(pId);
+                ptInfo.setProcessTree(pt);
+              }
+            }
+            // End of initializing any uninitialized processTrees
+
+            if (pId == null) {
+              continue; // processTree cannot be tracked
+            }
+
+            LOG.debug("Constructing ProcessTree for : PID = " + pId
+                + " ContainerId = " + containerId);
+            ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+            pTree = pTree.getProcessTree(); // get the updated process-tree
+            ptInfo.setProcessTree(pTree); // update ptInfo with the refreshed
+                                          // process-tree
+            long currentVmemUsage = pTree.getCumulativeVmem();
+            long currentPmemUsage = pTree.getCumulativeRssmem();
+            // as processes begin with an age 1, we want to see if there
+            // are processes more than 1 iteration old.
+            long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+            long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
+            long vmemLimit = ptInfo.getVmemLimit();
+            long pmemLimit = ptInfo.getPmemLimit();
+            LOG.info(String.format(MEMORY_USAGE_STRING, pId,
+                containerId.toString(), currentVmemUsage, vmemLimit,
+                currentPmemUsage, pmemLimit));
+
+            boolean isMemoryOverLimit = false;
+            String msg = "";
+            if (doCheckVirtualMemory()
+                && isProcessTreeOverLimit(containerId.toString(),
+                    currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
+              // Container (the root process) is still alive and overflowing
+              // memory. Dump the process-tree and then clean it up.
+              msg =
+                  "Container [pid=" + pId + ",containerID=" + containerId
+                      + "] is running beyond virtual memory-limits. Current"
+                      + " usage : " + currentVmemUsage + " bytes. Limit : "
+                      + vmemLimit + " bytes. Killing container. "
+                      + "\nDump of the process-tree for " + containerId
+                      + " : \n" + pTree.getProcessTreeDump();
+              isMemoryOverLimit = true;
+            } else if (doCheckPhysicalMemory()
+                && isProcessTreeOverLimit(containerId.toString(),
+                    currentPmemUsage, curRssMemUsageOfAgedProcesses,
+                    pmemLimit)) {
+              // Container (the root process) is still alive and overflowing
+              // memory. Dump the process-tree and then clean it up.
+              msg =
+                  "Container [pid=" + pId + ",containerID=" + containerId
+                      + "] is running beyond physical memory-limits."
+                      + " Current usage : " + currentPmemUsage
+                      + " bytes. Limit : " + pmemLimit
+                      + " bytes. Killing container. \nDump of the"
+                      + " process-tree for " + containerId + " : \n"
+                      + pTree.getProcessTreeDump();
+              isMemoryOverLimit = true;
+            }
+
+            if (isMemoryOverLimit) {
+              // Virtual or physical memory over limit. Fail the container
+              // and remove the corresponding process tree.
+              LOG.warn(msg);
+              // Log an error if the killed process is not a process-group
+              // leader, since its descendants may then escape the kill.
+              if (!pTree.checkPidPgrpidForMatch()) {
+                LOG.error("Killed container process with PID " + pId
+                    + " but it is not a process group leader.");
+              }
+              eventDispatcher.getEventHandler().handle(
+                  new ContainerDiagnosticsUpdateEvent(containerId, msg));
+              // kill the container
+              eventDispatcher.getEventHandler().handle(
+                  new ContainerEvent(containerId,
+                      ContainerEventType.KILL_CONTAINER));
+              it.remove();
+              LOG.info("Removed ProcessTree with root " + pId);
+            } else {
+              // Account for the memory of all containers that are still
+              // alive and within limits.
+              vmemStillInUsage += currentVmemUsage;
+              pmemStillInUsage += currentPmemUsage;
+            }
+          } catch (Exception e) {
+            // Log the exception and proceed to the next container.
+            LOG.warn("Uncaught exception in ContainersMonitorImpl "
+                + "while monitoring the memory of " + containerId, e);
+          }
+        }
+
+        try {
+          Thread.sleep(monitoringInterval);
+        } catch (InterruptedException e) {
+          LOG.warn(ContainersMonitorImpl.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
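The per-iteration refresh idiom above is easy to get wrong: getProcessTree()
returns the updated tree, and because every process starts at age 1, the
age-based getters only become meaningful from the second snapshot onwards. A
condensed sketch of one monitoring cycle (the pid "4321" and the limits are
invented; the calls are the same ones used in the loop above):

    // Condensed monitoring cycle; illustrative values only.
    final long GB = 1024L * 1024 * 1024;
    ProcfsBasedProcessTree tree = new ProcfsBasedProcessTree("4321",
        ContainerExecutor.isSetsidAvailable);
    tree = tree.getProcessTree();          // first snapshot: every process has age 1
    tree = tree.getProcessTree();          // second snapshot: survivors are now "aged"
    long total = tree.getCumulativeVmem(); // all processes in the tree
    long aged = tree.getCumulativeVmem(1); // only processes older than 1 iteration
    boolean kill = total > 2L * GB || aged > GB; // same shape as isProcessTreeOverLimit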
Exiting."); + break; + } + } + } + } + + @Override + public void handle(ContainersMonitorEvent monitoringEvent) { + + if (!(doCheckPhysicalMemory() || doCheckVirtualMemory())) { + return; + } + + ContainerId containerId = monitoringEvent.getContainerId(); + switch (monitoringEvent.getType()) { + case START_MONITORING_CONTAINER: + ContainerStartMonitoringEvent startEvent = + (ContainerStartMonitoringEvent) monitoringEvent; + synchronized (this.containersToBeAdded) { + ProcessTreeInfo processTreeInfo = + new ProcessTreeInfo(containerId, null, null, + startEvent.getVmemLimit(), startEvent.getPmemLimit()); + this.containersToBeAdded.put(containerId, processTreeInfo); + } + break; + case STOP_MONITORING_CONTAINER: + synchronized (this.containersToBeRemoved) { + this.containersToBeRemoved.add(containerId); + } + break; + default: + // TODO: Wrong event. + } + } +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1091316&r1=1091315&r2=1091316&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Tue Apr 12 07:56:07 2011 @@ -124,14 +124,21 @@ public class DummyContainerManager exten } public static void waitForContainerState(ContainerManager containerManager, - ContainerId containerID, ContainerState finalState) + ContainerId containerID, ContainerState finalState) + throws InterruptedException, YarnRemoteException { + waitForContainerState(containerManager, containerID, finalState, 20); + } + + public static void waitForContainerState(ContainerManager containerManager, + ContainerId containerID, ContainerState finalState, int timeOutMax) throws InterruptedException, YarnRemoteException { GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class); request.setContainerId(containerID); ContainerStatus containerStatus = containerManager.getContainerStatus(request).getStatus(); int timeoutSecs = 0; - while (!containerStatus.getState().equals(finalState) && timeoutSecs++ < 20) { + while (!containerStatus.getState().equals(finalState) + && timeoutSecs++ < timeOutMax) { Thread.sleep(1000); LOG.info("Waiting for container to get into state " + finalState + ". 
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Tue Apr 12 07:56:07 2011
@@ -18,10 +18,17 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,12 +51,28 @@ public class TestEventFlow {
 
   private static final Log LOG = LogFactory.getLog(TestEventFlow.class);
   private static final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
+  private static File localDir = new File("target",
+      TestEventFlow.class.getName() + "-localDir").getAbsoluteFile();
+  private static File logDir = new File("target",
+      TestEventFlow.class.getName() + "-logDir").getAbsoluteFile();
+
   @Test
   public void testSuccessfulContainerLaunch() throws InterruptedException,
-      YarnRemoteException {
+      IOException {
+
+    FileContext localFS = FileContext.getLocalFSFileContext();
+
+    localFS.delete(new Path(localDir.getAbsolutePath()), true);
+    localFS.delete(new Path(logDir.getAbsolutePath()), true);
+    localDir.mkdir();
+    logDir.mkdir();
+
     Context context = new NMContext();
 
     YarnConfiguration conf = new YarnConfiguration();
+    conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+    conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
+
     ContainerExecutor exec = new DefaultContainerExecutor();
     DeletionService del = new DeletionService(exec);
     Dispatcher dispatcher = new AsyncDispatcher();
@@ -70,7 +93,7 @@ public class TestEventFlow {
 
     DummyContainerManager containerManager =
         new DummyContainerManager(context, exec, del, nodeStatusUpdater);
-    containerManager.init(new Configuration());
+    containerManager.init(conf);
     containerManager.start();
 
     ContainerLaunchContext launchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Apr 12 07:56:07 2011
@@ -205,7 +205,7 @@ public class TestNodeStatusUpdater {
     };
 
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(NMConfig.NM_RESOURCE, 5); // 5GB
+    conf.setInt(NMConfig.NM_VMEM_GB, 5); // 5GB
     conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
     conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
     conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());