Subject: svn commit: r1240450 [5/6] - in /hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop...
Date: Sat, 04 Feb 2012 03:40:56 -0000
To: mapreduce-commits@hadoop.apache.org
From: szetszwo@apache.org
Message-Id: <20120204034106.996A52388ABB@eris.apache.org>

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Sat Feb 4 03:40:45 2012 @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -37,14 +38,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.EventReader; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import
org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -61,6 +66,9 @@ import org.apache.hadoop.yarn.service.Se import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestJobHistoryParsing { private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class); @@ -76,6 +84,17 @@ public class TestJobHistoryParsing { @Test public void testHistoryParsing() throws Exception { + checkHistoryParsing(2, 1, 2); + } + + @Test + public void testHistoryParsingWithParseErrors() throws Exception { + checkHistoryParsing(3, 0, 2); + } + + private void checkHistoryParsing(final int numMaps, final int numReduces, + final int numSuccessfulMaps) + throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); long amStartTimeEst = System.currentTimeMillis(); @@ -83,8 +102,9 @@ public class TestJobHistoryParsing { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); - MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), - true); + MRApp app = + new MRAppWithHistory(numMaps, numReduces, true, + this.getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); @@ -117,8 +137,42 @@ public class TestJobHistoryParsing { } JobHistoryParser parser = new JobHistoryParser(in); - JobInfo jobInfo = parser.parse(); - + final EventReader realReader = new EventReader(in); + EventReader reader = Mockito.mock(EventReader.class); + if (numMaps == numSuccessfulMaps) { + reader = realReader; + } else { + final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! 
+ Mockito.when(reader.getNextEvent()).thenAnswer( + new Answer() { + public HistoryEvent answer(InvocationOnMock invocation) + throws IOException { + HistoryEvent event = realReader.getNextEvent(); + if (event instanceof TaskFinishedEvent) { + numFinishedEvents.incrementAndGet(); + } + + if (numFinishedEvents.get() <= numSuccessfulMaps) { + return event; + } else { + throw new IOException("test"); + } + } + } + ); + } + + JobInfo jobInfo = parser.parse(reader); + + long numFinishedMaps = + computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps); + + if (numFinishedMaps != numMaps) { + Exception parseException = parser.getParseException(); + Assert.assertNotNull("Didn't get expected parse exception", + parseException); + } + Assert.assertEquals("Incorrect username ", System.getProperty("user.name"), jobInfo.getUsername()); Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); @@ -126,14 +180,16 @@ public class TestJobHistoryParsing { jobInfo.getJobQueueName()); Assert .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); - Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps()); - Assert.assertEquals("incorrect finishedReduces ", 1, + Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, + numFinishedMaps); + Assert.assertEquals("incorrect finishedReduces ", numReduces, jobInfo.getFinishedReduces()); Assert.assertEquals("incorrect uberized ", job.isUber(), jobInfo.getUberized()); Map allTasks = jobInfo.getAllTasks(); int totalTasks = allTasks.size(); - Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); + Assert.assertEquals("total number of tasks is incorrect ", + (numMaps+numReduces), totalTasks); // Verify aminfo Assert.assertEquals(1, jobInfo.getAMInfos().size()); @@ -172,55 +228,78 @@ public class TestJobHistoryParsing { Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo); Assert.assertEquals("Incorrect shuffle port for task attempt", taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort()); - Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); - Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); - - // Verify rack-name - Assert.assertEquals("rack-name is incorrect", taskAttemptInfo - .getRackname(), RACK_NAME); + if (numMaps == numSuccessfulMaps) { + Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); + Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); + + // Verify rack-name + Assert.assertEquals("rack-name is incorrect", taskAttemptInfo + .getRackname(), RACK_NAME); + } } } - String summaryFileName = JobHistoryUtils - .getIntermediateSummaryFileName(jobId); - Path summaryFile = new Path(jobhistoryDir, summaryFileName); - String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); - Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); - Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); - Assert.assertNotNull(jobSummaryString); - - Map jobSummaryElements = new HashMap(); - StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); - while (strToken.hasMoreTokens()) { - String keypair = strToken.nextToken(); - jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); + if (numMaps == numSuccessfulMaps) { - } + String summaryFileName = JobHistoryUtils + .getIntermediateSummaryFileName(jobId); + Path summaryFile = new Path(jobhistoryDir, summaryFileName); + String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); + 
Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); + Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); + Assert.assertNotNull(jobSummaryString); + + Map jobSummaryElements = new HashMap(); + StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); + while (strToken.hasMoreTokens()) { + String keypair = strToken.nextToken(); + jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); - Assert.assertEquals("JobId does not match", jobId.toString(), - jobSummaryElements.get("jobId")); - Assert.assertTrue("submitTime should not be 0", - Long.parseLong(jobSummaryElements.get("submitTime")) != 0); - Assert.assertTrue("launchTime should not be 0", - Long.parseLong(jobSummaryElements.get("launchTime")) != 0); - Assert.assertTrue("firstMapTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); - Assert - .assertTrue( - "firstReduceTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); - Assert.assertTrue("finishTime should not be 0", - Long.parseLong(jobSummaryElements.get("finishTime")) != 0); - Assert.assertEquals("Mismatch in num map slots", 2, - Integer.parseInt(jobSummaryElements.get("numMaps"))); - Assert.assertEquals("Mismatch in num reduce slots", 1, - Integer.parseInt(jobSummaryElements.get("numReduces"))); - Assert.assertEquals("User does not match", System.getProperty("user.name"), - jobSummaryElements.get("user")); - Assert.assertEquals("Queue does not match", "default", - jobSummaryElements.get("queue")); - Assert.assertEquals("Status does not match", "SUCCEEDED", - jobSummaryElements.get("status")); + } + + Assert.assertEquals("JobId does not match", jobId.toString(), + jobSummaryElements.get("jobId")); + Assert.assertTrue("submitTime should not be 0", + Long.parseLong(jobSummaryElements.get("submitTime")) != 0); + Assert.assertTrue("launchTime should not be 0", + Long.parseLong(jobSummaryElements.get("launchTime")) != 0); + Assert.assertTrue("firstMapTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); + Assert + .assertTrue( + "firstReduceTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); + Assert.assertTrue("finishTime should not be 0", + Long.parseLong(jobSummaryElements.get("finishTime")) != 0); + Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, + Integer.parseInt(jobSummaryElements.get("numMaps"))); + Assert.assertEquals("Mismatch in num reduce slots", numReduces, + Integer.parseInt(jobSummaryElements.get("numReduces"))); + Assert.assertEquals("User does not match", System.getProperty("user.name"), + jobSummaryElements.get("user")); + Assert.assertEquals("Queue does not match", "default", + jobSummaryElements.get("queue")); + Assert.assertEquals("Status does not match", "SUCCEEDED", + jobSummaryElements.get("status")); + } + } + + // Computes finished maps similar to RecoveryService... 
+ private long computeFinishedMaps(JobInfo jobInfo, + int numMaps, int numSuccessfulMaps) { + if (numMaps == numSuccessfulMaps) { + return jobInfo.getFinishedMaps(); + } + + long numFinishedMaps = 0; + Map taskInfos = + jobInfo.getAllTasks(); + for (TaskInfo taskInfo : taskInfos.values()) { + if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { + ++numFinishedMaps; + } + } + return numFinishedMaps; } @Test @@ -264,6 +343,9 @@ public class TestJobHistoryParsing { JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + Assert.assertNull("Caught an expected exception " + parseException, + parseException); int noOffailedAttempts = 0; Map allTasks = jobInfo.getAllTasks(); for (Task task : job.getTasks().values()) { Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb 4 03:40:45 2012 @@ -74,7 +74,7 @@ public class TestFileOutputCommitter ext TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID); FileOutputCommitter committer = new FileOutputCommitter(); FileOutputFormat.setWorkOutputPath(job, - committer.getTempTaskOutputPath(tContext)); + committer.getTaskAttemptPath(tContext)); committer.setupJob(jContext); committer.setupTask(tContext); @@ -115,7 +115,7 @@ public class TestFileOutputCommitter ext TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID); FileOutputCommitter committer = new FileOutputCommitter(); FileOutputFormat.setWorkOutputPath(job, committer - .getTempTaskOutputPath(tContext)); + .getTaskAttemptPath(tContext)); // do setup committer.setupJob(jContext); @@ -134,13 +134,13 @@ public class TestFileOutputCommitter ext // do abort committer.abortTask(tContext); File expectedFile = new File(new Path(committer - .getTempTaskOutputPath(tContext), file).toString()); + .getTaskAttemptPath(tContext), file).toString()); assertFalse("task temp dir still exists", expectedFile.exists()); committer.abortJob(jContext, JobStatus.State.FAILED); expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME) .toString()); - assertFalse("job temp dir still exists", expectedFile.exists()); + assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists()); assertEquals("Output directory not empty", 0, new File(outDir.toString()) .listFiles().length); FileUtil.fullyDelete(new File(outDir.toString())); @@ -170,16 +170,15 @@ public class TestFileOutputCommitter ext TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID); FileOutputCommitter committer = new FileOutputCommitter(); FileOutputFormat.setWorkOutputPath(job, committer - 
.getTempTaskOutputPath(tContext)); + .getTaskAttemptPath(tContext)); // do setup committer.setupJob(jContext); committer.setupTask(tContext); String file = "test.txt"; - String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext); - File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext)); - File taskTmpDir = new File(outDir.toString(), taskBaseDirName); + File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath()); + File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath()); File expectedFile = new File(taskTmpDir, file); // A reporter that does nothing Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java Sat Feb 4 03:40:45 2012 @@ -34,7 +34,7 @@ public class TestTaskCommit extends Hado static class CommitterWithCommitFail extends FileOutputCommitter { public void commitTask(TaskAttemptContext context) throws IOException { - Path taskOutputPath = getTempTaskOutputPath(context); + Path taskOutputPath = getTaskAttemptPath(context); TaskAttemptID attemptId = context.getTaskAttemptID(); JobConf job = context.getJobConf(); if (taskOutputPath != null) { Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Sat Feb 4 03:40:45 2012 @@ -70,6 +70,22 @@ public class TestFileOutputCommitter ext } } + private static void cleanup() throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = outDir.getFileSystem(conf); + fs.delete(outDir, true); + } + + @Override + public void setUp() throws IOException { + cleanup(); + } + + @Override + public void tearDown() throws IOException { + cleanup(); + } + @SuppressWarnings("unchecked") public void testCommitter() throws Exception { Job job = Job.getInstance(); @@ -133,7 +149,7 @@ 
public class TestFileOutputCommitter ext assertFalse("task temp dir still exists", expectedFile.exists()); committer.abortJob(jContext, JobStatus.State.FAILED); - expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME) + expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME) .toString()); assertFalse("job temp dir still exists", expectedFile.exists()); assertEquals("Output directory not empty", 0, new File(outDir.toString()) @@ -188,9 +204,9 @@ public class TestFileOutputCommitter ext assertNotNull(th); assertTrue(th instanceof IOException); assertTrue(th.getMessage().contains("fake delete failed")); - String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext); - File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext)); - File taskTmpDir = new File(outDir.toString(), taskBaseDirName); + //Path taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext); + File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath()); + File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath()); File expectedFile = new File(taskTmpDir, partFile); assertTrue(expectedFile + " does not exists", expectedFile.exists()); Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-examples/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 4 03:40:45 2012 @@ -1,3 +1,3 @@ -/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-examples:1227776-1239397 -/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163490,1163768,1164255,1164301,1164339,1164771,1166402,1167001,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213630,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456 +/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-examples:1227776-1240449 +/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163490,1163768,1164255,1164301,1164339,1164771,1166402,1167001,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213630,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456,1239752 /hadoop/core/branches/branch-0.19/mapred/hadoop-mapreduce-examples:713112 Modified: 
hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh Sat Feb 4 03:40:45 2012 @@ -78,8 +78,11 @@ fi if [ "$YARN_LOG_DIR" = "" ]; then export YARN_LOG_DIR="$YARN_HOME/logs" fi -mkdir -p "$YARN_LOG_DIR" -chown $YARN_IDENT_STRING $YARN_LOG_DIR + +if [ ! -w "$YARN_LOG_DIR" ] ; then + mkdir -p "$YARN_LOG_DIR" + chown $YARN_IDENT_STRING $YARN_LOG_DIR +fi if [ "$YARN_PID_DIR" = "" ]; then YARN_PID_DIR=/tmp @@ -101,7 +104,7 @@ case $startStop in (start) - mkdir -p "$YARN_PID_DIR" + [ -w "$YARN_PID_DIR" ] || mkdir -p "$YARN_PID_DIR" if [ -f $pid ]; then if kill -0 `cat $pid` > /dev/null 2>&1; then Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java Sat Feb 4 03:40:45 2012 @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.logaggreg import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; import org.apache.commons.cli.CommandLine; @@ -103,14 +104,15 @@ public class LogDumper extends Configure if (appOwner == null || appOwner.isEmpty()) { appOwner = UserGroupInformation.getCurrentUser().getShortUserName(); } + int resultCode = 0; if (containerIdStr == null && nodeAddress == null) { - dumpAllContainersLogs(appId, appOwner, out); + resultCode = dumpAllContainersLogs(appId, appOwner, out); } else if ((containerIdStr == null && nodeAddress != null) || (containerIdStr != null && nodeAddress == null)) { System.out.println("ContainerId or NodeAddress cannot be null!"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("general options are: ", opts); - return -1; + resultCode = -1; } else { Path remoteRootLogDir = new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, @@ -122,27 +124,33 @@ public class LogDumper extends Configure appId, appOwner, ConverterUtils.toNodeId(nodeAddress), - getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX))); - return dumpAContainerLogs(containerIdStr, reader, out); + LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()))); + resultCode = dumpAContainerLogs(containerIdStr, reader, out); } - return 0; + return resultCode; } - public void dumpAContainersLogs(String appId, String containerId, + public int dumpAContainersLogs(String 
appId, String containerId, String nodeId, String jobOwner) throws IOException { Path remoteRootLogDir = new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()); - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - LogAggregationUtils.getRemoteNodeLogFileForApp(remoteRootLogDir, - ConverterUtils.toApplicationId(appId), jobOwner, - ConverterUtils.toNodeId(nodeId), suffix)); + Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp( + remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, + ConverterUtils.toNodeId(nodeId), suffix); + AggregatedLogFormat.LogReader reader; + try { + reader = new AggregatedLogFormat.LogReader(getConf(), logPath); + } catch (FileNotFoundException fnfe) { + System.out.println("Logs not available at " + logPath.toString()); + System.out.println( + "Log aggregation has not completed or is not enabled."); + return -1; + } DataOutputStream out = new DataOutputStream(System.out); - dumpAContainerLogs(containerId, reader, out); + return dumpAContainerLogs(containerId, reader, out); } private int dumpAContainerLogs(String containerIdStr, @@ -174,21 +182,28 @@ public class LogDumper extends Configure return 0; } - private void dumpAllContainersLogs(ApplicationId appId, String appOwner, + private int dumpAllContainersLogs(ApplicationId appId, String appOwner, DataOutputStream out) throws IOException { Path remoteRootLogDir = new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String user = appOwner; String logDirSuffix = - getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); + LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()); //TODO Change this to get a list of files from the LAS. 
Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user, logDirSuffix); - RemoteIterator nodeFiles = - FileContext.getFileContext().listStatus(remoteAppLogDir); + RemoteIterator nodeFiles; + try { + nodeFiles = FileContext.getFileContext().listStatus(remoteAppLogDir); + } catch (FileNotFoundException fnf) { + System.out.println("Logs not available at " + + remoteAppLogDir.toString()); + System.out.println( + "Log aggregation has not completed or is not enabled."); + return -1; + } while (nodeFiles.hasNext()) { FileStatus thisNodeFile = nodeFiles.next(); AggregatedLogFormat.LogReader reader = @@ -217,12 +232,14 @@ public class LogDumper extends Configure reader.close(); } } + return 0; } public static void main(String[] args) throws Exception { Configuration conf = new YarnConfiguration(); LogDumper logDumper = new LogDumper(); logDumper.setConf(conf); - logDumper.run(args); + int exitCode = logDumper.run(args); + System.exit(exitCode); } } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java Sat Feb 4 03:40:45 2012 @@ -38,7 +38,7 @@ public class ClusterMetrics { private static AtomicBoolean isInitialized = new AtomicBoolean(false); - @Metric("# of active NMs") MutableGaugeInt numNMs; + @Metric("# of active NMs") MutableGaugeInt numActiveNMs; @Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs; @Metric("# of lost NMs") MutableGaugeInt numLostNMs; @Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs; @@ -74,7 +74,7 @@ public class ClusterMetrics { //Active Nodemanagers public int getNumActiveNMs() { - return numNMs.value(); + return numActiveNMs.value(); } //Decommisioned NMs @@ -128,17 +128,12 @@ public class ClusterMetrics { public void decrNumRebootedNMs() { numRebootedNMs.decr(); } - - public void removeNode(RMNodeEventType nodeEventType) { - numNMs.decr(); - switch(nodeEventType){ - case DECOMMISSION: incrDecommisionedNMs(); break; - case EXPIRE: incrNumLostNMs();break; - case REBOOTING: incrNumRebootedNMs();break; - } + + public void incrNumActiveNodes() { + numActiveNMs.incr(); } - - public void addNode() { - numNMs.incr(); + + public void decrNumActiveNodes() { + numActiveNMs.decr(); } } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Sat Feb 4 03:40:45 2012 @@ -111,4 +111,12 @@ public class Resources { public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) { return lhs.getMemory() >= rhs.getMemory(); } + + public static Resource min(Resource lhs, Resource rhs) { + return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs; + } + + public static Resource max(Resource lhs, Resource rhs) { + return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs; + } } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Sat Feb 4 03:40:45 2012 @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.util.Build */ @Private @Unstable +@SuppressWarnings("unchecked") public class RMNodeImpl implements RMNode, EventHandler { private static final Log LOG = LogFactory.getLog(RMNodeImpl.class); @@ -116,11 +117,14 @@ public class RMNodeImpl implements RMNod EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) .addTransition(RMNodeState.RUNNING, RMNodeState.DECOMMISSIONED, - RMNodeEventType.DECOMMISSION, new RemoveNodeTransition()) + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(RMNodeState.DECOMMISSIONED)) .addTransition(RMNodeState.RUNNING, RMNodeState.LOST, - RMNodeEventType.EXPIRE, new RemoveNodeTransition()) + RMNodeEventType.EXPIRE, + new DeactivateNodeTransition(RMNodeState.LOST)) .addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED, - RMNodeEventType.REBOOTING, new RemoveNodeTransition()) + RMNodeEventType.REBOOTING, + new DeactivateNodeTransition(RMNodeState.REBOOTED)) .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING, @@ -304,26 +308,50 @@ public class RMNodeImpl 
implements RMNod writeLock.unlock(); } } - + + private void updateMetricsForRejoinedNode(RMNodeState previousNodeState) { + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + metrics.incrNumActiveNodes(); + + switch (previousNodeState) { + case LOST: + metrics.decrNumLostNMs(); + break; + case REBOOTED: + metrics.decrNumRebootedNMs(); + break; + case DECOMMISSIONED: + metrics.decrDecommisionedNMs(); + break; + case UNHEALTHY: + metrics.decrNumUnhealthyNMs(); + break; + } + } + + private void updateMetricsForDeactivatedNode(RMNodeState finalState) { + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + metrics.decrNumActiveNodes(); + + switch (finalState) { + case DECOMMISSIONED: + metrics.incrDecommisionedNMs(); + break; + case LOST: + metrics.incrNumLostNMs(); + break; + case REBOOTED: + metrics.incrNumRebootedNMs(); + break; + case UNHEALTHY: + metrics.incrNumUnhealthyNMs(); + break; + } + } + public static class AddNodeTransition implements SingleArcTransition { - - private void updateMetrics(RMNodeState nodeState) { - ClusterMetrics metrics = ClusterMetrics.getMetrics(); - switch (nodeState) { - case LOST: - metrics.decrNumLostNMs(); - break; - case REBOOTED: - metrics.decrNumRebootedNMs(); - break; - case DECOMMISSIONED: - metrics.decrDecommisionedNMs(); - break; - } - } - @SuppressWarnings("unchecked") @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler @@ -333,12 +361,14 @@ public class RMNodeImpl implements RMNod String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { - RMNode node = rmNode.context.getInactiveRMNodes().get(host); + // Old node rejoining + RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host); rmNode.context.getInactiveRMNodes().remove(host); - updateMetrics(node.getState()); + rmNode.updateMetricsForRejoinedNode(previouRMNode.getState()); + } else { + // Increment activeNodes explicitly because this is a new node. + ClusterMetrics.getMetrics().incrNumActiveNodes(); } - - ClusterMetrics.getMetrics().addNode(); } } @@ -362,28 +392,33 @@ public class RMNodeImpl implements RMNod } } - public static class RemoveNodeTransition + public static class DeactivateNodeTransition implements SingleArcTransition { - @SuppressWarnings("unchecked") + private final RMNodeState finalState; + public DeactivateNodeTransition(RMNodeState finalState) { + this.finalState = finalState; + } + @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); - // Remove the node from the system. 
+ // Deactivate the node rmNode.context.getRMNodes().remove(rmNode.nodeId); - LOG.info("Removed Node " + rmNode.nodeId); + LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " + + finalState); rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode); - //Update the metrics - ClusterMetrics.getMetrics().removeNode(event.getType()); + + //Update the metrics + rmNode.updateMetricsForDeactivatedNode(finalState); } } public static class StatusUpdateWhenHealthyTransition implements MultipleArcTransition { - @SuppressWarnings("unchecked") @Override public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { @@ -399,7 +434,8 @@ public class RMNodeImpl implements RMNod // Inform the scheduler rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); - ClusterMetrics.getMetrics().incrNumUnhealthyNMs(); + // Update metrics + rmNode.updateMetricsForDeactivatedNode(RMNodeState.UNHEALTHY); return RMNodeState.UNHEALTHY; } @@ -458,11 +494,9 @@ public class RMNodeImpl implements RMNod } } - public static class StatusUpdateWhenUnHealthyTransition - implements + public static class StatusUpdateWhenUnHealthyTransition implements MultipleArcTransition { - @SuppressWarnings("unchecked") @Override public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; @@ -474,7 +508,8 @@ public class RMNodeImpl implements RMNod if (remoteNodeHealthStatus.getIsNodeHealthy()) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); + // Update metrics + rmNode.updateMetricsForRejoinedNode(RMNodeState.UNHEALTHY); return RMNodeState.RUNNING; } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Sat Feb 4 03:40:45 2012 @@ -162,6 +162,13 @@ public class AppSchedulingInfo { asks.put(hostName, request); if (updatePendingResources) { + + // Similarly, deactivate application? + if (request.getNumContainers() <= 0) { + LOG.info("checking for deactivate... "); + checkForDeactivation(); + } + int lastRequestContainers = lastRequest != null ? lastRequest .getNumContainers() : 0; Resource lastRequestCapability = lastRequest != null ? lastRequest @@ -308,19 +315,24 @@ public class AppSchedulingInfo { // Do we have any outstanding requests? 
// If there is nothing, we need to deactivate this application if (numOffSwitchContainers == 0) { - boolean deactivate = true; - for (Priority priority : getPriorities()) { - ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY); - if (request.getNumContainers() > 0) { - deactivate = false; - break; - } - } - if (deactivate) { - activeUsersManager.deactivateApplication(user, applicationId); + checkForDeactivation(); + } + } + + synchronized private void checkForDeactivation() { + boolean deactivate = true; + for (Priority priority : getPriorities()) { + ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY); + if (request.getNumContainers() > 0) { + deactivate = false; + break; } } + if (deactivate) { + activeUsersManager.deactivateApplication(user, applicationId); + } } + synchronized private void allocate(Container container) { // Update consumption and track allocations //TODO: fixme sharad Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Sat Feb 4 03:40:45 2012 @@ -46,17 +46,23 @@ class CSQueueUtils { } public static int computeMaxActiveApplications(Resource clusterResource, - float maxAMResourcePercent, float absoluteCapacity) { + Resource minimumAllocation, float maxAMResourcePercent, + float absoluteMaxCapacity) { return Math.max( - (int)((clusterResource.getMemory() / (float)LeafQueue.DEFAULT_AM_RESOURCE) * - maxAMResourcePercent * absoluteCapacity), + (int)Math.ceil( + ((float)clusterResource.getMemory() / + minimumAllocation.getMemory()) * + maxAMResourcePercent * absoluteMaxCapacity), 1); } public static int computeMaxActiveApplicationsPerUser( int maxActiveApplications, int userLimit, float userLimitFactor) { - return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor); + return Math.max( + (int)Math.ceil( + maxActiveApplications * (userLimit / 100.0f) * userLimitFactor), + 1); } } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Sat Feb 4 03:40:45 2012 @@ -125,8 +125,6 @@ public class LeafQueue implements CSQueu private final ActiveUsersManager activeUsersManager; - final static int DEFAULT_AM_RESOURCE = 2 * 1024; - public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, Comparator applicationComparator, CSQueue old) { @@ -166,8 +164,9 @@ public class LeafQueue implements CSQueu this.maxAMResourcePercent = cs.getConfiguration().getMaximumApplicationMasterResourcePercent(); int maxActiveApplications = - CSQueueUtils.computeMaxActiveApplications(cs.getClusterResources(), - maxAMResourcePercent, absoluteCapacity); + CSQueueUtils.computeMaxActiveApplications( + cs.getClusterResources(), this.minimumAllocation, + maxAMResourcePercent, absoluteMaxCapacity); int maxActiveApplicationsPerUser = CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, userLimitFactor); @@ -246,30 +245,39 @@ public class LeafQueue implements CSQueu " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = " + absoluteMaxCapacity + " [= 1.0 maximumCapacity undefined, " + - "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + "\n" + + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = " + userLimitFactor + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " + maxApplications + - " [= (int)(configuredMaximumSystemApplications * absoluteCapacity) ]" + "\n" + + " [= (int)(configuredMaximumSystemApplications * absoluteCapacity) ]" + + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser + - " [= (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor) ]" + "\n" + + " [= (int)(maxApplications * (userLimit / 100.0f) * " + + "userLimitFactor) ]" + "\n" + "maxActiveApplications = " + maxActiveApplications + " [= max(" + - "(int)((clusterResourceMemory / (float)DEFAULT_AM_RESOURCE) *" + - "maxAMResourcePercent * absoluteCapacity)," + + "(int)ceil((clusterResourceMemory / minimumAllocation) *" + + "maxAMResourcePercent * absoluteMaxCapacity)," + "1) ]" + "\n" + "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser + - " [= (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor) ]" + "\n" + + " [= max(" + + "(int)(maxActiveApplications * (userLimit / 100.0f) * " + + "userLimitFactor)," + + "1) ]" + "\n" + "utilization = " + utilization + - " [= usedResourcesMemory / (clusterResourceMemory * absoluteCapacity)]" + "\n" + + " [= usedResourcesMemory / " + + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + "usedCapacity = " + usedCapacity + - " [= usedResourcesMemory / (clusterResourceMemory * parent.absoluteCapacity)]" + "\n" + + " [= usedResourcesMemory / " + + "(clusterResourceMemory * parent.absoluteCapacity)]" + "\n" + "maxAMResourcePercent = " + maxAMResourcePercent + " [= configuredMaximumAMResourcePercent ]" + "\n" + "minimumAllocationFactor = " + minimumAllocationFactor + - " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / maximumAllocationMemory ]" 
+ "\n" + + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + + "maximumAllocationMemory ]" + "\n" + "numContainers = " + numContainers + " [= currentNumContainers ]" + "\n" + "state = " + state + @@ -606,7 +614,10 @@ public class LeafQueue implements CSQueu addApplication(application, user); } - metrics.submitApp(userName); + int attemptId = application.getApplicationAttemptId().getAttemptId(); + if (attemptId == 1) { + metrics.submitApp(userName); + } // Inform the parent queue try { @@ -635,7 +646,7 @@ public class LeafQueue implements CSQueu user.activateApplication(); activeApplications.add(application); i.remove(); - LOG.info("Application " + application.getApplicationId().getId() + + LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + " activated in queue: " + getQueueName()); } @@ -673,10 +684,13 @@ public class LeafQueue implements CSQueu } public synchronized void removeApplication(SchedulerApp application, User user) { - activeApplications.remove(application); + boolean wasActive = activeApplications.remove(application); + if (!wasActive) { + pendingApplications.remove(application); + } applicationsMap.remove(application.getApplicationAttemptId()); - user.finishApplication(); + user.finishApplication(wasActive); if (user.getTotalApplications() == 0) { users.remove(application.getUser()); } @@ -751,15 +765,15 @@ public class LeafQueue implements CSQueu continue; } - // Compute & set headroom - // Note: We set the headroom with the highest priority request - // as the target. + // Compute user-limit & set headroom + // Note: We compute both user-limit & headroom with the highest + // priority request as the target. // This works since we never assign lower priority requests // before all higher priority ones are serviced. Resource userLimit = - computeAndSetUserResourceLimit(application, clusterResource, - required); - + computeUserLimitAndSetHeadroom(application, clusterResource, + required); + // Check queue max-capacity limit if (!assignToQueue(clusterResource, required)) { return NULL_ASSIGNMENT; @@ -777,13 +791,13 @@ public class LeafQueue implements CSQueu CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, null); - - Resource assigned = assignment.getResource(); - + // Did we schedule or reserve a container? + Resource assigned = assignment.getResource(); if (Resources.greaterThan(assigned, Resources.none())) { - // Book-keeping + // Book-keeping + // Note: Update headroom to account for current allocation too... 
allocateResource(clusterResource, application, assigned); // Reset scheduling opportunities @@ -854,20 +868,50 @@ public class LeafQueue implements CSQueu } @Lock({LeafQueue.class, SchedulerApp.class}) - private Resource computeAndSetUserResourceLimit(SchedulerApp application, - Resource clusterResource, Resource required) { + private Resource computeUserLimitAndSetHeadroom( + SchedulerApp application, Resource clusterResource, Resource required) { + String user = application.getUser(); - Resource limit = computeUserLimit(application, clusterResource, required); + + /** + * Headroom is min((userLimit, queue-max-cap) - consumed) + */ + + Resource userLimit = // User limit + computeUserLimit(application, clusterResource, required); + + + Resource queueMaxCap = // Queue Max-Capacity + Resources.createResource( + roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory())) + ); + + Resource userConsumed = getUser(user).getConsumedResources(); Resource headroom = - Resources.subtract(limit, getUser(user).getConsumedResources()); + Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed); + + if (LOG.isDebugEnabled()) { + LOG.debug("Headroom calculation for user " + user + ": " + + " userLimit=" + userLimit + + " queueMaxCap=" + queueMaxCap + + " consumed=" + userConsumed + + " headroom=" + headroom); + } + application.setHeadroom(headroom); metrics.setAvailableResourcesToUser(user, headroom); - return limit; + + return userLimit; } private int roundUp(int memory) { - return divideAndCeil(memory, minimumAllocation.getMemory()) * - minimumAllocation.getMemory(); + int minMemory = minimumAllocation.getMemory(); + return divideAndCeil(memory, minMemory) * minMemory; + } + + private int roundDown(int memory) { + int minMemory = minimumAllocation.getMemory(); + return (memory / minMemory) * minMemory; } @Lock(NoLock.class) @@ -1288,10 +1332,17 @@ public class LeafQueue implements CSQueu String userName = application.getUser(); User user = getUser(userName); user.assignContainer(resource); + Resources.subtractFrom(application.getHeadroom(), resource); // headroom metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - LOG.info(getQueueName() + - " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getConsumedResources()); + + if (LOG.isDebugEnabled()) { + LOG.info(getQueueName() + + " user=" + userName + + " used=" + usedResources + " numContainers=" + numContainers + + " headroom = " + application.getHeadroom() + + " user-resources=" + user.getConsumedResources() + ); + } } synchronized void releaseResource(Resource clusterResource, @@ -1316,17 +1367,18 @@ public class LeafQueue implements CSQueu public synchronized void updateClusterResource(Resource clusterResource) { // Update queue properties maxActiveApplications = - CSQueueUtils.computeMaxActiveApplications(clusterResource, maxAMResourcePercent, - absoluteCapacity); + CSQueueUtils.computeMaxActiveApplications( + clusterResource, minimumAllocation, + maxAMResourcePercent, absoluteMaxCapacity); maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, - userLimitFactor); + CSQueueUtils.computeMaxActiveApplicationsPerUser( + maxActiveApplications, userLimit, userLimitFactor); // Update application properties for (SchedulerApp application : activeApplications) { synchronized (application) { - computeAndSetUserResourceLimit( - application, clusterResource, Resources.none()); + 
computeUserLimitAndSetHeadroom(application, clusterResource, + Resources.none()); } } } @@ -1378,8 +1430,13 @@ public class LeafQueue implements CSQueu ++activeApplications; } - public synchronized void finishApplication() { - --activeApplications; + public synchronized void finishApplication(boolean wasActive) { + if (wasActive) { + --activeApplications; + } + else { + --pendingApplications; + } } public synchronized void assignContainer(Resource resource) { Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Feb 4 03:40:45 2012 @@ -298,7 +298,9 @@ public class FifoScheduler implements Re new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext, null); applications.put(appAttemptId, schedulerApp); - metrics.submitApp(user); + if (appAttemptId.getAttemptId() == 1) { + metrics.submitApp(user); + } LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user + ", currently active: " + applications.size()); rmContext.getDispatcher().getEventHandler().handle( Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Sat Feb 4 03:40:45 2012 @@ -100,6 +100,12 @@ class NodesPage extends RmView { if(!stateFilter.equals(state)) { continue; } + } else { + // No filter. User is asking for all nodes. Make sure you skip the + // unhealthy nodes. 
+ if (ni.getState() == RMNodeState.UNHEALTHY) { + continue; + } } NodeInfo info = new NodeInfo(ni, sched); int usedMemory = (int)info.getUsedMemory(); Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Sat Feb 4 03:40:45 2012 @@ -166,6 +166,12 @@ public class RMWebServices { if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) { continue; } + } else { + // No filter. User is asking for all nodes. Make sure you skip the + // unhealthy nodes. + if (ni.getState() == RMNodeState.UNHEALTHY) { + continue; + } } if ((healthState != null) && (!healthState.isEmpty())) { LOG.info("heatlh state is : " + healthState); Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Sat Feb 4 03:40:45 2012 @@ -51,18 +51,23 @@ public class MockNodes { List list = Lists.newArrayList(); for (int i = 0; i < racks; ++i) { for (int j = 0; j < nodesPerRack; ++j) { + if (j == (nodesPerRack - 1)) { + // One unhealthy node per rack. 
+ list.add(nodeInfo(i, perNode, RMNodeState.UNHEALTHY)); + } list.add(newNodeInfo(i, perNode)); } } return list; } - public static List lostNodes(int racks, int nodesPerRack, + public static List deactivatedNodes(int racks, int nodesPerRack, Resource perNode) { List list = Lists.newArrayList(); for (int i = 0; i < racks; ++i) { for (int j = 0; j < nodesPerRack; ++j) { - list.add(lostNodeInfo(i, perNode, RMNodeState.LOST)); + RMNodeState[] allStates = RMNodeState.values(); + list.add(nodeInfo(i, perNode, allStates[j % allStates.length])); } } return list; @@ -198,15 +203,20 @@ public class MockNodes { final String httpAddress = httpAddr; final NodeHealthStatus nodeHealthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class); + if (state != RMNodeState.UNHEALTHY) { + nodeHealthStatus.setIsNodeHealthy(true); + nodeHealthStatus.setHealthReport("HealthyMe"); + } return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName, nodeHealthStatus, nid, hostName, state); } - public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) { + public static RMNode nodeInfo(int rack, final Resource perNode, + RMNodeState state) { return buildRMNode(rack, perNode, state, "N/A"); } public static RMNode newNodeInfo(int rack, final Resource perNode) { - return buildRMNode(rack, perNode, null, "localhost:0"); + return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0"); } } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Sat Feb 4 03:40:45 2012 @@ -153,29 +153,31 @@ public class TestApplicationLimits { queue.getMaximumActiveApplicationsPerUser()); int expectedMaxActiveApps = Math.max(1, - (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * + (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * csConf.getMaximumApplicationMasterResourcePercent() * - queue.getAbsoluteCapacity())); + queue.getAbsoluteMaximumCapacity())); assertEquals(expectedMaxActiveApps, queue.getMaximumActiveApplications()); - assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * - queue.getUserLimitFactor()), - queue.getMaximumActiveApplicationsPerUser()); + assertEquals( + (int)Math.ceil( + expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * + queue.getUserLimitFactor()), + queue.getMaximumActiveApplicationsPerUser()); // Add some nodes to the cluster & test new limits clusterResource = Resources.createResource(120 * 16 * GB); root.updateClusterResource(clusterResource); 
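The assertions in this TestApplicationLimits hunk track the revised limit formula: the maximum number of active applications is now ceil((clusterMemory / minimumAllocation) * maxAMResourcePercent * absoluteMaximumCapacity), floored at 1, and the per-user limit scales that by the user limit and user-limit factor. A worked sketch of the numbers at this point in the test, with the percentages assumed for illustration (the queue configuration itself is not visible in this hunk):

    // Illustration only: the percentages are assumptions, not read from csConf.
    public class MaxActiveAppsSketch {
      public static void main(String[] args) {
        int GB = 1024;                        // MB
        int clusterMemory = 120 * 16 * GB;    // the resized cluster used in the test
        int minAllocation = 1 * GB;           // 1 GB minimum allocation
        float amResourcePercent = 0.1f;       // assumed maximum-am-resource-percent
        float absoluteMaxCapacity = 1.0f;     // assumed absolute maximum capacity
        int userLimit = 100;                  // assumed user limit, in percent
        float userLimitFactor = 1.0f;         // assumed user-limit factor

        int maxActiveApps = Math.max(1,
            (int) Math.ceil(((float) clusterMemory / minAllocation)
                * amResourcePercent * absoluteMaxCapacity));         // ceil(1920 * 0.1) = 192

        int maxActiveAppsPerUser = (int) Math.ceil(
            maxActiveApps * (userLimit / 100.0f) * userLimitFactor); // 192

        System.out.println(maxActiveApps + " / " + maxActiveAppsPerUser);
      }
    }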
expectedMaxActiveApps = Math.max(1, - (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * + (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * csConf.getMaximumApplicationMasterResourcePercent() * - queue.getAbsoluteCapacity())); + queue.getAbsoluteMaximumCapacity())); assertEquals(expectedMaxActiveApps, queue.getMaximumActiveApplications()); - assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * - queue.getUserLimitFactor()), - queue.getMaximumActiveApplicationsPerUser()); - + assertEquals( + (int)Math.ceil(expectedMaxActiveApps * + (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()), + queue.getMaximumActiveApplicationsPerUser()); } @Test @@ -257,6 +259,87 @@ public class TestApplicationLimits { } @Test + public void testActiveLimitsWithKilledApps() throws Exception { + final String user_0 = "user_0"; + + int APPLICATION_ID = 0; + + // set max active to 2 + doReturn(2).when(queue).getMaximumActiveApplications(); + + // Submit first application + SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_0, user_0, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertTrue(queue.activeApplications.contains(app_0)); + + // Submit second application + SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_1, user_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertTrue(queue.activeApplications.contains(app_1)); + + // Submit third application, should remain pending + SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_2, user_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertTrue(queue.pendingApplications.contains(app_2)); + + // Submit fourth application, should remain pending + SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_3, user_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(2, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(2, queue.getNumPendingApplications(user_0)); + assertTrue(queue.pendingApplications.contains(app_3)); + + // Kill 3rd pending application + queue.finishApplication(app_2, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertFalse(queue.pendingApplications.contains(app_2)); + assertFalse(queue.activeApplications.contains(app_2)); + + // Finish 1st application, app_3 should become active + queue.finishApplication(app_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertTrue(queue.activeApplications.contains(app_3)); + 
assertFalse(queue.pendingApplications.contains(app_3)); + assertFalse(queue.activeApplications.contains(app_0)); + + // Finish 2nd application + queue.finishApplication(app_1, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertFalse(queue.activeApplications.contains(app_1)); + + // Finish 4th application + queue.finishApplication(app_3, A); + assertEquals(0, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(0, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + assertFalse(queue.activeApplications.contains(app_3)); + } + + @Test public void testHeadroom() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Sat Feb 4 03:40:45 2012 @@ -38,6 +38,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -57,6 +59,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.junit.After; @@ -67,6 +70,8 @@ import org.mockito.stubbing.Answer; public class TestLeafQueue { + private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -88,6 +93,7 @@ public class TestLeafQueue { csConf = new CapacitySchedulerConfiguration(); + csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true); setupQueueConfiguration(csConf); @@ -254,6 +260,35 @@ public class TestLeafQueue { assertEquals(7*GB, a.getMetrics().getAvailableMB()); } + @Test + public void 
testAppAttemptMetrics() throws Exception { + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 1); + SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null, + rmContext, null); + a.submitApplication(app_0, user_0, B); + + // Attempt the same application again + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(0, 2); + SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null, + rmContext, null); + a.submitApplication(app_1, user_0, B); // same user + + assertEquals(1, a.getMetrics().getAppsSubmitted()); + assertEquals(1, a.getMetrics().getAppsPending()); + + QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0); + assertEquals(1, userMetrics.getAppsSubmitted()); + } @Test public void testSingleQueueWithOneUser() throws Exception { @@ -473,6 +508,115 @@ public class TestLeafQueue { } @Test + public void testHeadroomWithMaxCap() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + SchedulerApp app_0 = + new SchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_0, user_0, A); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + SchedulerApp app_1 = + new SchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_1, user_0, A); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + SchedulerApp app_2 = + new SchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_2, user_1, A); + + // Setup some nodes + String host_0 = "host_0"; + SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "host_1"; + SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8*GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority, + recordFactory))); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, + recordFactory))); + + /** + * Start testing... 
+ */ + + // Set user-limit + a.setUserLimit(50); + a.setUserLimitFactor(2); + + // Now, only user_0 should be active since he is the only one with + // outstanding requests + assertEquals("There should only be 1 active user!", + 1, a.getActiveUsersManager().getNumActiveUsers()); + + // 1 container to user_0 + a.assignContainers(clusterResource, node_0); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G + + // Again one to user_0 since he hasn't exceeded user limit yet + a.assignContainers(clusterResource, node_0); + assertEquals(3*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G + + // Submit requests for app_1 and set max-cap + a.setMaxCapacity(.1f); + app_2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority, + recordFactory))); + assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); + + // No more to user_0 since he is already over user-limit + // and no more containers to queue since it's already at max-cap + a.assignContainers(clusterResource, node_1); + assertEquals(3*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_0.getHeadroom().getMemory()); + assertEquals(0*GB, app_1.getHeadroom().getMemory()); + + // Check headroom for app_2 + LOG.info("here"); + app_1.updateResourceRequests(Collections.singletonList( // unset + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority, + recordFactory))); + assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); + a.assignContainers(clusterResource, node_1); + assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + } + + @Test public void testSingleQueueWithMultipleUsers() throws Exception { // Mock the queue Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Sat Feb 4 
03:40:45 2012 @@ -86,7 +86,7 @@ public class TestParentQueue { private SchedulerApp getMockApplication(int appId, String user) { SchedulerApp application = mock(SchedulerApp.class); doReturn(user).when(application).getUser(); - doReturn(null).when(application).getHeadroom(); + doReturn(Resources.createResource(0)).when(application).getHeadroom(); return application; } Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1240450&r1=1240449&r2=1240450&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original) +++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sat Feb 4 03:40:45 2012 @@ -26,17 +26,28 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Before; +import org.junit.Test; public class TestFifoScheduler { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -63,7 +74,30 @@ public class TestFifoScheduler { .getRMContext()); } + @Test + public void testAppAttemptMetrics() throws Exception { + AsyncDispatcher dispatcher = new InlineDispatcher(); + RMContext rmContext = new RMContextImpl(null, dispatcher, null, null, null); + FifoScheduler schedular = new FifoScheduler(); + schedular.reinitialize(new Configuration(), null, rmContext); + + 
ApplicationId appId = BuilderUtils.newApplicationId(200, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + + SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "queue", + "user"); + schedular.handle(event); + + appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); + + event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user"); + schedular.handle(event); + + QueueMetrics metrics = schedular.getRootQueueMetrics(); + Assert.assertEquals(1, metrics.getAppsSubmitted()); + } // @Test public void testFifoScheduler() throws Exception {
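Both testAppAttemptMetrics tests above assert the same behaviour from different schedulers: QueueMetrics.submitApp is driven only by the first attempt of an application, so a retried attempt no longer inflates the apps-submitted counter. A stripped-down sketch of that guard, with simplified names (this is not the scheduler code itself):

    // Illustration only: counts submissions per application, not per attempt.
    public class SubmissionCounterSketch {
      private int appsSubmitted = 0;

      // attemptId is 1 for the first attempt and increases on each retry.
      void addApplicationAttempt(int attemptId) {
        if (attemptId == 1) {       // mirrors the new FifoScheduler guard
          appsSubmitted++;
        }
      }

      public static void main(String[] args) {
        SubmissionCounterSketch metrics = new SubmissionCounterSketch();
        metrics.addApplicationAttempt(1);          // first attempt is counted
        metrics.addApplicationAttempt(2);          // retry is not
        System.out.println(metrics.appsSubmitted); // prints 1
      }
    }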