hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1240450 [5/6] - in /hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop...
Date: Sat, 04 Feb 2012 03:40:56 GMT
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Sat Feb  4 03:40:45 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -37,14 +38,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -61,6 +66,9 @@ import org.apache.hadoop.yarn.service.Se
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
@@ -76,6 +84,17 @@ public class TestJobHistoryParsing {
 
   @Test
   public void testHistoryParsing() throws Exception {
+    checkHistoryParsing(2, 1, 2);
+  }
+  
+  @Test
+  public void testHistoryParsingWithParseErrors() throws Exception {
+    checkHistoryParsing(3, 0, 2);
+  }
+  
+  private void checkHistoryParsing(final int numMaps, final int numReduces,
+      final int numSuccessfulMaps) 
+  throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     long amStartTimeEst = System.currentTimeMillis();
@@ -83,8 +102,9 @@ public class TestJobHistoryParsing {
         CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
         MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
-    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
-        true);
+    MRApp app = 
+        new MRAppWithHistory(numMaps, numReduces, true, 
+            this.getClass().getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
@@ -117,8 +137,42 @@ public class TestJobHistoryParsing {
     }
 
     JobHistoryParser parser = new JobHistoryParser(in);
-    JobInfo jobInfo = parser.parse();
-
+    final EventReader realReader = new EventReader(in);
+    EventReader reader = Mockito.mock(EventReader.class);
+    if (numMaps == numSuccessfulMaps) {
+      reader = realReader;
+    } else {
+      final AtomicInteger numFinishedEvents = new AtomicInteger(0);  // Hack!
+      Mockito.when(reader.getNextEvent()).thenAnswer(
+          new Answer<HistoryEvent>() {
+            public HistoryEvent answer(InvocationOnMock invocation) 
+                throws IOException {
+              HistoryEvent event = realReader.getNextEvent();
+              if (event instanceof TaskFinishedEvent) {
+                numFinishedEvents.incrementAndGet();
+              }
+              
+              if (numFinishedEvents.get() <= numSuccessfulMaps) {
+                return event;
+              } else {
+                throw new IOException("test");
+              }
+            }
+          }
+        );
+    }
+    
+    JobInfo jobInfo = parser.parse(reader);
+    
+    long numFinishedMaps = 
+        computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+    
+    if (numFinishedMaps != numMaps) {
+      Exception parseException = parser.getParseException();
+      Assert.assertNotNull("Didn't get expected parse exception", 
+          parseException);
+    }
+    
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
         jobInfo.getUsername());
     Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@@ -126,14 +180,16 @@ public class TestJobHistoryParsing {
         jobInfo.getJobQueueName());
     Assert
         .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
-    Assert.assertEquals("incorrect finishedReduces ", 1,
+    Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, 
+        numFinishedMaps);
+    Assert.assertEquals("incorrect finishedReduces ", numReduces,
         jobInfo.getFinishedReduces());
     Assert.assertEquals("incorrect uberized ", job.isUber(),
         jobInfo.getUberized());
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     int totalTasks = allTasks.size();
-    Assert.assertEquals("total number of tasks is incorrect  ", 3, totalTasks);
+    Assert.assertEquals("total number of tasks is incorrect  ", 
+        (numMaps+numReduces), totalTasks);
 
     // Verify aminfo
     Assert.assertEquals(1, jobInfo.getAMInfos().size());
@@ -172,55 +228,78 @@ public class TestJobHistoryParsing {
         Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
         Assert.assertEquals("Incorrect shuffle port for task attempt",
             taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
-        Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
-        Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
-
-        // Verify rack-name
-        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-            .getRackname(), RACK_NAME);
+        if (numMaps == numSuccessfulMaps) {
+          Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
+          Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
+          
+          // Verify rack-name
+          Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+              .getRackname(), RACK_NAME);
+        }
       }
     }
 
-    String summaryFileName = JobHistoryUtils
-        .getIntermediateSummaryFileName(jobId);
-    Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-    String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
-    Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
-    Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
-    Assert.assertNotNull(jobSummaryString);
-
-    Map<String, String> jobSummaryElements = new HashMap<String, String>();
-    StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
-    while (strToken.hasMoreTokens()) {
-      String keypair = strToken.nextToken();
-      jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+    if (numMaps == numSuccessfulMaps) {
 
-    }
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId);
+      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+      String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+      Assert.assertNotNull(jobSummaryString);
+
+      Map<String, String> jobSummaryElements = new HashMap<String, String>();
+      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+      while (strToken.hasMoreTokens()) {
+        String keypair = strToken.nextToken();
+        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
 
-    Assert.assertEquals("JobId does not match", jobId.toString(),
-        jobSummaryElements.get("jobId"));
-    Assert.assertTrue("submitTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
-    Assert.assertTrue("launchTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-    Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
-    Assert
-        .assertTrue(
-            "firstReduceTaskLaunchTime should not be 0",
-            Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
-    Assert.assertTrue("finishTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
-    Assert.assertEquals("Mismatch in num map slots", 2,
-        Integer.parseInt(jobSummaryElements.get("numMaps")));
-    Assert.assertEquals("Mismatch in num reduce slots", 1,
-        Integer.parseInt(jobSummaryElements.get("numReduces")));
-    Assert.assertEquals("User does not match", System.getProperty("user.name"),
-        jobSummaryElements.get("user"));
-    Assert.assertEquals("Queue does not match", "default",
-        jobSummaryElements.get("queue"));
-    Assert.assertEquals("Status does not match", "SUCCEEDED",
-        jobSummaryElements.get("status"));
+      }
+
+      Assert.assertEquals("JobId does not match", jobId.toString(),
+          jobSummaryElements.get("jobId"));
+      Assert.assertTrue("submitTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+      Assert.assertTrue("launchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+      .assertTrue(
+          "firstReduceTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+      Assert.assertTrue("finishTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+          Integer.parseInt(jobSummaryElements.get("numMaps")));
+      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+          Integer.parseInt(jobSummaryElements.get("numReduces")));
+      Assert.assertEquals("User does not match", System.getProperty("user.name"),
+          jobSummaryElements.get("user"));
+      Assert.assertEquals("Queue does not match", "default",
+          jobSummaryElements.get("queue"));
+      Assert.assertEquals("Status does not match", "SUCCEEDED",
+          jobSummaryElements.get("status"));
+    }
+  }
+  
+  // Computes finished maps similar to RecoveryService...
+  private long computeFinishedMaps(JobInfo jobInfo, 
+      int numMaps, int numSuccessfulMaps) {
+    if (numMaps == numSuccessfulMaps) {
+      return jobInfo.getFinishedMaps();
+    }
+    
+    long numFinishedMaps = 0;
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = 
+        jobInfo.getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        ++numFinishedMaps;
+      }
+    }
+    return numFinishedMaps;
   }
   
   @Test
@@ -264,6 +343,9 @@ public class TestJobHistoryParsing {
 
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    Assert.assertNull("Caught an unexpected exception " + parseException, 
+        parseException);
     int noOffailedAttempts = 0;
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     for (Task task : job.getTasks().values()) {
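
The hunk above wraps the real EventReader in a Mockito mock whose getNextEvent() answer delegates to the real reader until numSuccessfulMaps TaskFinishedEvents have been seen, then throws, so that parser.getParseException() can be asserted. A minimal, self-contained sketch of that fault-injection pattern follows; Reader, failAfter() and the "injected failure" message are illustrative stand-ins, not Hadoop or commit classes:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.mockito.Mockito;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class FaultInjectingReaderSketch {
      interface Reader {
        String getNextEvent() throws IOException;
      }

      // Returns a mock that behaves like realReader for the first goodReads
      // calls and then throws, mirroring the stubbing in checkHistoryParsing.
      static Reader failAfter(final Reader realReader, final int goodReads)
          throws IOException {
        final AtomicInteger reads = new AtomicInteger(0);
        Reader mock = Mockito.mock(Reader.class);
        Mockito.when(mock.getNextEvent()).thenAnswer(new Answer<String>() {
          public String answer(InvocationOnMock invocation) throws IOException {
            String event = realReader.getNextEvent(); // delegate to the real reader
            if (reads.incrementAndGet() <= goodReads) {
              return event;                            // pass the first N through
            }
            throw new IOException("injected failure"); // then fail, as the test does
          }
        });
        return mock;
      }
    }

A test would hand failAfter(realReader, n) to the code under test and then assert that the recorded exception is non-null, just as checkHistoryParsing does when numMaps != numSuccessfulMaps.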

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb  4 03:40:45 2012
@@ -74,7 +74,7 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, 
-      committer.getTempTaskOutputPath(tContext));
+      committer.getTaskAttemptPath(tContext));
 
     committer.setupJob(jContext);
     committer.setupTask(tContext);
@@ -115,7 +115,7 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, committer
-        .getTempTaskOutputPath(tContext));
+        .getTaskAttemptPath(tContext));
 
     // do setup
     committer.setupJob(jContext);
@@ -134,13 +134,13 @@ public class TestFileOutputCommitter ext
     // do abort
     committer.abortTask(tContext);
     File expectedFile = new File(new Path(committer
-        .getTempTaskOutputPath(tContext), file).toString());
+        .getTaskAttemptPath(tContext), file).toString());
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
     expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
         .toString());
-    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
         .listFiles().length);
     FileUtil.fullyDelete(new File(outDir.toString()));
@@ -170,16 +170,15 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, committer
-        .getTempTaskOutputPath(tContext));
+        .getTaskAttemptPath(tContext));
 
     // do setup
     committer.setupJob(jContext);
     committer.setupTask(tContext);
     
     String file = "test.txt";
-    String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
-    File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
-    File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+    File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
+    File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
     File expectedFile = new File(taskTmpDir, file);
 
     // A reporter that does nothing
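
The changes above track an API rename in the old-API (org.apache.hadoop.mapred) FileOutputCommitter: getTempTaskOutputPath(tContext) becomes getTaskAttemptPath(tContext). A hedged sketch of the call-site, assuming only the classes and signatures visible in the tests above:

    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    class WorkOutputPathSketch {
      // Points the job's work output at the committer's per-attempt directory,
      // as the updated tests do. The broad throws clause mirrors the tests.
      static void pointWorkOutputAtTaskAttemptDir(JobConf job,
          TaskAttemptContext tContext) throws Exception {
        FileOutputCommitter committer = new FileOutputCommitter();
        FileOutputFormat.setWorkOutputPath(job,
            committer.getTaskAttemptPath(tContext)); // was getTempTaskOutputPath
      }
    }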

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java Sat Feb  4 03:40:45 2012
@@ -34,7 +34,7 @@ public class TestTaskCommit extends Hado
 
   static class CommitterWithCommitFail extends FileOutputCommitter {
     public void commitTask(TaskAttemptContext context) throws IOException {
-      Path taskOutputPath = getTempTaskOutputPath(context);
+      Path taskOutputPath = getTaskAttemptPath(context);
       TaskAttemptID attemptId = context.getTaskAttemptID();
       JobConf job = context.getJobConf();
       if (taskOutputPath != null) {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Sat Feb  4 03:40:45 2012
@@ -70,6 +70,22 @@ public class TestFileOutputCommitter ext
     }
   }
 
+  private static void cleanup() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = outDir.getFileSystem(conf);
+    fs.delete(outDir, true);
+  }
+  
+  @Override
+  public void setUp() throws IOException {
+    cleanup();
+  }
+  
+  @Override
+  public void tearDown() throws IOException {
+    cleanup();
+  }
+  
   @SuppressWarnings("unchecked")
   public void testCommitter() throws Exception {
     Job job = Job.getInstance();
@@ -133,7 +149,7 @@ public class TestFileOutputCommitter ext
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
         .toString());
     assertFalse("job temp dir still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
@@ -188,9 +204,9 @@ public class TestFileOutputCommitter ext
     assertNotNull(th);
     assertTrue(th instanceof IOException);
     assertTrue(th.getMessage().contains("fake delete failed"));
-    String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
-    File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
-    File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+    //Path taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
+    File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
+    File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
     File expectedFile = new File(taskTmpDir, partFile);
     assertTrue(expectedFile + " does not exist", expectedFile.exists());
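
The new setUp()/tearDown() pair above deletes the shared output directory both before and after every test, so a failed run cannot leak files into the next one. A minimal sketch of the same idempotent-cleanup pattern for a JUnit-3 style TestCase; the class name and path are illustrative:

    import java.io.IOException;
    import junit.framework.TestCase;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CleanupPatternSketch extends TestCase {
      private static final Path outDir = new Path("/tmp/committer-test-output");

      private static void cleanup() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = outDir.getFileSystem(conf);
        fs.delete(outDir, true); // recursive; returns false if absent, which is fine
      }

      @Override
      public void setUp() throws IOException {
        cleanup(); // before each test: stale state from a failed run is removed
      }

      @Override
      public void tearDown() throws IOException {
        cleanup(); // after each test: leave nothing behind
      }

      public void testPlaceholder() { /* real assertions would go here */ }
    }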
 

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-examples/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb  4 03:40:45 2012
@@ -1,3 +1,3 @@
-/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-examples:1227776-1239397
-/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163490,1163768,1164255,1164301,1164339,1164771,1166402,1167001,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213630,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456
+/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-examples:1227776-1240449
+/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163490,1163768,1164255,1164301,1164339,1164771,1166402,1167001,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213630,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456,1239752
 /hadoop/core/branches/branch-0.19/mapred/hadoop-mapreduce-examples:713112

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh Sat Feb  4 03:40:45 2012
@@ -78,8 +78,11 @@ fi
 if [ "$YARN_LOG_DIR" = "" ]; then
   export YARN_LOG_DIR="$YARN_HOME/logs"
 fi
-mkdir -p "$YARN_LOG_DIR"
-chown $YARN_IDENT_STRING $YARN_LOG_DIR 
+
+if [ ! -w "$YARN_LOG_DIR" ] ; then
+  mkdir -p "$YARN_LOG_DIR"
+  chown $YARN_IDENT_STRING $YARN_LOG_DIR 
+fi
 
 if [ "$YARN_PID_DIR" = "" ]; then
   YARN_PID_DIR=/tmp
@@ -101,7 +104,7 @@ case $startStop in
 
   (start)
 
-    mkdir -p "$YARN_PID_DIR"
+    [ -w "$YARN_PID_DIR" ] || mkdir -p "$YARN_PID_DIR"
 
     if [ -f $pid ]; then
       if kill -0 `cat $pid` > /dev/null 2>&1; then

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java Sat Feb  4 03:40:45 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.logaggreg
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.commons.cli.CommandLine;
@@ -103,14 +104,15 @@ public class LogDumper extends Configure
     if (appOwner == null || appOwner.isEmpty()) {
       appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
     }
+    int resultCode = 0;
     if (containerIdStr == null && nodeAddress == null) {
-      dumpAllContainersLogs(appId, appOwner, out);
+      resultCode = dumpAllContainersLogs(appId, appOwner, out);
     } else if ((containerIdStr == null && nodeAddress != null)
         || (containerIdStr != null && nodeAddress == null)) {
       System.out.println("ContainerId or NodeAddress cannot be null!");
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp("general options are: ", opts);
-      return -1;
+      resultCode = -1;
     } else {
       Path remoteRootLogDir =
         new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@@ -122,27 +124,33 @@ public class LogDumper extends Configure
                   appId,
                   appOwner,
                   ConverterUtils.toNodeId(nodeAddress),
-                  getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
-                      YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)));
-      return dumpAContainerLogs(containerIdStr, reader, out);
+                  LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
+      resultCode = dumpAContainerLogs(containerIdStr, reader, out);
     }
 
-    return 0;
+    return resultCode;
   }
 
-  public void dumpAContainersLogs(String appId, String containerId,
+  public int dumpAContainersLogs(String appId, String containerId,
       String nodeId, String jobOwner) throws IOException {
     Path remoteRootLogDir =
         new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
     String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
-    AggregatedLogFormat.LogReader reader =
-        new AggregatedLogFormat.LogReader(getConf(),
-            LogAggregationUtils.getRemoteNodeLogFileForApp(remoteRootLogDir,
-                ConverterUtils.toApplicationId(appId), jobOwner,
-                ConverterUtils.toNodeId(nodeId), suffix));
+    Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
+        remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
+        ConverterUtils.toNodeId(nodeId), suffix);
+    AggregatedLogFormat.LogReader reader;
+    try {
+      reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
+    } catch (FileNotFoundException fnfe) {
+      System.out.println("Logs not available at " + logPath.toString());
+      System.out.println(
+          "Log aggregation has not completed or is not enabled.");
+      return -1;
+    }
     DataOutputStream out = new DataOutputStream(System.out);
-    dumpAContainerLogs(containerId, reader, out);
+    return dumpAContainerLogs(containerId, reader, out);
   }
 
   private int dumpAContainerLogs(String containerIdStr,
@@ -174,21 +182,28 @@ public class LogDumper extends Configure
     return 0;
   }
 
-  private void dumpAllContainersLogs(ApplicationId appId, String appOwner,
+  private int dumpAllContainersLogs(ApplicationId appId, String appOwner,
       DataOutputStream out) throws IOException {
     Path remoteRootLogDir =
         new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
     String user = appOwner;
     String logDirSuffix =
-        getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+        LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
     //TODO Change this to get a list of files from the LAS.
     Path remoteAppLogDir =
         LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user,
             logDirSuffix);
-    RemoteIterator<FileStatus> nodeFiles =
-        FileContext.getFileContext().listStatus(remoteAppLogDir);
+    RemoteIterator<FileStatus> nodeFiles;
+    try {
+      nodeFiles = FileContext.getFileContext().listStatus(remoteAppLogDir);
+    } catch (FileNotFoundException fnf) {
+      System.out.println("Logs not available at "
+          + remoteAppLogDir.toString());
+      System.out.println(
+          "Log aggregation has not completed or is not enabled.");
+      return -1;
+    }
     while (nodeFiles.hasNext()) {
       FileStatus thisNodeFile = nodeFiles.next();
       AggregatedLogFormat.LogReader reader =
@@ -217,12 +232,14 @@ public class LogDumper extends Configure
         reader.close();
       }
     }
+    return 0;
   }
 
   public static void main(String[] args) throws Exception {
     Configuration conf = new YarnConfiguration();
     LogDumper logDumper = new LogDumper();
     logDumper.setConf(conf);
-    logDumper.run(args);
+    int exitCode = logDumper.run(args);
+    System.exit(exitCode);
   }
 }
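
The LogDumper changes thread one result code through every branch, turn a missing aggregated-log directory into a readable message plus -1 instead of a stack trace, and pass the code to System.exit() so shell callers can test $?. A minimal sketch of that shape; fetchLogs() is a stand-in, not part of this commit:

    import java.io.FileNotFoundException;

    public class ExitCodeSketch {
      static int run(String[] args) {
        try {
          fetchLogs(args);
          return 0;
        } catch (FileNotFoundException fnfe) {
          // Same user experience as the new LogDumper catch blocks
          System.out.println("Logs not available.");
          System.out.println("Log aggregation has not completed or is not enabled.");
          return -1;
        }
      }

      static void fetchLogs(String[] args) throws FileNotFoundException {
        // stand-in for opening the remote aggregated-log directory
      }

      public static void main(String[] args) {
        System.exit(run(args)); // was: run(args) with the result discarded
      }
    }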

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java Sat Feb  4 03:40:45 2012
@@ -38,7 +38,7 @@ public class ClusterMetrics {
   
   private static AtomicBoolean isInitialized = new AtomicBoolean(false);
   
-  @Metric("# of active NMs") MutableGaugeInt numNMs;
+  @Metric("# of active NMs") MutableGaugeInt numActiveNMs;
   @Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
   @Metric("# of lost NMs") MutableGaugeInt numLostNMs;
   @Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@@ -74,7 +74,7 @@ public class ClusterMetrics {
   
   //Active Nodemanagers
   public int getNumActiveNMs() {
-    return numNMs.value();
+    return numActiveNMs.value();
   }
   
   //Decommisioned NMs
@@ -128,17 +128,12 @@ public class ClusterMetrics {
   public void decrNumRebootedNMs() {
     numRebootedNMs.decr();
   }
-  
-  public void removeNode(RMNodeEventType nodeEventType) {
-    numNMs.decr();
-    switch(nodeEventType){
-    case DECOMMISSION: incrDecommisionedNMs(); break;
-    case EXPIRE: incrNumLostNMs();break;
-    case REBOOTING: incrNumRebootedNMs();break;
-    }
+
+  public void incrNumActiveNodes() {
+    numActiveNMs.incr();
   }
-  
-  public void addNode() {
-    numNMs.incr();
+
+  public void decrNumActiveNodes() {
+    numActiveNMs.decr();
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Sat Feb  4 03:40:45 2012
@@ -111,4 +111,12 @@ public class Resources {
   public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
     return lhs.getMemory() >= rhs.getMemory();
   }
+  
+  public static Resource min(Resource lhs, Resource rhs) {
+    return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
+  }
+
+  public static Resource max(Resource lhs, Resource rhs) {
+    return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
+  }
 }
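
The new min()/max() helpers compare by memory, the only dimension this Resources class models. A sketch of how min() feeds the headroom computation in the LeafQueue hunk further below (subtract() already exists in this class):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;

    class ResourcesMinMaxSketch {
      // headroom = min(userLimit, queueMaxCap) - userConsumed
      static Resource headroom(Resource userLimit, Resource queueMaxCap,
          Resource userConsumed) {
        return Resources.subtract(Resources.min(userLimit, queueMaxCap),
            userConsumed);
      }
    }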

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Sat Feb  4 03:40:45 2012
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.util.Build
  */
 @Private
 @Unstable
+@SuppressWarnings("unchecked")
 public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
   private static final Log LOG = LogFactory.getLog(RMNodeImpl.class);
@@ -116,11 +117,14 @@ public class RMNodeImpl implements RMNod
          EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
      .addTransition(RMNodeState.RUNNING, RMNodeState.DECOMMISSIONED,
-         RMNodeEventType.DECOMMISSION, new RemoveNodeTransition())
+         RMNodeEventType.DECOMMISSION,
+         new DeactivateNodeTransition(RMNodeState.DECOMMISSIONED))
      .addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
-         RMNodeEventType.EXPIRE, new RemoveNodeTransition())
+         RMNodeEventType.EXPIRE,
+         new DeactivateNodeTransition(RMNodeState.LOST))
      .addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED,
-         RMNodeEventType.REBOOTING, new RemoveNodeTransition())
+         RMNodeEventType.REBOOTING,
+         new DeactivateNodeTransition(RMNodeState.REBOOTED))
      .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
@@ -304,26 +308,50 @@ public class RMNodeImpl implements RMNod
       writeLock.unlock();
     }
   }
-  
+
+  private void updateMetricsForRejoinedNode(RMNodeState previousNodeState) {
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    metrics.incrNumActiveNodes();
+
+    switch (previousNodeState) {
+    case LOST:
+      metrics.decrNumLostNMs();
+      break;
+    case REBOOTED:
+      metrics.decrNumRebootedNMs();
+      break;
+    case DECOMMISSIONED:
+      metrics.decrDecommisionedNMs();
+      break;
+    case UNHEALTHY:
+      metrics.decrNumUnhealthyNMs();
+      break;
+    }
+  }
+
+  private void updateMetricsForDeactivatedNode(RMNodeState finalState) {
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    metrics.decrNumActiveNodes();
+
+    switch (finalState) {
+    case DECOMMISSIONED:
+      metrics.incrDecommisionedNMs();
+      break;
+    case LOST:
+      metrics.incrNumLostNMs();
+      break;
+    case REBOOTED:
+      metrics.incrNumRebootedNMs();
+      break;
+    case UNHEALTHY:
+      metrics.incrNumUnhealthyNMs();
+      break;
+    }
+  }
+
   public static class AddNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
-	  
-    private void updateMetrics(RMNodeState nodeState) {
-      ClusterMetrics metrics = ClusterMetrics.getMetrics();
-      switch (nodeState) {
-      case LOST:
-        metrics.decrNumLostNMs();
-        break;
-      case REBOOTED:
-        metrics.decrNumRebootedNMs();
-        break;
-      case DECOMMISSIONED:
-        metrics.decrDecommisionedNMs();
-        break;
-      }
-    }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
@@ -333,12 +361,14 @@ public class RMNodeImpl implements RMNod
       
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
-        RMNode node = rmNode.context.getInactiveRMNodes().get(host);
+        // Old node rejoining
+        RMNode previousRMNode = rmNode.context.getInactiveRMNodes().get(host);
         rmNode.context.getInactiveRMNodes().remove(host);
-        updateMetrics(node.getState());
+        rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
+      } else {
+        // Increment activeNodes explicitly because this is a new node.
+        ClusterMetrics.getMetrics().incrNumActiveNodes();
       }
-
-      ClusterMetrics.getMetrics().addNode();
     }
   }
   
@@ -362,28 +392,33 @@ public class RMNodeImpl implements RMNod
     }
   }
 
-  public static class RemoveNodeTransition
+  public static class DeactivateNodeTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
-    @SuppressWarnings("unchecked")
+    private final RMNodeState finalState;
+    public DeactivateNodeTransition(RMNodeState finalState) {
+      this.finalState = finalState;
+    }
+
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
 
-      // Remove the node from the system.
+      // Deactivate the node
       rmNode.context.getRMNodes().remove(rmNode.nodeId);
-      LOG.info("Removed Node " + rmNode.nodeId);
+      LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+          + finalState);
       rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
-      //Update the metrics 
-      ClusterMetrics.getMetrics().removeNode(event.getType());
+
+      //Update the metrics
+      rmNode.updateMetricsForDeactivatedNode(finalState);
     }
   }
 
   public static class StatusUpdateWhenHealthyTransition implements
       MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
-    @SuppressWarnings("unchecked")
     @Override
     public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
@@ -399,7 +434,8 @@ public class RMNodeImpl implements RMNod
         // Inform the scheduler
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeRemovedSchedulerEvent(rmNode));
-        ClusterMetrics.getMetrics().incrNumUnhealthyNMs();
+        // Update metrics
+        rmNode.updateMetricsForDeactivatedNode(RMNodeState.UNHEALTHY);
         return RMNodeState.UNHEALTHY;
       }
 
@@ -458,11 +494,9 @@ public class RMNodeImpl implements RMNod
     }
   }
 
-  public static class StatusUpdateWhenUnHealthyTransition
- implements
+  public static class StatusUpdateWhenUnHealthyTransition implements
       MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
 
-    @SuppressWarnings("unchecked")
     @Override
     public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
@@ -474,7 +508,8 @@ public class RMNodeImpl implements RMNod
       if (remoteNodeHealthStatus.getIsNodeHealthy()) {
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeAddedSchedulerEvent(rmNode));
-        ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+        // Update metrics
+        rmNode.updateMetricsForRejoinedNode(RMNodeState.UNHEALTHY);
         return RMNodeState.RUNNING;
       }
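
The rewrite above collapses three near-identical RemoveNodeTransition wirings into one DeactivateNodeTransition constructed with its target state, and moves the metric switches out of ClusterMetrics into updateMetricsForRejoinedNode/updateMetricsForDeactivatedNode. A simplified sketch of the parameterized-transition idea, with stand-in types in place of YARN's state-machine machinery:

    public class ParameterizedTransitionSketch {
      enum NodeState { RUNNING, DECOMMISSIONED, LOST, REBOOTED }

      interface Transition<T> { void transition(T operand); }

      static class Node { NodeState state = NodeState.RUNNING; }

      // One class covers DECOMMISSION, EXPIRE and REBOOTING; the final state
      // is supplied at wiring time, as DeactivateNodeTransition does above.
      static class Deactivate implements Transition<Node> {
        private final NodeState finalState;
        Deactivate(NodeState finalState) { this.finalState = finalState; }
        public void transition(Node node) {
          node.state = finalState;
          System.out.println("Deactivating node as it is now " + finalState);
        }
      }

      public static void main(String[] args) {
        new Deactivate(NodeState.LOST).transition(new Node()); // EXPIRE wiring
      }
    }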
 

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Sat Feb  4 03:40:45 2012
@@ -162,6 +162,13 @@ public class AppSchedulingInfo {
 
       asks.put(hostName, request);
       if (updatePendingResources) {
+        
+        // Similarly, deactivate application?
+        if (request.getNumContainers() <= 0) {
+          LOG.info("checking for deactivate... ");
+          checkForDeactivation();
+        }
+        
         int lastRequestContainers = lastRequest != null ? lastRequest
             .getNumContainers() : 0;
         Resource lastRequestCapability = lastRequest != null ? lastRequest
@@ -308,19 +315,24 @@ public class AppSchedulingInfo {
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
-      boolean deactivate = true;
-      for (Priority priority : getPriorities()) {
-        ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
-        if (request.getNumContainers() > 0) {
-          deactivate = false;
-          break;
-        }
-      }
-      if (deactivate) {
-        activeUsersManager.deactivateApplication(user, applicationId);
+      checkForDeactivation();
+    }
+  }
+  
+  synchronized private void checkForDeactivation() {
+    boolean deactivate = true;
+    for (Priority priority : getPriorities()) {
+      ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
+      if (request.getNumContainers() > 0) {
+        deactivate = false;
+        break;
       }
     }
+    if (deactivate) {
+      activeUsersManager.deactivateApplication(user, applicationId);
+    }
   }
+  
   synchronized private void allocate(Container container) {
     // Update consumption and track allocations
     //TODO: fixme sharad
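
The refactor above pulls the "does any priority still have outstanding containers?" loop into a synchronized checkForDeactivation() helper, so the same test also runs when an updated request drops to zero containers. A simplified sketch of that shape; the types are illustrative stand-ins for the scheduler's request book-keeping:

    import java.util.HashMap;
    import java.util.Map;

    public class DeactivationCheckSketch {
      private final Map<Integer, Integer> containersByPriority =
          new HashMap<Integer, Integer>();
      private boolean active = true;

      synchronized void updateRequest(int priority, int numContainers) {
        containersByPriority.put(priority, numContainers);
        if (numContainers <= 0) {
          checkForDeactivation(); // mirrors the new call in updateResourceRequests
        }
      }

      // One helper keeps both call sites consistent
      private synchronized void checkForDeactivation() {
        for (int outstanding : containersByPriority.values()) {
          if (outstanding > 0) {
            return; // something still pending at some priority: stay active
          }
        }
        active = false; // nothing outstanding anywhere: deactivate
      }

      synchronized boolean isActive() { return active; }
    }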

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Sat Feb  4 03:40:45 2012
@@ -46,17 +46,23 @@ class CSQueueUtils {
   }
 
   public static int computeMaxActiveApplications(Resource clusterResource,
-      float maxAMResourcePercent, float absoluteCapacity) {
+      Resource minimumAllocation, float maxAMResourcePercent, 
+      float absoluteMaxCapacity) {
     return 
         Math.max(
-            (int)((clusterResource.getMemory() / (float)LeafQueue.DEFAULT_AM_RESOURCE) * 
-                   maxAMResourcePercent * absoluteCapacity), 
+            (int)Math.ceil(
+                     ((float)clusterResource.getMemory() / 
+                         minimumAllocation.getMemory()) * 
+                     maxAMResourcePercent * absoluteMaxCapacity), 
             1);
   }
 
   public static int computeMaxActiveApplicationsPerUser(
       int maxActiveApplications, int userLimit, float userLimitFactor) {
-    return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor);
+    return Math.max(
+        (int)Math.ceil(
+            maxActiveApplications * (userLimit / 100.0f) * userLimitFactor),
+        1);
   }
   
 }
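
Worked numbers may help: the corrected formula divides by the configured minimum allocation rather than the old hard-coded DEFAULT_AM_RESOURCE, uses absolute max capacity, and both methods now round up and floor at 1. An illustrative calculation with hypothetical values:

    public class MaxActiveAppsExample {
      public static void main(String[] args) {
        int clusterMemory = 40960, minAllocation = 1024; // MB, hypothetical
        float maxAMResourcePercent = 0.1f, absoluteMaxCapacity = 0.5f;
        int userLimit = 25; float userLimitFactor = 1.0f;

        int maxActive = Math.max(
            (int) Math.ceil(((float) clusterMemory / minAllocation)
                * maxAMResourcePercent * absoluteMaxCapacity), 1);
        // (40960/1024) * 0.1 * 0.5 = 2.0 -> ceil 2 -> max(2, 1) = 2

        int maxActivePerUser = Math.max(
            (int) Math.ceil(maxActive * (userLimit / 100.0f) * userLimitFactor), 1);
        // 2 * 0.25 * 1.0 = 0.5 -> ceil 1 -> max(1, 1) = 1; the old code
        // truncated 0.5 to 0, which the new ceil-and-floor avoids.

        System.out.println(maxActive + " " + maxActivePerUser);
      }
    }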

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Sat Feb  4 03:40:45 2012
@@ -125,8 +125,6 @@ public class LeafQueue implements CSQueu
   
   private final ActiveUsersManager activeUsersManager;
   
-  final static int DEFAULT_AM_RESOURCE = 2 * 1024;
-  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, 
       Comparator<SchedulerApp> applicationComparator, CSQueue old) {
@@ -166,8 +164,9 @@ public class LeafQueue implements CSQueu
     this.maxAMResourcePercent = 
         cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
     int maxActiveApplications = 
-        CSQueueUtils.computeMaxActiveApplications(cs.getClusterResources(), 
-            maxAMResourcePercent, absoluteCapacity);
+        CSQueueUtils.computeMaxActiveApplications(
+            cs.getClusterResources(), this.minimumAllocation,
+            maxAMResourcePercent, absoluteMaxCapacity);
     int maxActiveApplicationsPerUser = 
         CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, 
             userLimitFactor);
@@ -246,30 +245,39 @@ public class LeafQueue implements CSQueu
         " [= configuredMaxCapacity ]" + "\n" +
         "absoluteMaxCapacity = " + absoluteMaxCapacity +
         " [= 1.0 maximumCapacity undefined, " +
-        "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + "\n" +
+        "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + 
+        "\n" +
         "userLimit = " + userLimit +
         " [= configuredUserLimit ]" + "\n" +
         "userLimitFactor = " + userLimitFactor +
         " [= configuredUserLimitFactor ]" + "\n" +
         "maxApplications = " + maxApplications +
-        " [= (int)(configuredMaximumSystemApplications * absoluteCapacity) ]" + "\n" +
+        " [= (int)(configuredMaximumSystemApplications * absoluteCapacity) ]" + 
+        "\n" +
         "maxApplicationsPerUser = " + maxApplicationsPerUser +
-        " [= (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor) ]" + "\n" +
+        " [= (int)(maxApplications * (userLimit / 100.0f) * " +
+        "userLimitFactor) ]" + "\n" +
         "maxActiveApplications = " + maxActiveApplications +
         " [= max(" + 
-        "(int)((clusterResourceMemory / (float)DEFAULT_AM_RESOURCE) *" + 
-        "maxAMResourcePercent * absoluteCapacity)," + 
+        "(int)ceil((clusterResourceMemory / minimumAllocation) *" + 
+        "maxAMResourcePercent * absoluteMaxCapacity)," + 
         "1) ]" + "\n" +
         "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
-        " [= (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor) ]" + "\n" +
+        " [= max(" +
+        "(int)(maxActiveApplications * (userLimit / 100.0f) * " +
+        "userLimitFactor)," +
+        "1) ]" + "\n" +
         "utilization = " + utilization +
-        " [= usedResourcesMemory /  (clusterResourceMemory * absoluteCapacity)]" + "\n" +
+        " [= usedResourcesMemory / " +
+        "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
         "usedCapacity = " + usedCapacity +
-        " [= usedResourcesMemory / (clusterResourceMemory * parent.absoluteCapacity)]" + "\n" +
+        " [= usedResourcesMemory / " +
+        "(clusterResourceMemory * parent.absoluteCapacity)]" + "\n" +
         "maxAMResourcePercent = " + maxAMResourcePercent +
         " [= configuredMaximumAMResourcePercent ]" + "\n" +
         "minimumAllocationFactor = " + minimumAllocationFactor +
-        " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / maximumAllocationMemory ]" + "\n" +
+        " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
+        "maximumAllocationMemory ]" + "\n" +
         "numContainers = " + numContainers +
         " [= currentNumContainers ]" + "\n" +
         "state = " + state +
@@ -606,7 +614,10 @@ public class LeafQueue implements CSQueu
       addApplication(application, user);
     }
 
-    metrics.submitApp(userName);
+    int attemptId = application.getApplicationAttemptId().getAttemptId();
+    if (attemptId == 1) {
+      metrics.submitApp(userName);
+    }
 
     // Inform the parent queue
     try {
@@ -635,7 +646,7 @@ public class LeafQueue implements CSQueu
         user.activateApplication();
         activeApplications.add(application);
         i.remove();
-        LOG.info("Application " + application.getApplicationId().getId() + 
+        LOG.info("Application " + application.getApplicationId() +
             " from user: " + application.getUser() + 
             " activated in queue: " + getQueueName());
       }
@@ -673,10 +684,13 @@ public class LeafQueue implements CSQueu
   }
 
   public synchronized void removeApplication(SchedulerApp application, User user) {
-    activeApplications.remove(application);
+    boolean wasActive = activeApplications.remove(application);
+    if (!wasActive) {
+      pendingApplications.remove(application);
+    }
     applicationsMap.remove(application.getApplicationAttemptId());
 
-    user.finishApplication();
+    user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
     }
@@ -751,15 +765,15 @@ public class LeafQueue implements CSQueu
             continue;
           }
 
-          // Compute & set headroom
-          // Note: We set the headroom with the highest priority request 
-          //       as the target. 
+          // Compute user-limit & set headroom
+          // Note: We compute both user-limit & headroom with the highest 
+          //       priority request as the target. 
           //       This works since we never assign lower priority requests
           //       before all higher priority ones are serviced.
           Resource userLimit = 
-              computeAndSetUserResourceLimit(application, clusterResource, 
-                  required);
-
+              computeUserLimitAndSetHeadroom(application, clusterResource, 
+                  required);          
+          
           // Check queue max-capacity limit
           if (!assignToQueue(clusterResource, required)) {
             return NULL_ASSIGNMENT;
@@ -777,13 +791,13 @@ public class LeafQueue implements CSQueu
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
                 null);
-          
-          Resource assigned = assignment.getResource();
-          
+
           // Did we schedule or reserve a container?
+          Resource assigned = assignment.getResource();
           if (Resources.greaterThan(assigned, Resources.none())) {
 
-            // Book-keeping
+            // Book-keeping 
+            // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned);
             
             // Reset scheduling opportunities
@@ -854,20 +868,50 @@ public class LeafQueue implements CSQueu
   }
 
   @Lock({LeafQueue.class, SchedulerApp.class})
-  private Resource computeAndSetUserResourceLimit(SchedulerApp application, 
-      Resource clusterResource, Resource required) {
+  private Resource computeUserLimitAndSetHeadroom(
+      SchedulerApp application, Resource clusterResource, Resource required) {
+    
     String user = application.getUser();
-    Resource limit = computeUserLimit(application, clusterResource, required);
+    
+    /** 
+     * Headroom is min((userLimit, queue-max-cap) - consumed)
+     */
+
+    Resource userLimit =                          // User limit
+        computeUserLimit(application, clusterResource, required);
+    
+
+    Resource queueMaxCap =                        // Queue Max-Capacity
+        Resources.createResource(
+            roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory()))
+            );
+    
+    Resource userConsumed = getUser(user).getConsumedResources(); 
     Resource headroom = 
-        Resources.subtract(limit, getUser(user).getConsumedResources());
+        Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Headroom calculation for user " + user + ": " + 
+          " userLimit=" + userLimit + 
+          " queueMaxCap=" + queueMaxCap + 
+          " consumed=" + userConsumed + 
+          " headroom=" + headroom);
+    }
+    
     application.setHeadroom(headroom);
     metrics.setAvailableResourcesToUser(user, headroom);
-    return limit;
+    
+    return userLimit;
   }
   
   private int roundUp(int memory) {
-    return divideAndCeil(memory, minimumAllocation.getMemory()) * 
-        minimumAllocation.getMemory();
+    int minMemory = minimumAllocation.getMemory();
+    return divideAndCeil(memory, minMemory) * minMemory; 
+  }
+  
+  private int roundDown(int memory) {
+    int minMemory = minimumAllocation.getMemory();
+    return (memory / minMemory) * minMemory;
   }
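
The headroom rule in computeUserLimitAndSetHeadroom() reduces to plain arithmetic. A self-contained sketch with illustrative numbers (the cluster size, capacities, and per-user figures below are assumptions, not values from this patch):

    // headroom = min(userLimit, queueMaxCap) - userConsumed, all in MB
    public class HeadroomExample {
      public static void main(String[] args) {
        int clusterMemory = 100 * 1024;   // assumed: 100 GB cluster
        int minAllocation = 1024;         // assumed: 1 GB minimum container
        float absMaxCapacity = 0.335f;    // assumed: queue max-capacity 33.5%

        // Queue max-capacity, rounded down to a multiple of the minimum
        // allocation exactly as roundDown() does: 34304 MB -> 33792 MB.
        int queueMaxCap =
            ((int) (absMaxCapacity * clusterMemory) / minAllocation) * minAllocation;

        int userLimit = 20 * 1024;    // assumed result of computeUserLimit()
        int userConsumed = 12 * 1024; // assumed: user already holds 12 GB

        int headroom = Math.min(userLimit, queueMaxCap) - userConsumed;
        System.out.println(headroom); // 8192 MB: the user may still get 8 GB
      }
    }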
   
   @Lock(NoLock.class)
@@ -1288,10 +1332,17 @@ public class LeafQueue implements CSQueu
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource);
+    Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
-    LOG.info(getQueueName() + 
-        " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " user-resources=" + user.getConsumedResources());
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getQueueName() + 
+          " user=" + userName + 
+          " used=" + usedResources + " numContainers=" + numContainers +
+          " headroom=" + application.getHeadroom() +
+          " user-resources=" + user.getConsumedResources());
+    }
   }
 
   synchronized void releaseResource(Resource clusterResource, 
@@ -1316,17 +1367,18 @@ public class LeafQueue implements CSQueu
   public synchronized void updateClusterResource(Resource clusterResource) {
     // Update queue properties
     maxActiveApplications = 
-        CSQueueUtils.computeMaxActiveApplications(clusterResource, maxAMResourcePercent, 
-            absoluteCapacity);
+        CSQueueUtils.computeMaxActiveApplications(
+            clusterResource, minimumAllocation, 
+            maxAMResourcePercent, absoluteMaxCapacity);
     maxActiveApplicationsPerUser = 
-        CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, 
-            userLimitFactor);
+        CSQueueUtils.computeMaxActiveApplicationsPerUser(
+            maxActiveApplications, userLimit, userLimitFactor);
     
     // Update application properties
     for (SchedulerApp application : activeApplications) {
       synchronized (application) {
-        computeAndSetUserResourceLimit(
-            application, clusterResource, Resources.none());
+        computeUserLimitAndSetHeadroom(application, clusterResource, 
+            Resources.none());
       }
     }
   }
@@ -1378,8 +1430,13 @@ public class LeafQueue implements CSQueu
       ++activeApplications;
     }
 
-    public synchronized void finishApplication() {
-      --activeApplications;
+    public synchronized void finishApplication(boolean wasActive) {
+      if (wasActive) {
+        --activeApplications;
+      } else {
+        --pendingApplications;
+      }
     }
 
     public synchronized void assignContainer(Resource resource) {
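
Threading the wasActive flag from removeApplication() into User.finishApplication() keeps the active and pending counters consistent no matter which list the application dies in. A toy model of the same two-list bookkeeping (assumed names, not LeafQueue's actual fields):

    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.Queue;
    import java.util.Set;

    class AppLimiter {
      private final int maxActive;
      private final Set<String> active = new HashSet<String>();
      private final Queue<String> pending = new ArrayDeque<String>();

      AppLimiter(int maxActive) { this.maxActive = maxActive; }

      synchronized void submit(String app) {
        if (active.size() < maxActive) { active.add(app); }
        else { pending.add(app); }                // over the limit: queue it
      }

      synchronized void finish(String app) {
        boolean wasActive = active.remove(app);   // mirrors the wasActive flag
        if (!wasActive) { pending.remove(app); }  // killed while still pending
        while (active.size() < maxActive && !pending.isEmpty()) {
          active.add(pending.poll());             // promote the next pending app
        }
      }
    }

This is the scenario testActiveLimitsWithKilledApps below walks through: killing a pending application must decrement the pending count, and finishing an active one must promote the oldest pending application.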

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Feb  4 03:40:45 2012
@@ -298,7 +298,9 @@ public class FifoScheduler implements Re
         new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
             this.rmContext, null);
     applications.put(appAttemptId, schedulerApp);
-    metrics.submitApp(user);
+    if (appAttemptId.getAttemptId() == 1) {
+      metrics.submitApp(user);
+    }
     LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
         " from " + user + ", currently active: " + applications.size());
     rmContext.getDispatcher().getEventHandler().handle(

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Sat Feb  4 03:40:45 2012
@@ -100,6 +100,12 @@ class NodesPage extends RmView {
           if(!stateFilter.equals(state)) {
             continue;
           }
+        } else {
+          // No filter: the caller asked for all nodes, but the default
+          // view still skips unhealthy nodes.
+          if (ni.getState() == RMNodeState.UNHEALTHY) {
+            continue;
+          }
         }
         NodeInfo info = new NodeInfo(ni, sched);
         int usedMemory = (int)info.getUsedMemory();
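
The same default-view rule is applied twice: here in NodesPage and again in RMWebServices below. Distilled into one predicate (a hypothetical helper for illustration, not part of the patch):

    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;

    final class NodeViewFilter {
      // With no explicit state filter, UNHEALTHY nodes are hidden from the
      // "all nodes" view; with a filter, only the requested state is shown.
      static boolean shouldDisplay(RMNodeState state, String stateFilter) {
        if (stateFilter == null || stateFilter.isEmpty()) {
          return state != RMNodeState.UNHEALTHY;
        }
        return stateFilter.equalsIgnoreCase(state.toString());
      }
    }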

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Sat Feb  4 03:40:45 2012
@@ -166,6 +166,12 @@ public class RMWebServices {
         if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) {
           continue;
         }
+      } else {
+        // No filter: the caller asked for all nodes, but the default
+        // view still skips unhealthy nodes.
+        if (ni.getState() == RMNodeState.UNHEALTHY) {
+          continue;
+        }
       }
       if ((healthState != null) && (!healthState.isEmpty())) {
         LOG.info("heatlh state is : " + healthState);

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Sat Feb  4 03:40:45 2012
@@ -51,18 +51,23 @@ public class MockNodes {
     List<RMNode> list = Lists.newArrayList();
     for (int i = 0; i < racks; ++i) {
       for (int j = 0; j < nodesPerRack; ++j) {
+        if (j == (nodesPerRack - 1)) {
+          // One unhealthy node per rack.
+          list.add(nodeInfo(i, perNode, RMNodeState.UNHEALTHY));
+        }
         list.add(newNodeInfo(i, perNode));
       }
     }
     return list;
   }
   
-  public static List<RMNode> lostNodes(int racks, int nodesPerRack,
+  public static List<RMNode> deactivatedNodes(int racks, int nodesPerRack,
       Resource perNode) {
     List<RMNode> list = Lists.newArrayList();
     for (int i = 0; i < racks; ++i) {
       for (int j = 0; j < nodesPerRack; ++j) {
-        list.add(lostNodeInfo(i, perNode, RMNodeState.LOST));
+        RMNodeState[] allStates = RMNodeState.values();
+        list.add(nodeInfo(i, perNode, allStates[j % allStates.length]));
       }
     }
     return list;
@@ -198,15 +203,20 @@ public class MockNodes {
     final String httpAddress = httpAddr;
     final NodeHealthStatus nodeHealthStatus =
         recordFactory.newRecordInstance(NodeHealthStatus.class);
+    if (state != RMNodeState.UNHEALTHY) {
+      nodeHealthStatus.setIsNodeHealthy(true);
+      nodeHealthStatus.setHealthReport("HealthyMe");
+    }
     return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName,
         nodeHealthStatus, nid, hostName, state); 
   }
 
-  public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) {
+  public static RMNode nodeInfo(int rack, final Resource perNode,
+      RMNodeState state) {
     return buildRMNode(rack, perNode, state, "N/A");
   }
 
   public static RMNode newNodeInfo(int rack, final Resource perNode) {
-    return buildRMNode(rack, perNode, null, "localhost:0");
+    return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0");
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Sat Feb  4 03:40:45 2012
@@ -153,29 +153,31 @@ public class TestApplicationLimits {
     		queue.getMaximumActiveApplicationsPerUser());
     int expectedMaxActiveApps = 
         Math.max(1, 
-            (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * 
+            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
                    csConf.getMaximumApplicationMasterResourcePercent() *
-                   queue.getAbsoluteCapacity()));
+                   queue.getAbsoluteMaximumCapacity()));
     assertEquals(expectedMaxActiveApps, 
                  queue.getMaximumActiveApplications());
-    assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * 
-                       queue.getUserLimitFactor()), 
-                 queue.getMaximumActiveApplicationsPerUser());
+    assertEquals(
+        (int)Math.ceil(
+            expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * 
+            queue.getUserLimitFactor()), 
+        queue.getMaximumActiveApplicationsPerUser());
     
     // Add some nodes to the cluster & test new limits
     clusterResource = Resources.createResource(120 * 16 * GB);
     root.updateClusterResource(clusterResource);
     expectedMaxActiveApps = 
         Math.max(1, 
-            (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * 
+            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
                    csConf.getMaximumApplicationMasterResourcePercent() *
-                   queue.getAbsoluteCapacity()));
+                   queue.getAbsoluteMaximumCapacity()));
     assertEquals(expectedMaxActiveApps, 
                  queue.getMaximumActiveApplications());
-    assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * 
-                       queue.getUserLimitFactor()), 
-                 queue.getMaximumActiveApplicationsPerUser());
-    
+    assertEquals(
+        (int)Math.ceil(expectedMaxActiveApps * 
+            (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()), 
+        queue.getMaximumActiveApplicationsPerUser());
   }
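
The expected values asserted above follow from the new formula, worked through for the 120-node cluster (the AM-resource percent, max-capacity, and user-limit settings below are illustrative assumptions, not the test's configured values):

    // maxActiveApps = max(1, ceil(clusterMemory/minAlloc * amPercent * absMaxCap))
    public class MaxActiveAppsExample {
      public static void main(String[] args) {
        final int GB = 1024;                   // MB per GB; minAlloc = 1 GB
        int clusterMemory = 120 * 16 * GB;     // 120 nodes x 16 GB, as above
        float amPercent = 0.1f;                // assumed AM-resource percent
        float absMaxCapacity = 0.5f;           // assumed absolute max-capacity

        int maxActiveApps = Math.max(1,
            (int) Math.ceil(((float) clusterMemory / (1 * GB))
                * amPercent * absMaxCapacity));
        System.out.println(maxActiveApps);     // ceil(1920 * 0.1 * 0.5) = 96

        int userLimit = 100;                   // assumed user-limit percent
        float userLimitFactor = 1.0f;          // assumed user-limit factor
        int perUser = (int) Math.ceil(
            maxActiveApps * (userLimit / 100.0f) * userLimitFactor);
        System.out.println(perUser);           // 96 active applications per user
      }
    }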
   
   @Test
@@ -257,6 +259,87 @@ public class TestApplicationLimits {
   }
 
   @Test
+  public void testActiveLimitsWithKilledApps() throws Exception {
+    final String user_0 = "user_0";
+
+    int APPLICATION_ID = 0;
+
+    // set max active to 2
+    doReturn(2).when(queue).getMaximumActiveApplications();
+
+    // Submit first application
+    SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_0, user_0, A);
+    assertEquals(1, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    assertTrue(queue.activeApplications.contains(app_0));
+
+    // Submit second application
+    SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_1, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    assertTrue(queue.activeApplications.contains(app_1));
+
+    // Submit third application, should remain pending
+    SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_2, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertTrue(queue.pendingApplications.contains(app_2));
+
+    // Submit fourth application, should remain pending
+    SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+    queue.submitApplication(app_3, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(2, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(2, queue.getNumPendingApplications(user_0));
+    assertTrue(queue.pendingApplications.contains(app_3));
+
+    // Kill 3rd pending application
+    queue.finishApplication(app_2, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertFalse(queue.pendingApplications.contains(app_2));
+    assertFalse(queue.activeApplications.contains(app_2));
+
+    // Finish 1st application, app_3 should become active
+    queue.finishApplication(app_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    assertTrue(queue.activeApplications.contains(app_3));
+    assertFalse(queue.pendingApplications.contains(app_3));
+    assertFalse(queue.activeApplications.contains(app_0));
+
+    // Finish 2nd application
+    queue.finishApplication(app_1, A);
+    assertEquals(1, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    assertFalse(queue.activeApplications.contains(app_1));
+
+    // Finish 4th application
+    queue.finishApplication(app_3, A);
+    assertEquals(0, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(0, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    assertFalse(queue.activeApplications.contains(app_3));
+  }
+
+  @Test
   public void testHeadroom() throws Exception {
     CapacitySchedulerConfiguration csConf = 
         new CapacitySchedulerConfiguration();

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Sat Feb  4 03:40:45 2012
@@ -38,6 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -57,6 +59,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.junit.After;
@@ -67,6 +70,8 @@ import org.mockito.stubbing.Answer;
 
 public class TestLeafQueue {
   
+  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
+  
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
 
@@ -88,6 +93,7 @@ public class TestLeafQueue {
     
     csConf = 
         new CapacitySchedulerConfiguration();
+    csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
     setupQueueConfiguration(csConf);
     
     
@@ -254,6 +260,35 @@ public class TestLeafQueue {
     assertEquals(7*GB, a.getMetrics().getAvailableMB());
   }
 
+  @Test
+  public void testAppAttemptMetrics() throws Exception {
+
+    // Manipulate the configured queue B (referred to locally as 'a')
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 1);
+    SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null,
+        rmContext, null);
+    a.submitApplication(app_0, user_0, B);
+
+    // Attempt the same application again
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(0, 2);
+    SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null,
+        rmContext, null);
+    a.submitApplication(app_1, user_0, B); // same user
+
+    assertEquals(1, a.getMetrics().getAppsSubmitted());
+    assertEquals(1, a.getMetrics().getAppsPending());
+
+    QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);
+    assertEquals(1, userMetrics.getAppsSubmitted());
+  }
 
   @Test
   public void testSingleQueueWithOneUser() throws Exception {
@@ -473,6 +508,115 @@ public class TestLeafQueue {
   }
   
   @Test
+  public void testHeadroomWithMaxCap() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // unset maxCapacity
+    a.setMaxCapacity(1.0f);
+    
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "host_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+    
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ 
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory))); 
+
+    /**
+     * Start testing...
+     */
+    
+    // Set user-limit
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+    
+    // Now, only user_0 should be active since he is the only one with
+    // outstanding requests
+    assertEquals("There should only be 1 active user!", 
+        1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // 1 container to user_0
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
+
+    // Again one to user_0 since he hasn't exceeded user limit yet
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
+    
+    // Submit requests for app_2 and set max-cap
+    a.setMaxCapacity(.1f);
+    app_2.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
+            recordFactory))); 
+    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+
+    // No more to user_0 since he is already over user-limit
+    // and no more containers to queue since it's already at max-cap
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_0.getHeadroom().getMemory());
+    assertEquals(0*GB, app_1.getHeadroom().getMemory());
+    
+    // Check headroom for app_2 
+    LOG.info("here");
+    app_1.updateResourceRequests(Collections.singletonList(     // unset
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority,
+            recordFactory))); 
+    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(1*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
+  }
+
+  @Test
   public void testSingleQueueWithMultipleUsers() throws Exception {
     
     // Mock the queue

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Sat Feb  4 03:40:45 2012
@@ -86,7 +86,7 @@ public class TestParentQueue {
   private SchedulerApp getMockApplication(int appId, String user) {
     SchedulerApp application = mock(SchedulerApp.class);
     doReturn(user).when(application).getUser();
-    doReturn(null).when(application).getHeadroom();
+    doReturn(Resources.createResource(0)).when(application).getHeadroom();
     return application;
   }
 

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1240450&r1=1240449&r2=1240450&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sat Feb  4 03:40:45 2012
@@ -26,17 +26,28 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -63,7 +74,30 @@ public class TestFifoScheduler {
             .getRMContext());
   }
   
+  @Test
+  public void testAppAttemptMetrics() throws Exception {
+    AsyncDispatcher dispatcher = new InlineDispatcher();
+    RMContext rmContext = new RMContextImpl(null, dispatcher, null, null, null);
 
+    FifoScheduler scheduler = new FifoScheduler();
+    scheduler.reinitialize(new Configuration(), null, rmContext);
+
+    ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+
+    SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "queue",
+        "user");
+    scheduler.handle(event);
+
+    appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
+
+    event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user");
+    scheduler.handle(event);
+
+    QueueMetrics metrics = scheduler.getRootQueueMetrics();
+    Assert.assertEquals(1, metrics.getAppsSubmitted());
+  }
 
 //  @Test
   public void testFifoScheduler() throws Exception {


