hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537330 [8/12] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java...
Date Wed, 30 Oct 2013 22:22:36 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Oct 30 22:21:59 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -154,8 +154,9 @@ public class FairScheduler implements Re
 
   // This stores per-application scheduling information, indexed by
   // attempt ID's for fast lookup.
+  @VisibleForTesting
   protected Map<ApplicationAttemptId, FSSchedulerApp> applications = 
-      new HashMap<ApplicationAttemptId, FSSchedulerApp>();
+      new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
 
   // Nodes in the cluster, indexed by NodeId
   private Map<NodeId, FSSchedulerNode> nodes = 
@@ -178,8 +179,12 @@ public class FairScheduler implements Re
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
+  protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
+  protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
+  protected long nodeLocalityDelayMs; // Delay for node locality
+  protected long rackLocalityDelayMs; // Delay for rack locality
   private FairSchedulerEventLog eventLog; // Machine-readable event log
   protected boolean assignMultiple; // Allocate multiple containers per
                                     // heartbeat
@@ -444,7 +449,7 @@ public class FairScheduler implements Re
       // proceed with kill
       if (time + waitTimeBeforeKill < clock.getTime()) {
         ContainerStatus status =
-          SchedulerUtils.createAbnormalContainerStatus(
+          SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
         // TODO: Not sure if this ever actually adds this to the list of cleanup
@@ -581,6 +586,22 @@ public class FairScheduler implements Re
     return rackLocalityThreshold;
   }
 
+  public long getNodeLocalityDelayMs() {
+    return nodeLocalityDelayMs;
+  }
+
+  public long getRackLocalityDelayMs() {
+    return rackLocalityDelayMs;
+  }
+
+  public boolean isContinuousSchedulingEnabled() {
+    return continuousSchedulingEnabled;
+  }
+
+  public synchronized int getContinuousSchedulingSleepMs() {
+    return continuousSchedulingSleepMs;
+  }
+
   public Resource getClusterCapacity() {
     return clusterCapacity;
   }
@@ -613,7 +634,8 @@ public class FairScheduler implements Re
       return;
     }
 
-    RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId);
+    RMApp rmApp = rmContext.getRMApps().get(
+        applicationAttemptId.getApplicationId());
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
 
     FSSchedulerApp schedulerApp =
@@ -623,7 +645,9 @@ public class FairScheduler implements Re
 
     // Enforce ACLs
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
-    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
+
+    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
+        && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
       String msg = "User " + userUgi.getUserName() +
     	        " cannot submit applications to queue " + queue.getName();
       LOG.info(msg);
@@ -631,7 +655,7 @@ public class FairScheduler implements Re
     	        new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
       return;
     }
-    
+
     queue.addApp(schedulerApp);
     queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
 
@@ -654,14 +678,17 @@ public class FairScheduler implements Re
       queueName = user;
     }
     
-    FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
+    FSLeafQueue queue = queueMgr.getLeafQueue(queueName,
+        conf.getAllowUndeclaredPools());
     if (queue == null) {
       // queue is not an existing or createable leaf queue
-      queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+      queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, false);
     }
     
     if (rmApp != null) {
       rmApp.setQueue(queue.getName());
+    } else {
+      LOG.warn("Couldn't find RM app to set queue name on");
     }
     
     return queue;
@@ -703,7 +730,7 @@ public class FairScheduler implements Re
 
     // Inform the queue
     FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
-        .getQueueName());
+        .getQueueName(), false);
     queue.removeApp(application);
 
     // Remove from our data-structure
@@ -761,6 +788,10 @@ public class FairScheduler implements Re
 
   private synchronized void removeNode(RMNode rmNode) {
     FSSchedulerNode node = nodes.get(rmNode.getNodeID());
+    // This can occur when an UNHEALTHY node reconnects
+    if (node == null) {
+      return;
+    }
     Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
     updateRootQueueMetrics();
 
@@ -851,6 +882,8 @@ public class FairScheduler implements Re
       for (RMContainer container : application.getPreemptionContainers()) {
         preemptionContainerIds.add(container.getContainerId());
       }
+
+      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
       
       return new Allocation(application.pullNewlyAllocatedContainers(),
           application.getHeadroom(), preemptionContainerIds);
@@ -904,6 +937,37 @@ public class FairScheduler implements Re
           completedContainer, RMContainerEventType.FINISHED);
     }
 
+    if (continuousSchedulingEnabled) {
+      if (!completedContainers.isEmpty()) {
+        attemptScheduling(node);
+      }
+    } else {
+      attemptScheduling(node);
+    }
+  }
+
+  private void continuousScheduling() {
+    while (true) {
+      for (FSSchedulerNode node : nodes.values()) {
+        try {
+          if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
+            attemptScheduling(node);
+          }
+        } catch (Throwable ex) {
+          LOG.warn("Error while attempting scheduling for node " + node + ": " +
+                  ex.toString(), ex);
+        }
+      }
+      try {
+        Thread.sleep(getContinuousSchedulingSleepMs());
+      } catch (InterruptedException e) {
+        LOG.warn("Error while doing sleep in continuous scheduling: " +
+                e.toString(), e);
+      }
+    }
+  }
+  
+  private synchronized void attemptScheduling(FSSchedulerNode node) {
     // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
@@ -911,19 +975,18 @@ public class FairScheduler implements Re
     AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
       Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      if (reservedAppSchedulable != null &&
-          !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
         // Don't hold the reservation if app can no longer use it
         LOG.info("Releasing reservation that cannot be satisfied for application "
             + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node " + nm);
+            + " on node " + node);
         reservedAppSchedulable.unreserve(reservedPriority, node);
         reservedAppSchedulable = null;
       } else {
         // Reservation exists; try to fulfill the reservation
         LOG.info("Trying to fulfill reservation for application "
             + reservedAppSchedulable.getApp().getApplicationAttemptId()
-            + " on node: " + nm);
+            + " on node: " + node);
 
         node.getReservedAppSchedulable().assignReservedContainer(node);
       }
@@ -1057,8 +1120,13 @@ public class FairScheduler implements Re
     maximumAllocation = this.conf.getMaximumAllocation();
     incrAllocation = this.conf.getIncrementAllocation();
     userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
+    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+    continuousSchedulingSleepMs =
+            this.conf.getContinuousSchedulingSleepMs();
     nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
     rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
     preemptionEnabled = this.conf.getPreemptionEnabled();
     assignMultiple = this.conf.getAssignMultiple();
     maxAssign = this.conf.getMaxAssign();
@@ -1085,6 +1153,21 @@ public class FairScheduler implements Re
       updateThread.setName("FairSchedulerUpdateThread");
       updateThread.setDaemon(true);
       updateThread.start();
+
+      if (continuousSchedulingEnabled) {
+        // start continuous scheduling thread
+        Thread schedulingThread = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              continuousScheduling();
+            }
+          }
+        );
+        schedulingThread.setName("ContinuousScheduling");
+        schedulingThread.setDaemon(true);
+        schedulingThread.start();
+      }
     } else {
       try {
         queueMgr.reloadAllocs();
@@ -1121,4 +1204,18 @@ public class FairScheduler implements Re
     return nodes.size();
   }
 
+  @Override
+  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName) {
+    FSQueue queue = getQueueManager().getQueue(queueName);
+    if (queue == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ACL not found for queue access-type " + acl
+            + " for queue " + queueName);
+      }
+      return false;
+    }
+    return queue.hasAccess(acl, callerUGI);
+  }
+
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Wed Oct 30 22:21:59 2013
@@ -18,9 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
+import java.net.URL;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +36,9 @@ import org.apache.hadoop.yarn.util.resou
 @Evolving
 public class FairSchedulerConfiguration extends Configuration {
 
+  public static final Log LOG = LogFactory.getLog(
+      FairSchedulerConfiguration.class.getName());
+  
   /** Increment request grant-able by the RM scheduler. 
    * These properties are looked up in the yarn-site.xml  */
   public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
@@ -42,13 +48,17 @@ public class FairSchedulerConfiguration 
     YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
   public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
   
-  public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
-
   private static final String CONF_PREFIX =  "yarn.scheduler.fair.";
 
-  protected static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
+  public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
+  protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml";
   protected static final String EVENT_LOG_DIR = "eventlog.dir";
 
+  /** Whether pools can be created that were not specified in the FS configuration file
+   */
+  protected static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX + "allow-undeclared-pools";
+  protected static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
+  
   /** Whether to use the user name as the queue name (instead of "default") if
    * the request does not specify a queue. */
   protected static final String  USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
@@ -66,6 +76,22 @@ public class FairSchedulerConfiguration 
   protected static final float  DEFAULT_LOCALITY_THRESHOLD_RACK =
 		  DEFAULT_LOCALITY_THRESHOLD;
 
+  /** Delay for node locality. */
+  protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX + "locality-delay-node-ms";
+  protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;
+
+  /** Delay for rack locality. */
+  protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX + "locality-delay-rack-ms";
+  protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;
+
+  /** Enable continuous scheduling or not. */
+  protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
+  protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
+
+  /** Sleep time of each pass in continuous scheduling (5ms in default) */
+  protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
+  protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
+
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
@@ -87,9 +113,12 @@ public class FairSchedulerConfiguration 
   protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
   protected static final int DEFAULT_MAX_ASSIGN = -1;
 
+  public FairSchedulerConfiguration() {
+    super();
+  }
+  
   public FairSchedulerConfiguration(Configuration conf) {
     super(conf);
-    addResource(FS_CONFIGURATION_FILE);
   }
 
   public Resource getMinimumAllocation() {
@@ -121,6 +150,10 @@ public class FairSchedulerConfiguration 
       DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
     return Resources.createResource(incrementMemory, incrementCores);
   }
+  
+  public boolean getAllowUndeclaredPools() {
+    return getBoolean(ALLOW_UNDECLARED_POOLS, DEFAULT_ALLOW_UNDECLARED_POOLS);
+  }
 
   public boolean getUserAsDefaultQueue() {
     return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
@@ -134,6 +167,22 @@ public class FairSchedulerConfiguration 
     return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
   }
 
+  public boolean isContinuousSchedulingEnabled() {
+    return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
+  }
+
+  public int getContinuousSchedulingSleepMs() {
+    return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
+  }
+
+  public long getLocalityDelayNodeMs() {
+    return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
+  }
+
+  public long getLocalityDelayRackMs() {
+    return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
+  }
+
   public boolean getPreemptionEnabled() {
     return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
   }
@@ -150,8 +199,28 @@ public class FairSchedulerConfiguration 
     return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
   }
 
-  public String getAllocationFile() {
-    return get(ALLOCATION_FILE);
+  /**
+   * Path to XML file containing allocations. If the
+   * path is relative, it is searched for in the
+   * classpath, but loaded like a regular File.
+   */
+  public File getAllocationFile() {
+    String allocFilePath = get(ALLOCATION_FILE, DEFAULT_ALLOCATION_FILE);
+    File allocFile = new File(allocFilePath);
+    if (!allocFile.isAbsolute()) {
+      URL url = Thread.currentThread().getContextClassLoader()
+          .getResource(allocFilePath);
+      if (url == null) {
+        LOG.warn(allocFilePath + " not found on the classpath.");
+        allocFile = null;
+      } else if (!url.getProtocol().equalsIgnoreCase("file")) {
+        throw new RuntimeException("Allocation file " + url
+            + " found on the classpath is not on the local filesystem.");
+      } else {
+        allocFile = new File(url.getPath());
+      }
+    }
+    return allocFile;
   }
 
   public String getEventlogDir() {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Oct 30 22:21:59 2013
@@ -72,14 +72,14 @@ public class QueueManager {
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+  
+  private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
+  private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
 
   private final FairScheduler scheduler;
 
-  private Object allocFile; // Path to XML file containing allocations. This
-                            // is either a URL to specify a classpath resource
-                            // (if the fair-scheduler.xml on the classpath is
-                            // used) or a String to specify an absolute path (if
-                            // mapred.fairscheduler.allocation.file is used).
+  // Path to XML file containing allocations. 
+  private File allocFile; 
 
   private final Collection<FSLeafQueue> leafQueues = 
       new CopyOnWriteArrayList<FSLeafQueue>();
@@ -107,40 +107,32 @@ public class QueueManager {
     queues.put(rootQueue.getName(), rootQueue);
     
     this.allocFile = conf.getAllocationFile();
-    if (allocFile == null) {
-      // No allocation file specified in jobconf. Use the default allocation
-      // file, fair-scheduler.xml, looking for it on the classpath.
-      allocFile = new Configuration().getResource("fair-scheduler.xml");
-      if (allocFile == null) {
-        LOG.error("The fair scheduler allocation file fair-scheduler.xml was "
-            + "not found on the classpath, and no other config file is given "
-            + "through mapred.fairscheduler.allocation.file.");
-      }
-    }
+    
     reloadAllocs();
     lastSuccessfulReload = scheduler.getClock().getTime();
     lastReloadAttempt = scheduler.getClock().getTime();
     // Create the default queue
-    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
   }
   
   /**
-   * Get a queue by name, creating it if necessary.  If the queue
-   * is not or can not be a leaf queue, i.e. it already exists as a parent queue,
-   * or one of the parents in its name is already a leaf queue, null is returned.
+   * Get a queue by name, creating it if the create param is true and is necessary.
+   * If the queue is not or can not be a leaf queue, i.e. it already exists as a
+   * parent queue, or one of the parents in its name is already a leaf queue,
+   * null is returned.
    * 
    * The root part of the name is optional, so a queue underneath the root 
    * named "queue1" could be referred to  as just "queue1", and a queue named
    * "queue2" underneath a parent named "parent1" that is underneath the root 
    * could be referred to as just "parent1.queue2".
    */
-  public FSLeafQueue getLeafQueue(String name) {
+  public FSLeafQueue getLeafQueue(String name, boolean create) {
     if (!name.startsWith(ROOT_QUEUE + ".")) {
       name = ROOT_QUEUE + "." + name;
     }
     synchronized (queues) {
       FSQueue queue = queues.get(name);
-      if (queue == null) {
+      if (queue == null && create) {
         FSLeafQueue leafQueue = createLeafQueue(name);
         if (leafQueue == null) {
           return null;
@@ -236,13 +228,6 @@ public class QueueManager {
   }
 
   /**
-   * Get the queue for a given AppSchedulable.
-   */
-  public FSLeafQueue getQueueForApp(AppSchedulable app) {
-    return getLeafQueue(app.getApp().getQueueName());
-  }
-
-  /**
    * Reload allocations file if it hasn't been loaded in a while
    */
   public void reloadAllocsIfNecessary() {
@@ -255,14 +240,7 @@ public class QueueManager {
       try {
         // Get last modified time of alloc file depending whether it's a String
         // (for a path name) or an URL (for a classloader resource)
-        long lastModified;
-        if (allocFile instanceof String) {
-          File file = new File((String) allocFile);
-          lastModified = file.lastModified();
-        } else { // allocFile is an URL
-          URLConnection conn = ((URL) allocFile).openConnection();
-          lastModified = conn.getLastModified();
-        }
+        long lastModified = allocFile.lastModified();
         if (lastModified > lastSuccessfulReload &&
             time > lastModified + ALLOC_RELOAD_WAIT) {
           reloadAllocs();
@@ -321,66 +299,76 @@ public class QueueManager {
       DocumentBuilderFactory.newInstance();
     docBuilderFactory.setIgnoringComments(true);
     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc;
-    if (allocFile instanceof String) {
-      doc = builder.parse(new File((String) allocFile));
-    } else {
-      doc = builder.parse(allocFile.toString());
-    }
+    Document doc = builder.parse(allocFile);
     Element root = doc.getDocumentElement();
     if (!"allocations".equals(root.getTagName()))
       throw new AllocationConfigurationException("Bad fair scheduler config " +
           "file: top-level element not <allocations>");
     NodeList elements = root.getChildNodes();
+    List<Element> queueElements = new ArrayList<Element>();
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
-      if (!(node instanceof Element))
-        continue;
-      Element element = (Element)node;
-      if ("queue".equals(element.getTagName()) ||
-    	  "pool".equals(element.getTagName())) {
-        loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
-            userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
-            queueAcls, queueNamesInAllocFile);
-      } else if ("user".equals(element.getTagName())) {
-        String userName = element.getAttribute("name");
-        NodeList fields = element.getChildNodes();
-        for (int j = 0; j < fields.getLength(); j++) {
-          Node fieldNode = fields.item(j);
-          if (!(fieldNode instanceof Element))
-            continue;
-          Element field = (Element) fieldNode;
-          if ("maxRunningApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            userMaxApps.put(userName, val);
+      if (node instanceof Element) {
+        Element element = (Element)node;
+        if ("queue".equals(element.getTagName()) ||
+      	  "pool".equals(element.getTagName())) {
+          queueElements.add(element);
+        } else if ("user".equals(element.getTagName())) {
+          String userName = element.getAttribute("name");
+          NodeList fields = element.getChildNodes();
+          for (int j = 0; j < fields.getLength(); j++) {
+            Node fieldNode = fields.item(j);
+            if (!(fieldNode instanceof Element))
+              continue;
+            Element field = (Element) fieldNode;
+            if ("maxRunningApps".equals(field.getTagName())) {
+              String text = ((Text)field.getFirstChild()).getData().trim();
+              int val = Integer.parseInt(text);
+              userMaxApps.put(userName, val);
+            }
           }
+        } else if ("userMaxAppsDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          int val = Integer.parseInt(text);
+          userMaxAppsDefault = val;
+        } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          long val = Long.parseLong(text) * 1000L;
+          fairSharePreemptionTimeout = val;
+        } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          long val = Long.parseLong(text) * 1000L;
+          defaultMinSharePreemptionTimeout = val;
+        } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          int val = Integer.parseInt(text);
+          queueMaxAppsDefault = val;
+        } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
+            || "defaultQueueSchedulingMode".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          SchedulingPolicy.setDefault(text);
+          defaultSchedPolicy = SchedulingPolicy.getDefault();
+        } else {
+          LOG.warn("Bad element in allocations file: " + element.getTagName());
         }
-      } else if ("userMaxAppsDefault".equals(element.getTagName())) {
-        String text = ((Text)element.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        userMaxAppsDefault = val;
-      } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
-        String text = ((Text)element.getFirstChild()).getData().trim();
-        long val = Long.parseLong(text) * 1000L;
-        fairSharePreemptionTimeout = val;
-      } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
-        String text = ((Text)element.getFirstChild()).getData().trim();
-        long val = Long.parseLong(text) * 1000L;
-        defaultMinSharePreemptionTimeout = val;
-      } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
-        String text = ((Text)element.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        queueMaxAppsDefault = val;
-      } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
-          || "defaultQueueSchedulingMode".equals(element.getTagName())) {
-        String text = ((Text)element.getFirstChild()).getData().trim();
-        SchedulingPolicy.setDefault(text);
-        defaultSchedPolicy = SchedulingPolicy.getDefault();
-      } else {
-        LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
     }
+    
+    // Load queue elements.  A root queue can either be included or omitted.  If
+    // it's included, all other queues must be inside it.
+    for (Element element : queueElements) {
+      String parent = "root";
+      if (element.getAttribute("name").equalsIgnoreCase("root")) {
+        if (queueElements.size() > 1) {
+          throw new AllocationConfigurationException("If configuring root queue,"
+          		+ " no other queues can be placed alongside it.");
+        }
+        parent = null;
+      }
+      loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
+          userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
+          queueAcls, queueNamesInAllocFile);
+    }
 
     // Commit the reload; also create any queue defined in the alloc file
     // if it does not already exist, so it can be displayed on the web UI.
@@ -390,31 +378,24 @@ public class QueueManager {
           queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
       
-      // Update metrics
+      // Make sure all queues exist
+      for (String name: queueNamesInAllocFile) {
+        getLeafQueue(name, true);
+      }
+      
       for (FSQueue queue : queues.values()) {
+        // Update queue metrics
         FSQueueMetrics queueMetrics = queue.getMetrics();
         queueMetrics.setMinShare(queue.getMinShare());
         queueMetrics.setMaxShare(queue.getMaxShare());
+        // Set scheduling policies
+        if (queuePolicies.containsKey(queue.getName())) {
+          queue.setPolicy(queuePolicies.get(queue.getName()));
+        } else {
+          queue.setPolicy(SchedulingPolicy.getDefault());
+        }
       }
-      
-      // Root queue should have empty ACLs.  As a queue's ACL is the union of
-      // its ACL and all its parents' ACLs, setting the roots' to empty will
-      // neither allow nor prohibit more access to its children.
-      Map<QueueACL, AccessControlList> rootAcls =
-          new HashMap<QueueACL, AccessControlList>();
-      rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
-      rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
-      queueAcls.put(ROOT_QUEUE, rootAcls);
  
-      // Create all queus
-      for (String name: queueNamesInAllocFile) {
-        getLeafQueue(name);
-      }
-      
-      // Set custom policies as specified
-      for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) {
-        queues.get(entry.getKey()).setPolicy(entry.getValue());
-      }
     }
   }
   
@@ -428,7 +409,10 @@ public class QueueManager {
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) 
       throws AllocationConfigurationException {
-    String queueName = parentName + "." + element.getAttribute("name");
+    String queueName = element.getAttribute("name");
+    if (parentName != null) {
+      queueName = parentName + "." + queueName;
+    }
     Map<QueueACL, AccessControlList> acls =
         new HashMap<QueueACL, AccessControlList>();
     NodeList fields = element.getChildNodes();
@@ -466,10 +450,10 @@ public class QueueManager {
         policy.initialize(scheduler.getClusterCapacity());
         queuePolicies.put(queueName, policy);
       } else if ("aclSubmitApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
+        String text = ((Text)field.getFirstChild()).getData();
         acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
       } else if ("aclAdministerApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
+        String text = ((Text)field.getFirstChild()).getData();
         acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
@@ -589,21 +573,16 @@ public class QueueManager {
 
   /**
    * Get the ACLs associated with this queue. If a given ACL is not explicitly
-   * configured, include the default value for that ACL.
-   */
-  public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
-    HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
-    Map<QueueACL, AccessControlList> queueAcl = info.queueAcls.get(queue);
-    if (queueAcl != null) {
-      out.putAll(queueAcl);
-    }
-    if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
-      out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
-    }
-    if (!out.containsKey(QueueACL.SUBMIT_APPLICATIONS)) {
-      out.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList("*"));
+   * configured, include the default value for that ACL.  The default for the
+   * root queue is everybody ("*") and the default for all other queues is
+   * nobody ("")
+   */
+  public AccessControlList getQueueAcl(String queue, QueueACL operation) {
+    Map<QueueACL, AccessControlList> queueAcls = info.queueAcls.get(queue);
+    if (queueAcls == null || !queueAcls.containsKey(operation)) {
+      return (queue.equals(ROOT_QUEUE)) ? EVERYBODY_ACL : NOBODY_ACL;
     }
-    return out;
+    return queueAcls.get(operation);
   }
   
   static class QueueManagerInfo {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -73,6 +74,10 @@ public class FifoPolicy extends Scheduli
   @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
+    if (schedulables.isEmpty()) {
+      return;
+    }
+
     Schedulable earliest = null;
     for (Schedulable schedulable : schedulables) {
       if (earliest == null ||

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Oct 30 22:21:59 2013
@@ -25,8 +25,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,12 +36,10 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -50,7 +48,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -65,18 +65,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@@ -90,6 +81,8 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -113,8 +106,10 @@ public class FifoScheduler implements Re
   private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
-  private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
-      = new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
+  // Use ConcurrentSkipListMap because applications need to be ordered
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+      = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
   private ActiveUsersManager activeUsersManager;
 
@@ -152,7 +147,6 @@ public class FifoScheduler implements Re
       return queueInfo;
     }
 
-    @Override
     public Map<QueueACL, AccessControlList> getQueueAcls() {
       Map<QueueACL, AccessControlList> acls =
         new HashMap<QueueACL, AccessControlList>();
@@ -171,6 +165,11 @@ public class FifoScheduler implements Re
       queueUserAclInfo.setUserAcls(Arrays.asList(QueueACL.values()));
       return Collections.singletonList(queueUserAclInfo);
     }
+
+    @Override
+    public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
+      return getQueueAcls().get(acl).isUserAllowed(user);
+    }
   };
 
   @Override
@@ -295,7 +294,7 @@ public class FifoScheduler implements Re
         application.showRequests();
 
         // Update application requests
-        application.updateResourceRequests(ask, blacklistAdditions, blacklistRemovals);
+        application.updateResourceRequests(ask);
 
         LOG.debug("allocate: post-update" +
             " applicationId=" + applicationAttemptId + 
@@ -307,13 +306,16 @@ public class FifoScheduler implements Re
             " #ask=" + ask.size());
       }
 
+      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+
       return new Allocation(
           application.pullNewlyAllocatedContainers(), 
           application.getHeadroom());
     }
   }
 
-  private FiCaSchedulerApp getApplication(
+  @VisibleForTesting
+  FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
     return applications.get(applicationAttemptId);
   }
@@ -394,7 +396,7 @@ public class FifoScheduler implements Re
       application.showRequests();
       synchronized (application) {
         // Check if this resource is on the blacklist
-        if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
+        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
           continue;
         }
         
@@ -832,4 +834,10 @@ public class FifoScheduler implements Re
     return DEFAULT_QUEUE.getMetrics();
   }
 
+  @Override
+  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName) {
+    return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
+  }
+
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Wed Oct 30 22:21:59 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -124,6 +126,19 @@ public class AMRMTokenSecretManager exte
   }
 
   /**
+   * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
+   */
+  public synchronized void
+      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
+    AMRMTokenIdentifier identifier = token.decodeIdentifier();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+    }
+    this.passwords.put(identifier.getApplicationAttemptId(),
+      token.getPassword());
+  }
+
+  /**
    * Retrieve the password for the given {@link AMRMTokenIdentifier}.
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ClientToAMTokenSecretManagerInRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ClientToAMTokenSecretManagerInRM.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ClientToAMTokenSecretManagerInRM.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ClientToAMTokenSecretManagerInRM.java Wed Oct 30 22:21:59 2013
@@ -33,17 +33,21 @@ public class ClientToAMTokenSecretManage
   private Map<ApplicationAttemptId, SecretKey> masterKeys =
       new HashMap<ApplicationAttemptId, SecretKey>();
 
-  public synchronized SecretKey registerApplication(
+  public synchronized SecretKey createMasterKey(
       ApplicationAttemptId applicationAttemptID) {
-    SecretKey key = generateSecret();
+    return generateSecret();
+  }
+
+  public synchronized void registerApplication(
+      ApplicationAttemptId applicationAttemptID, SecretKey key) {
     this.masterKeys.put(applicationAttemptID, key);
-    return key;
   }
 
+  // Only for RM recovery
   public synchronized SecretKey registerMasterKey(
       ApplicationAttemptId applicationAttemptID, byte[] keyData) {
     SecretKey key = createSecretKey(keyData);
-    this.masterKeys.put(applicationAttemptID, key);
+    registerApplication(applicationAttemptID, key);
     return key;
   }
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Wed Oct 30 22:21:59 2013
@@ -34,6 +34,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,8 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -64,6 +67,7 @@ public class DelegationTokenRenewer exte
 
   // global single timer (daemon)
   private Timer renewalTimer;
+  private RMContext rmContext;
   
   // delegation token canceler thread
   private DelegationTokenCancelThread dtCancelThread =
@@ -80,6 +84,9 @@ public class DelegationTokenRenewer exte
   private long tokenRemovalDelayMs;
   
   private Thread delayedRemovalThread;
+  private boolean isServiceStarted = false;
+  private List<DelegationTokenToRenew> pendingTokenForRenewal =
+      new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
   
   private boolean tokenKeepAliveEnabled;
   
@@ -100,7 +107,6 @@ public class DelegationTokenRenewer exte
 
   @Override
   protected void serviceStart() throws Exception {
-    
     dtCancelThread.start();
     renewalTimer = new Timer(true);
     if (tokenKeepAliveEnabled) {
@@ -109,6 +115,15 @@ public class DelegationTokenRenewer exte
               "DelayedTokenCanceller");
       delayedRemovalThread.start();
     }
+    // enable RM to short-circuit token operations directly to itself
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(
+        rmContext.getRMDelegationTokenSecretManager(),
+        rmContext.getClientRMService().getBindAddress());
+    // Delegation token renewal is delayed until ClientRMService starts. As
+    // it is required to short circuit the token renewal calls.
+    isServiceStarted = true;
+    renewIfServiceIsStarted(pendingTokenForRenewal);
+    pendingTokenForRenewal.clear();
     super.serviceStart();
   }
 
@@ -275,8 +290,8 @@ public class DelegationTokenRenewer exte
    * @throws IOException
    */
   public void addApplication(
-      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd) 
-  throws IOException {
+      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+      throws IOException {
     if (ts == null) {
       return; //nothing to add
     }
@@ -291,25 +306,40 @@ public class DelegationTokenRenewer exte
     
     // find tokens for renewal, but don't add timers until we know
     // all renewable tokens are valid
-    Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
+    // At RM restart it is safe to assume that all the previously added tokens
+    // are valid
+    List<DelegationTokenToRenew> tokenList =
+        new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
     for(Token<?> token : tokens) {
-      // first renew happens immediately
       if (token.isManaged()) {
-        DelegationTokenToRenew dtr = 
-          new DelegationTokenToRenew(applicationId, token, getConfig(), now, 
-              shouldCancelAtEnd); 
-        renewToken(dtr);
-        dtrs.add(dtr);
+        tokenList.add(new DelegationTokenToRenew(applicationId,
+            token, getConfig(), now, shouldCancelAtEnd));
       }
     }
-    for (DelegationTokenToRenew dtr : dtrs) {
-      addTokenToList(dtr);
-      setTimerForTokenRenewal(dtr);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Registering token for renewal for:" +
-            " service = " + dtr.token.getService() +
-            " for appId = " + applicationId);
+    if (!tokenList.isEmpty()){
+      renewIfServiceIsStarted(tokenList);
+    }
+  }
+
+  protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
+      throws IOException {
+    if (isServiceStarted) {
+      // Renewing token and adding it to timer calls are separated purposefully
+      // If user provides incorrect token then it should not be added for
+      // renewal.
+      for (DelegationTokenToRenew dtr : dtrs) {
+        renewToken(dtr);
+      }
+      for (DelegationTokenToRenew dtr : dtrs) {
+        addTokenToList(dtr);
+        setTimerForTokenRenewal(dtr);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Registering token for renewal for:" + " service = "
+              + dtr.token.getService() + " for appId = " + dtr.applicationId);
+        }
       }
+    } else {
+      pendingTokenForRenewal.addAll(dtrs);
     }
   }
   
@@ -513,4 +543,7 @@ public class DelegationTokenRenewer exte
     }
   }
   
+  public void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java Wed Oct 30 22:21:59 2013
@@ -185,7 +185,7 @@ public class RMContainerTokenSecretManag
       tokenIdentifier =
           new ContainerTokenIdentifier(containerId, nodeId.toString(),
             appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
-              .getMasterKey().getKeyId(), ResourceManager.clusterTimeStamp);
+              .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp());
       password = this.createPassword(tokenIdentifier);
 
     } finally {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
@@ -52,6 +53,9 @@ public class RMPolicyProvider extends Po
     new Service(
         YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL, 
         ContainerManagementProtocolPB.class),
+    new Service(
+        YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
+        HAServiceProtocol.class),
   };
 
   @Override

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java Wed Oct 30 22:21:59 2013
@@ -25,22 +25,21 @@ import static org.apache.hadoop.yarn.web
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._ODD;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
 
-
 import java.util.Collection;
 
-import com.google.inject.Inject;
-
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.Times;
@@ -50,14 +49,19 @@ import org.apache.hadoop.yarn.webapp.ham
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 import org.apache.hadoop.yarn.webapp.view.InfoBlock;
 
+import com.google.inject.Inject;
+
 public class AppBlock extends HtmlBlock {
 
   private ApplicationACLsManager aclsManager;
+  private QueueACLsManager queueACLsManager;
 
   @Inject
-  AppBlock(ResourceManager rm, ViewContext ctx, ApplicationACLsManager aclsManager) {
+  AppBlock(ResourceManager rm, ViewContext ctx,
+      ApplicationACLsManager aclsManager, QueueACLsManager queueACLsManager) {
     super(ctx);
     this.aclsManager = aclsManager;
+    this.queueACLsManager = queueACLsManager;
   }
 
   @Override
@@ -91,8 +95,10 @@ public class AppBlock extends HtmlBlock 
       callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
     }
     if (callerUGI != null
-        && !this.aclsManager.checkAccess(callerUGI,
-            ApplicationAccessType.VIEW_APP, app.getUser(), appID)) {
+        && !(this.aclsManager.checkAccess(callerUGI,
+                ApplicationAccessType.VIEW_APP, app.getUser(), appID) ||
+             this.queueACLsManager.checkAccess(callerUGI,
+                QueueACL.ADMINISTER_QUEUE, app.getQueue()))) {
       puts("You (User " + remoteUser
           + ") are not authorized to view application " + appID);
       return;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Wed Oct 30 22:21:59 2013
@@ -98,24 +98,25 @@ class CapacitySchedulerPage extends RmVi
       for (UserInfo entry: users) {
         activeUserList.append(entry.getUsername()).append(" &lt;")
           .append(getPercentage(entry.getResourcesUsed(), usedResources))
-          .append(", Active Apps: " + entry.getNumActiveApplications())
-          .append(", Pending Apps: " + entry.getNumPendingApplications())
+          .append(", Schedulable Apps: " + entry.getNumActiveApplications())
+          .append(", Non-Schedulable Apps: " + entry.getNumPendingApplications())
           .append("&gt;<br style='display:block'>"); //Force line break
       }
 
       ResponseInfo ri = info("\'" + lqinfo.getQueuePath().substring(5) + "\' Queue Status").
           _("Queue State:", lqinfo.getQueueState()).
           _("Used Capacity:", percent(lqinfo.getUsedCapacity() / 100)).
+          _("Absolute Used Capacity:", percent(lqinfo.getAbsoluteUsedCapacity() / 100)).
           _("Absolute Capacity:", percent(lqinfo.getAbsoluteCapacity() / 100)).
           _("Absolute Max Capacity:", percent(lqinfo.getAbsoluteMaxCapacity() / 100)).
-          _("Used Resources:", StringEscapeUtils.escapeHtml(lqinfo.getUsedResources().toString())).
-          _("Num Active Applications:", Integer.toString(lqinfo.getNumActiveApplications())).
-          _("Num Pending Applications:", Integer.toString(lqinfo.getNumPendingApplications())).
+          _("Used Resources:", StringEscapeUtils.escapeHtml(lqinfo.getResourcesUsed().toString())).
+          _("Num Schedulable Applications:", Integer.toString(lqinfo.getNumActiveApplications())).
+          _("Num Non-Schedulable Applications:", Integer.toString(lqinfo.getNumPendingApplications())).
           _("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
           _("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
           _("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
-          _("Max Active Applications:", Integer.toString(lqinfo.getMaxActiveApplications())).
-          _("Max Active Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())).
+          _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())).
+          _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())).
           _("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
           _("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
           _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Wed Oct 30 22:21:59 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.yarn.uti
 
 import java.util.Collection;
 
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -64,11 +65,14 @@ public class FairSchedulerPage extends R
     @Override
     protected void render(Block html) {
       ResponseInfo ri = info("\'" + qinfo.getQueueName() + "\' Queue Status").
-          _("Used Resources:", qinfo.getUsedResources().toString()).
+          _("Used Resources:", StringEscapeUtils.escapeHtml(
+              qinfo.getUsedResources().toString())).
           _("Num Active Applications:", qinfo.getNumActiveApplications()).
           _("Num Pending Applications:", qinfo.getNumPendingApplications()).
-          _("Min Resources:", qinfo.getMinResources().toString()).
-          _("Max Resources:", qinfo.getMaxResources().toString());
+          _("Min Resources:", StringEscapeUtils.escapeHtml(
+              qinfo.getMinResources().toString())).
+          _("Max Resources:", StringEscapeUtils.escapeHtml(
+              qinfo.getMaxResources().toString()));
       int maxApps = qinfo.getMaxApplications();
       if (maxApps < Integer.MAX_VALUE) {
           ri._("Max Running Applications:", qinfo.getMaxApplications());

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java Wed Oct 30 22:21:59 2013
@@ -34,12 +34,14 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
@@ -65,7 +67,8 @@ public class JAXBContextResolver impleme
       CapacitySchedulerInfo.class, ClusterMetricsInfo.class,
       SchedulerInfo.class, AppsInfo.class, NodesInfo.class,
       RemoteExceptionData.class, CapacitySchedulerQueueInfoList.class,
-      ResourceInfo.class, UsersInfo.class, UserInfo.class};
+      ResourceInfo.class, UsersInfo.class, UserInfo.class,
+      ApplicationStatisticsInfo.class, StatisticsItemInfo.class};
 
   public JAXBContextResolver() throws Exception {
     this.types = new HashSet<Class>(Arrays.asList(cTypes));

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Wed Oct 30 22:21:59 2013
@@ -76,6 +76,7 @@ class NodesPage extends RmView {
           th(".containers", "Containers").
           th(".mem", "Mem Used").
           th(".mem", "Mem Avail").
+          th(".nodeManagerVersion", "Version").
           _()._().
           tbody();
       NodeState stateFilter = null;
@@ -129,6 +130,7 @@ class NodesPage extends RmView {
               _(StringUtils.byteDesc(usedMemory * BYTES_IN_MB))._().
             td().br().$title(String.valueOf(usedMemory))._().
               _(StringUtils.byteDesc(availableMemory * BYTES_IN_MB))._().
+            td(ni.getNodeManagerVersion()).
             _();
       }
       tbody._()._();

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java Wed Oct 30 22:21:59 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.yarn.uti
 
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.WebApp;
@@ -48,6 +49,7 @@ public class RMWebApp extends WebApp imp
       bind(RMContext.class).toInstance(rm.getRMContext());
       bind(ApplicationACLsManager.class).toInstance(
           rm.getApplicationACLsManager());
+      bind(QueueACLsManager.class).toInstance(rm.getQueueACLsManager());
     }
     route("/", RmController.class);
     route(pajoin("/nodes", NODE_STATE), RmController.class, "nodes");

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Wed Oct 30 22:21:59 2013
@@ -19,9 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
@@ -41,12 +44,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -54,9 +58,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
@@ -67,6 +73,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
@@ -79,18 +86,22 @@ import com.google.inject.Singleton;
 @Path("/ws/v1/cluster")
 public class RMWebServices {
   private static final String EMPTY = "";
+  private static final String ANY = "*";
   private final ResourceManager rm;
   private static RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
   private final ApplicationACLsManager aclsManager;
+  private final QueueACLsManager queueACLsManager;
 
   private @Context HttpServletResponse response;
 
   @Inject
   public RMWebServices(final ResourceManager rm,
-      final ApplicationACLsManager aclsManager) {
+      final ApplicationACLsManager aclsManager,
+      final QueueACLsManager queueACLsManager) {
     this.rm = rm;
     this.aclsManager = aclsManager;
+    this.queueACLsManager = queueACLsManager;
   }
 
   protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
@@ -101,9 +112,10 @@ public class RMWebServices {
       callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
     }
     if (callerUGI != null
-        && !this.aclsManager.checkAccess(callerUGI,
+        && !(this.aclsManager.checkAccess(callerUGI,
             ApplicationAccessType.VIEW_APP, app.getUser(),
-            app.getApplicationId())) {
+            app.getApplicationId()) || this.queueACLsManager.checkAccess(
+            callerUGI, QueueACL.ADMINISTER_QUEUE, app.getQueue()))) {
       return false;
     }
     return true;
@@ -231,6 +243,7 @@ public class RMWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public AppsInfo getApps(@Context HttpServletRequest hsr,
       @QueryParam("state") String stateQuery,
+      @QueryParam("states") Set<String> statesQuery,
       @QueryParam("finalStatus") String finalStatusQuery,
       @QueryParam("user") String userQuery,
       @QueryParam("queue") String queueQuery,
@@ -245,6 +258,7 @@ public class RMWebServices {
     boolean checkStart = false;
     boolean checkEnd = false;
     boolean checkAppTypes = false;
+    boolean checkAppStates = false;
     long countNum = 0;
 
     // set values suitable in case both of begin/end not specified
@@ -300,27 +314,20 @@ public class RMWebServices {
           "finishTimeEnd must be greater than finishTimeBegin");
     }
 
-    Set<String> appTypes = new HashSet<String>();
-    if (!applicationTypes.isEmpty()) {
-      for (String applicationType : applicationTypes) {
-        if (applicationType != null && !applicationType.trim().isEmpty()) {
-          if (applicationType.indexOf(",") == -1) {
-            appTypes.add(applicationType.trim());
-          } else {
-            String[] types = applicationType.split(",");
-            for (String type : types) {
-              if (!type.trim().isEmpty()) {
-                appTypes.add(type.trim());
-              }
-            }
-          }
-        }
-      }
-    }
+    Set<String> appTypes = parseQueries(applicationTypes, false);
     if (!appTypes.isEmpty()) {
       checkAppTypes = true;
     }
 
+    // stateQuery is deprecated.
+    if (stateQuery != null && !stateQuery.isEmpty()) {
+      statesQuery.add(stateQuery);
+    }
+    Set<String> appStates = parseQueries(statesQuery, true);
+    if (!appStates.isEmpty()) {
+      checkAppStates = true;
+    }
+
     final ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext()
         .getRMApps();
     AppsInfo allApps = new AppsInfo();
@@ -329,11 +336,10 @@ public class RMWebServices {
       if (checkCount && num == countNum) {
         break;
       }
-      if (stateQuery != null && !stateQuery.isEmpty()) {
-        RMAppState.valueOf(stateQuery);
-        if (!rmapp.getState().toString().equalsIgnoreCase(stateQuery)) {
-          continue;
-        }
+
+      if (checkAppStates && !appStates.contains(
+          rmapp.createApplicationState().toString().toLowerCase())) {
+        continue;
       }
       if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) {
         FinalApplicationStatus.valueOf(finalStatusQuery);
@@ -362,8 +368,8 @@ public class RMWebServices {
           continue;
         }
       }
-      if (checkAppTypes
-          && !appTypes.contains(rmapp.getApplicationType())) {
+      if (checkAppTypes && !appTypes.contains(
+          rmapp.getApplicationType().trim().toLowerCase())) {
         continue;
       }
 
@@ -384,6 +390,122 @@ public class RMWebServices {
   }
 
   @GET
+  @Path("/appstatistics")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public ApplicationStatisticsInfo getAppStatistics(
+      @Context HttpServletRequest hsr,
+      @QueryParam("states") Set<String> stateQueries,
+      @QueryParam("applicationTypes") Set<String> typeQueries) {
+    init();
+
+    // parse the params and build the scoreboard
+    // converting state/type name to lowercase
+    Set<String> states = parseQueries(stateQueries, true);
+    Set<String> types = parseQueries(typeQueries, false);
+    // if no types, counts the applications of any types
+    if (types.size() == 0) {
+      types.add(ANY);
+    } else if (types.size() != 1) {
+      throw new BadRequestException("# of applicationTypes = " + types.size()
+          + ", we temporarily support at most one applicationType");
+    }
+    // if no states, returns the counts of all RMAppStates
+    if (states.size() == 0) {
+      for (YarnApplicationState state : YarnApplicationState.values()) {
+        states.add(state.toString().toLowerCase());
+      }
+    }
+    // in case we extend to multiple applicationTypes in the future
+    Map<YarnApplicationState, Map<String, Long>> scoreboard =
+        buildScoreboard(states, types);
+
+    // go through the apps in RM to count the numbers, ignoring the case of
+    // the state/type name
+    ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext().getRMApps();
+    for (RMApp rmapp : apps.values()) {
+      YarnApplicationState state = rmapp.createApplicationState();
+      String type = rmapp.getApplicationType().trim().toLowerCase();
+      if (states.contains(state.toString().toLowerCase())) {
+        if (types.contains(ANY)) {
+          countApp(scoreboard, state, ANY);
+        } else if (types.contains(type)) {
+          countApp(scoreboard, state, type);
+        }
+      }
+    }
+
+    // fill the response object
+    ApplicationStatisticsInfo appStatInfo = new ApplicationStatisticsInfo();
+    for (Map.Entry<YarnApplicationState, Map<String, Long>> partScoreboard
+        : scoreboard.entrySet()) {
+      for (Map.Entry<String, Long> statEntry
+          : partScoreboard.getValue().entrySet()) {
+        StatisticsItemInfo statItem = new StatisticsItemInfo(
+            partScoreboard.getKey(), statEntry.getKey(), statEntry.getValue());
+        appStatInfo.add(statItem);
+      }
+    }
+    return appStatInfo;
+  }
+
+  private static Set<String> parseQueries(
+      Set<String> queries, boolean isState) {
+    Set<String> params = new HashSet<String>();
+    if (!queries.isEmpty()) {
+      for (String query : queries) {
+        if (query != null && !query.trim().isEmpty()) {
+          String[] paramStrs = query.split(",");
+          for (String paramStr : paramStrs) {
+            if (paramStr != null && !paramStr.trim().isEmpty()) {
+              if (isState) {
+                try {
+                  // enum string is in the uppercase
+                  YarnApplicationState.valueOf(paramStr.trim().toUpperCase());
+                } catch (RuntimeException e) {
+                  YarnApplicationState[] stateArray =
+                      YarnApplicationState.values();
+                  String allAppStates = Arrays.toString(stateArray);
+                  throw new BadRequestException(
+                      "Invalid application-state " + paramStr.trim()
+                      + " specified. It should be one of " + allAppStates);
+                }
+              }
+              params.add(paramStr.trim().toLowerCase());
+            }
+          }
+        }
+      }
+    }
+    return params;
+  }
+
+  private static Map<YarnApplicationState, Map<String, Long>> buildScoreboard(
+     Set<String> states, Set<String> types) {
+    Map<YarnApplicationState, Map<String, Long>> scoreboard
+        = new HashMap<YarnApplicationState, Map<String, Long>>();
+    // default states will result in enumerating all YarnApplicationStates
+    assert !states.isEmpty();
+    for (String state : states) {
+      Map<String, Long> partScoreboard = new HashMap<String, Long>();
+      scoreboard.put(
+          YarnApplicationState.valueOf(state.toUpperCase()), partScoreboard);
+      // types is verified no to be empty
+      for (String type : types) {
+        partScoreboard.put(type, 0L);
+      }
+    }
+    return scoreboard;
+  }
+
+  private static void countApp(
+      Map<YarnApplicationState, Map<String, Long>> scoreboard,
+      YarnApplicationState state, String type) {
+    Map<String, Long> partScoreboard = scoreboard.get(state);
+    Long count = partScoreboard.get(type);
+    partScoreboard.put(type, count + 1L);
+  }
+
+  @GET
   @Path("/apps/{appid}")
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public AppInfo getApp(@Context HttpServletRequest hsr,



Mime
View raw message