hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject svn commit: r1548386 [2/3] - in /hadoop/common/branches/HDFS-4685/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop...
Date Fri, 06 Dec 2013 06:57:21 GMT
Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Dec  6 06:57:15 2013
@@ -18,20 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URL;
-import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.commons.logging.Log;
@@ -39,21 +33,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
@@ -67,37 +49,13 @@ public class QueueManager {
 
   public static final String ROOT_QUEUE = "root";
   
-  /** Time to wait between checks of the allocation file */
-  public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
-
-  /**
-   * Time to wait after the allocation has been modified before reloading it
-   * (this is done to prevent loading a file that hasn't been fully written).
-   */
-  public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
-  
-  private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
-  private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
-
   private final FairScheduler scheduler;
 
-  // Path to XML file containing allocations. 
-  private File allocFile; 
-
   private final Collection<FSLeafQueue> leafQueues = 
       new CopyOnWriteArrayList<FSLeafQueue>();
   private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
   private FSParentQueue rootQueue;
 
-  @VisibleForTesting
-  volatile QueueManagerInfo info = new QueueManagerInfo();
-  @VisibleForTesting
-  volatile QueuePlacementPolicy placementPolicy;
-  
-  private long lastReloadAttempt; // Last time we tried to reload the queues file
-  private long lastSuccessfulReload; // Last time we successfully reloaded queues
-  private boolean lastReloadAttemptFailed = false;
-  
   public QueueManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
   }
@@ -106,45 +64,15 @@ public class QueueManager {
     return rootQueue;
   }
 
-  public void initialize() throws IOException, SAXException,
-      AllocationConfigurationException, ParserConfigurationException {
-    FairSchedulerConfiguration conf = scheduler.getConf();
-    rootQueue = new FSParentQueue("root", this, scheduler, null);
+  public void initialize(Configuration conf) throws IOException,
+      SAXException, AllocationConfigurationException, ParserConfigurationException {
+    rootQueue = new FSParentQueue("root", scheduler, null);
     queues.put(rootQueue.getName(), rootQueue);
     
-    this.allocFile = conf.getAllocationFile();
-    placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
-        new HashSet<String>(), conf);
-    
-    reloadAllocs();
-    lastSuccessfulReload = scheduler.getClock().getTime();
-    lastReloadAttempt = scheduler.getClock().getTime();
     // Create the default queue
     getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
   }
   
-  public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
-    
-  }
-  
-  /**
-   * Construct simple queue placement policy from allow-undeclared-pools and
-   * user-as-default-queue.
-   */
-  private List<QueuePlacementRule> getSimplePlacementRules() {
-    boolean create = scheduler.getConf().getAllowUndeclaredPools();
-    boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
-    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
-    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
-    if (userAsDefaultQueue) {
-      rules.add(new QueuePlacementRule.User().initialize(create, null));
-    }
-    if (!userAsDefaultQueue || !create) {
-      rules.add(new QueuePlacementRule.Default().initialize(true, null));
-    }
-    return rules;
-  }
-  
   /**
    * Get a queue by name, creating it if the create param is true and is necessary.
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -213,17 +141,30 @@ public class QueueManager {
     // queue to create.
     // Now that we know everything worked out, make all the queues
     // and add them to the map.
+    AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
     FSLeafQueue leafQueue = null;
     for (int i = newQueueNames.size()-1; i >= 0; i--) {
       String queueName = newQueueNames.get(i);
       if (i == 0) {
         // First name added was the leaf queue
-        leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+        leafQueue = new FSLeafQueue(name, scheduler, parent);
+        try {
+          leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
+        } catch (AllocationConfigurationException ex) {
+          LOG.warn("Failed to set default scheduling policy "
+              + queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex);
+        }
         parent.addChildQueue(leafQueue);
         queues.put(leafQueue.getName(), leafQueue);
         leafQueues.add(leafQueue);
       } else {
-        FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+        FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
+        try {
+          newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
+        } catch (AllocationConfigurationException ex) {
+          LOG.warn("Failed to set default scheduling policy "
+              + queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex);
+        }
         parent.addChildQueue(newParent);
         queues.put(newParent.getName(), newParent);
         parent = newParent;
@@ -257,301 +198,6 @@ public class QueueManager {
     }
   }
   
-  public QueuePlacementPolicy getPlacementPolicy() {
-    return placementPolicy;
-  }
-
-  /**
-   * Reload allocations file if it hasn't been loaded in a while
-   */
-  public void reloadAllocsIfNecessary() {
-    long time = scheduler.getClock().getTime();
-    if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
-      lastReloadAttempt = time;
-      if (null == allocFile) {
-        return;
-      }
-      try {
-        // Get last modified time of alloc file depending whether it's a String
-        // (for a path name) or an URL (for a classloader resource)
-        long lastModified = allocFile.lastModified();
-        if (lastModified > lastSuccessfulReload &&
-            time > lastModified + ALLOC_RELOAD_WAIT) {
-          reloadAllocs();
-          lastSuccessfulReload = time;
-          lastReloadAttemptFailed = false;
-        }
-      } catch (Exception e) {
-        // Throwing the error further out here won't help - the RPC thread
-        // will catch it and report it in a loop. Instead, just log it and
-        // hope somebody will notice from the log.
-        // We log the error only on the first failure so we don't fill up the
-        // JobTracker's log with these messages.
-        if (!lastReloadAttemptFailed) {
-          LOG.error("Failed to reload fair scheduler config file - " +
-              "will use existing allocations.", e);
-        }
-        lastReloadAttemptFailed = true;
-      }
-    }
-  }
-
-  /**
-   * Updates the allocation list from the allocation config file. This file is
-   * expected to be in the XML format specified in the design doc.
-   *
-   * @throws IOException if the config file cannot be read.
-   * @throws AllocationConfigurationException if allocations are invalid.
-   * @throws ParserConfigurationException if XML parser is misconfigured.
-   * @throws SAXException if config file is malformed.
-   */
-  public void reloadAllocs() throws IOException, ParserConfigurationException,
-      SAXException, AllocationConfigurationException {
-    if (allocFile == null) return;
-    // Create some temporary hashmaps to hold the new allocs, and we only save
-    // them in our fields if we have parsed the entire allocs file successfully.
-    Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
-    Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
-    Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
-    Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
-    Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
-    Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
-    Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
-    Map<String, Map<QueueACL, AccessControlList>> queueAcls =
-        new HashMap<String, Map<QueueACL, AccessControlList>>();
-    int userMaxAppsDefault = Integer.MAX_VALUE;
-    int queueMaxAppsDefault = Integer.MAX_VALUE;
-    long fairSharePreemptionTimeout = Long.MAX_VALUE;
-    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-    SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
-    
-    QueuePlacementPolicy newPlacementPolicy = null;
-
-    // Remember all queue names so we can display them on web UI, etc.
-    List<String> queueNamesInAllocFile = new ArrayList<String>();
-
-    // Read and parse the allocations file.
-    DocumentBuilderFactory docBuilderFactory =
-      DocumentBuilderFactory.newInstance();
-    docBuilderFactory.setIgnoringComments(true);
-    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc = builder.parse(allocFile);
-    Element root = doc.getDocumentElement();
-    if (!"allocations".equals(root.getTagName()))
-      throw new AllocationConfigurationException("Bad fair scheduler config " +
-          "file: top-level element not <allocations>");
-    NodeList elements = root.getChildNodes();
-    List<Element> queueElements = new ArrayList<Element>();
-    Element placementPolicyElement = null;
-    for (int i = 0; i < elements.getLength(); i++) {
-      Node node = elements.item(i);
-      if (node instanceof Element) {
-        Element element = (Element)node;
-        if ("queue".equals(element.getTagName()) ||
-      	  "pool".equals(element.getTagName())) {
-          queueElements.add(element);
-        } else if ("user".equals(element.getTagName())) {
-          String userName = element.getAttribute("name");
-          NodeList fields = element.getChildNodes();
-          for (int j = 0; j < fields.getLength(); j++) {
-            Node fieldNode = fields.item(j);
-            if (!(fieldNode instanceof Element))
-              continue;
-            Element field = (Element) fieldNode;
-            if ("maxRunningApps".equals(field.getTagName())) {
-              String text = ((Text)field.getFirstChild()).getData().trim();
-              int val = Integer.parseInt(text);
-              userMaxApps.put(userName, val);
-            }
-          }
-        } else if ("userMaxAppsDefault".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          int val = Integer.parseInt(text);
-          userMaxAppsDefault = val;
-        } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          long val = Long.parseLong(text) * 1000L;
-          fairSharePreemptionTimeout = val;
-        } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          long val = Long.parseLong(text) * 1000L;
-          defaultMinSharePreemptionTimeout = val;
-        } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          int val = Integer.parseInt(text);
-          queueMaxAppsDefault = val;
-        } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
-            || "defaultQueueSchedulingMode".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          SchedulingPolicy.setDefault(text);
-          defaultSchedPolicy = SchedulingPolicy.getDefault();
-        } else if ("queuePlacementPolicy".equals(element.getTagName())) {
-          placementPolicyElement = element;
-        } else {
-          LOG.warn("Bad element in allocations file: " + element.getTagName());
-        }
-      }
-    }
-    
-    // Load queue elements.  A root queue can either be included or omitted.  If
-    // it's included, all other queues must be inside it.
-    for (Element element : queueElements) {
-      String parent = "root";
-      if (element.getAttribute("name").equalsIgnoreCase("root")) {
-        if (queueElements.size() > 1) {
-          throw new AllocationConfigurationException("If configuring root queue,"
-          		+ " no other queues can be placed alongside it.");
-        }
-        parent = null;
-      }
-      loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
-          userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
-          queueAcls, queueNamesInAllocFile);
-    }
-    
-    // Load placement policy and pass it configured queues
-    if (placementPolicyElement != null) {
-      newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
-          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
-    } else {
-      newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
-          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
-    }
-
-    // Commit the reload; also create any queue defined in the alloc file
-    // if it does not already exist, so it can be displayed on the web UI.
-    synchronized (this) {
-      info = new QueueManagerInfo(minQueueResources, maxQueueResources,
-          queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-          queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
-          queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
-      placementPolicy = newPlacementPolicy;
-      
-      // Make sure all queues exist
-      for (String name: queueNamesInAllocFile) {
-        getLeafQueue(name, true);
-      }
-      
-      for (FSQueue queue : queues.values()) {
-        // Update queue metrics
-        FSQueueMetrics queueMetrics = queue.getMetrics();
-        queueMetrics.setMinShare(queue.getMinShare());
-        queueMetrics.setMaxShare(queue.getMaxShare());
-        // Set scheduling policies
-        if (queuePolicies.containsKey(queue.getName())) {
-          queue.setPolicy(queuePolicies.get(queue.getName()));
-        } else {
-          queue.setPolicy(SchedulingPolicy.getDefault());
-        }
-      }
- 
-    }
-  }
-  
-  /**
-   * Loads a queue from a queue element in the configuration file
-   */
-  private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
-      Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
-      Map<String, SchedulingPolicy> queuePolicies,
-      Map<String, Long> minSharePreemptionTimeouts,
-      Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) 
-      throws AllocationConfigurationException {
-    String queueName = element.getAttribute("name");
-    if (parentName != null) {
-      queueName = parentName + "." + queueName;
-    }
-    Map<QueueACL, AccessControlList> acls =
-        new HashMap<QueueACL, AccessControlList>();
-    NodeList fields = element.getChildNodes();
-    boolean isLeaf = true;
-
-    for (int j = 0; j < fields.getLength(); j++) {
-      Node fieldNode = fields.item(j);
-      if (!(fieldNode instanceof Element))
-        continue;
-      Element field = (Element) fieldNode;
-      if ("minResources".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
-        minQueueResources.put(queueName, val);
-      } else if ("maxResources".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
-        maxQueueResources.put(queueName, val);
-      } else if ("maxRunningApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        queueMaxApps.put(queueName, val);
-      } else if ("weight".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        double val = Double.parseDouble(text);
-        queueWeights.put(queueName, new ResourceWeights((float)val));
-      } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        long val = Long.parseLong(text) * 1000L;
-        minSharePreemptionTimeouts.put(queueName, val);
-      } else if ("schedulingPolicy".equals(field.getTagName())
-          || "schedulingMode".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        SchedulingPolicy policy = SchedulingPolicy.parse(text);
-        policy.initialize(scheduler.getClusterCapacity());
-        queuePolicies.put(queueName, policy);
-      } else if ("aclSubmitApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData();
-        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
-      } else if ("aclAdministerApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData();
-        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
-      } else if ("queue".endsWith(field.getTagName()) || 
-          "pool".equals(field.getTagName())) {
-        loadQueue(queueName, field, minQueueResources, maxQueueResources,
-            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-            minSharePreemptionTimeouts,
-            queueAcls, queueNamesInAllocFile);
-        isLeaf = false;
-      }
-    }
-    if (isLeaf) {
-      queueNamesInAllocFile.add(queueName);
-    }
-    queueAcls.put(queueName, acls);
-    if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
-        && !Resources.fitsIn(minQueueResources.get(queueName),
-            maxQueueResources.get(queueName))) {
-      LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
-          queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
-    }
-  }
-
-  /**
-   * Get the minimum resource allocation for the given queue.
-   * @return the cap set on this queue, or 0 if not set.
-   */
-  public Resource getMinResources(String queue) {
-    Resource minQueueResource = info.minQueueResources.get(queue);
-    if (minQueueResource != null) {
-      return minQueueResource;
-    } else {
-      return Resources.createResource(0);
-    }
-  }
-
-  /**
-   * Get the maximum resource allocation for the given queue.
-   * @return the cap set on this queue, or Integer.MAX_VALUE if not set.
-   */
-
-  public Resource getMaxResources(String queueName) {
-    Resource maxQueueResource = info.maxQueueResources.get(queueName);
-    if (maxQueueResource != null) {
-      return maxQueueResource;
-    } else {
-      return Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE);
-    }
-  }
-
   /**
    * Get a collection of all leaf queues
    */
@@ -567,141 +213,27 @@ public class QueueManager {
   public Collection<FSQueue> getQueues() {
     return queues.values();
   }
-
-  public int getUserMaxApps(String user) {
-    // save current info in case it gets changed under us
-    QueueManagerInfo info = this.info;
-    if (info.userMaxApps.containsKey(user)) {
-      return info.userMaxApps.get(user);
-    } else {
-      return info.userMaxAppsDefault;
-    }
-  }
-
-  public int getQueueMaxApps(String queue) {
-    // save current info in case it gets changed under us
-    QueueManagerInfo info = this.info;
-    if (info.queueMaxApps.containsKey(queue)) {
-      return info.queueMaxApps.get(queue);
-    } else {
-      return info.queueMaxAppsDefault;
-    }
-  }
   
-  public ResourceWeights getQueueWeight(String queue) {
-    ResourceWeights weight = info.queueWeights.get(queue);
-    if (weight != null) {
-      return weight;
-    } else {
-      return ResourceWeights.NEUTRAL;
-    }
-  }
-
-  /**
-   * Get a queue's min share preemption timeout, in milliseconds. This is the
-   * time after which jobs in the queue may kill other queues' tasks if they
-   * are below their min share.
-   */
-  public long getMinSharePreemptionTimeout(String queueName) {
-    // save current info in case it gets changed under us
-    QueueManagerInfo info = this.info;
-    if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
-      return info.minSharePreemptionTimeouts.get(queueName);
-    }
-    return info.defaultMinSharePreemptionTimeout;
-  }
-  
-  /**
-   * Get the fair share preemption, in milliseconds. This is the time
-   * after which any job may kill other jobs' tasks if it is below half
-   * its fair share.
-   */
-  public long getFairSharePreemptionTimeout() {
-    return info.fairSharePreemptionTimeout;
-  }
-
-  /**
-   * Get the ACLs associated with this queue. If a given ACL is not explicitly
-   * configured, include the default value for that ACL.  The default for the
-   * root queue is everybody ("*") and the default for all other queues is
-   * nobody ("")
-   */
-  public AccessControlList getQueueAcl(String queue, QueueACL operation) {
-    Map<QueueACL, AccessControlList> queueAcls = info.queueAcls.get(queue);
-    if (queueAcls == null || !queueAcls.containsKey(operation)) {
-      return (queue.equals(ROOT_QUEUE)) ? EVERYBODY_ACL : NOBODY_ACL;
-    }
-    return queueAcls.get(operation);
-  }
-  
-  static class QueueManagerInfo {
-    // Minimum resource allocation for each queue
-    public final Map<String, Resource> minQueueResources;
-    // Maximum amount of resources per queue
-    public final Map<String, Resource> maxQueueResources;
-    // Sharing weights for each queue
-    public final Map<String, ResourceWeights> queueWeights;
-    
-    // Max concurrent running applications for each queue and for each user; in addition,
-    // for users that have no max specified, we use the userMaxJobsDefault.
-    public final Map<String, Integer> queueMaxApps;
-    public final Map<String, Integer> userMaxApps;
-    public final int userMaxAppsDefault;
-    public final int queueMaxAppsDefault;
-
-    // ACL's for each queue. Only specifies non-default ACL's from configuration.
-    public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
-
-    // Min share preemption timeout for each queue in seconds. If a job in the queue
-    // waits this long without receiving its guaranteed share, it is allowed to
-    // preempt other jobs' tasks.
-    public final Map<String, Long> minSharePreemptionTimeouts;
-
-    // Default min share preemption timeout for queues where it is not set
-    // explicitly.
-    public final long defaultMinSharePreemptionTimeout;
-
-    // Preemption timeout for jobs below fair share in seconds. If a job remains
-    // below half its fair share for this long, it is allowed to preempt tasks.
-    public final long fairSharePreemptionTimeout;
-
-    public final SchedulingPolicy defaultSchedulingPolicy;
-    
-    public QueueManagerInfo(Map<String, Resource> minQueueResources, 
-        Map<String, Resource> maxQueueResources, 
-        Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-        Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
-        int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, 
-        Map<String, Long> minSharePreemptionTimeouts, 
-        Map<String, Map<QueueACL, AccessControlList>> queueAcls,
-        long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
-      this.minQueueResources = minQueueResources;
-      this.maxQueueResources = maxQueueResources;
-      this.queueMaxApps = queueMaxApps;
-      this.userMaxApps = userMaxApps;
-      this.queueWeights = queueWeights;
-      this.userMaxAppsDefault = userMaxAppsDefault;
-      this.queueMaxAppsDefault = queueMaxAppsDefault;
-      this.defaultSchedulingPolicy = defaultSchedulingPolicy;
-      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
-      this.queueAcls = queueAcls;
-      this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
-      this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+  public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
+    // Make sure all queues exist
+    for (String name : queueConf.getQueueNames()) {
+      getLeafQueue(name, true);
     }
     
-    public QueueManagerInfo() {
-      minQueueResources = new HashMap<String, Resource>();
-      maxQueueResources = new HashMap<String, Resource>();
-      queueWeights = new HashMap<String, ResourceWeights>();
-      queueMaxApps = new HashMap<String, Integer>();
-      userMaxApps = new HashMap<String, Integer>();
-      userMaxAppsDefault = Integer.MAX_VALUE;
-      queueMaxAppsDefault = Integer.MAX_VALUE;
-      queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
-      minSharePreemptionTimeouts = new HashMap<String, Long>();
-      defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-      fairSharePreemptionTimeout = Long.MAX_VALUE;
-      defaultSchedulingPolicy = SchedulingPolicy.getDefault();
+    for (FSQueue queue : queues.values()) {
+      // Update queue metrics
+      FSQueueMetrics queueMetrics = queue.getMetrics();
+      queueMetrics.setMinShare(queue.getMinShare());
+      queueMetrics.setMaxShare(queue.getMaxShare());
+      // Set scheduling policies
+      try {
+        SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
+        policy.initialize(scheduler.getClusterCapacity());
+        queue.setPolicy(policy);
+      } catch (AllocationConfigurationException ex) {
+        LOG.warn("Cannot apply configured scheduling policy to queue "
+            + queue.getName(), ex);
+      }
     }
   }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Fri Dec  6 06:57:15 2013
@@ -95,6 +95,34 @@ public class QueuePlacementPolicy {
   }
   
   /**
+   * Build a simple queue placement policy from the allow-undeclared-pools and
+   * user-as-default-queue configuration options.
+   */
+  public static QueuePlacementPolicy fromConfiguration(Configuration conf,
+      Set<String> configuredQueues) {
+    boolean create = conf.getBoolean(
+        FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
+        FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
+    boolean userAsDefaultQueue = conf.getBoolean(
+        FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
+        FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+    if (userAsDefaultQueue) {
+      rules.add(new QueuePlacementRule.User().initialize(create, null));
+    }
+    if (!userAsDefaultQueue || !create) {
+      rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    }
+    try {
+      return new QueuePlacementPolicy(rules, configuredQueues, conf);
+    } catch (AllocationConfigurationException ex) {
+      throw new RuntimeException("Should never hit exception when loading" +
+      		"placement policy from conf", ex);
+    }
+  }
+
+  /**
    * Applies this rule to an app with the given requested queue and user/group
    * information.
    * 
@@ -120,4 +148,8 @@ public class QueuePlacementPolicy {
     throw new IllegalStateException("Should have applied a rule before " +
     		"reaching here");
   }
+  
+  public List<QueuePlacementRule> getRules() {
+    return rules;
+  }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Fri Dec  6 06:57:15 2013
@@ -35,7 +35,7 @@ public abstract class SchedulingPolicy {
   private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
       new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
 
-  private static SchedulingPolicy DEFAULT_POLICY =
+  public static final SchedulingPolicy DEFAULT_POLICY =
       getInstance(FairSharePolicy.class);
   
   public static final byte DEPTH_LEAF = (byte) 1;
@@ -44,15 +44,6 @@ public abstract class SchedulingPolicy {
   public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
   public static final byte DEPTH_ANY = (byte) 7;
 
-  public static SchedulingPolicy getDefault() {
-    return DEFAULT_POLICY;
-  }
-
-  public static void setDefault(String className)
-      throws AllocationConfigurationException {
-    DEFAULT_POLICY = parse(className);
-  }
-
   /**
    * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
    */

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java Fri Dec  6 06:57:15 2013
@@ -29,10 +29,10 @@ import javax.xml.bind.annotation.XmlSeeA
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @XmlRootElement
@@ -65,7 +65,7 @@ public class FairSchedulerQueueInfo {  
   }
   
   public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
-    QueueManager manager = scheduler.getQueueManager();
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     
     queueName = queue.getName();
     schedulingPolicy = queue.getPolicy().getName();
@@ -87,7 +87,7 @@ public class FairSchedulerQueueInfo {  
     fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
     fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
     
-    maxApps = manager.getQueueMaxApps(queueName);
+    maxApps = allocConf.getQueueMaxApps(queueName);
     
     Collection<FSQueue> children = queue.getChildQueues();
     childQueues = new ArrayList<FairSchedulerQueueInfo>();

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Fri Dec  6 06:57:15 2013
@@ -1247,6 +1247,8 @@ public class TestRMRestart {
     // renewDate before renewing
     Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
     try{
+      // Sleep for one millisecond to make sure renewDateAfterRenew is greater
+      Thread.sleep(1);
       // renew recovered token
       rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
     } catch(Exception e) {

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Fri Dec  6 06:57:15 2013
@@ -19,6 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 
@@ -81,6 +86,8 @@ public class TestFSRMStateStore extends 
       YarnConfiguration conf = new YarnConfiguration();
       conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
           workingDirPathURI.toString());
+      conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
+        "100,6000");
       this.store = new TestFileSystemRMStore(conf);
       return store;
     }
@@ -139,4 +146,46 @@ public class TestFSRMStateStore extends 
       cluster.shutdown();
     }
   }
+
+  @Test (timeout = 30000)
+  public void testFSRMStateStoreClientRetry() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    try {
+      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
+      final RMStateStore store = fsTester.getRMStateStore();
+      store.setRMDispatcher(new TestDispatcher());
+      final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+      cluster.shutdownNameNodes();
+
+      Thread clientThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            store.storeApplicationStateInternal("application1",
+              (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+                .newApplicationStateData(111, 111, "user", null,
+                  RMAppState.ACCEPTED, "diagnostics", 333));
+          } catch (Exception e) {
+            // TODO 0 datanode exception will not be retried by dfs client, fix
+            // that separately.
+            if (!e.getMessage().contains("could only be replicated" +
+                " to 0 nodes instead of minReplication (=1)")) {
+              assertionFailedInThread.set(true);
+            }
+            e.printStackTrace();
+          }
+        }
+      };
+      Thread.sleep(2000);
+      clientThread.start();
+      cluster.restartNameNode();
+      clientThread.join();
+      Assert.assertFalse(assertionFailedInThread.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Fri Dec  6 06:57:15 2013
@@ -37,6 +37,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -114,6 +115,37 @@ public class TestZKRMStateStoreZKClientC
     }
   }
 
+  @Test (timeout = 20000)
+  public void testZKClientRetry() throws Exception {
+    TestZKClient zkClientTester = new TestZKClient();
+    final String path = "/test";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+    conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
+    final ZKRMStateStore store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+    final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+
+    stopServer();
+    Thread clientThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          store.getDataWithRetries(path, true);
+        } catch (Exception e) {
+          e.printStackTrace();
+          assertionFailedInThread.set(true);
+        }
+      }
+    };
+    Thread.sleep(2000);
+    startServer();
+    clientThread.join();
+    Assert.assertFalse(assertionFailedInThread.get());
+  }
+
   @Test(timeout = 20000)
   public void testZKClientDisconnectAndReconnect()
       throws Exception {

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Fri Dec  6 06:57:15 2013
@@ -51,11 +51,11 @@ public class TestFSLeafQueue {
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     String queueName = "root.queue1";
-    QueueManager mockMgr = mock(QueueManager.class);
-    when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource);
-    when(mockMgr.getMinResources(queueName)).thenReturn(Resources.none());
+    scheduler.allocConf = mock(AllocationConfiguration.class);
+    when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
+    when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
 
-    schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null);
+    schedulable = new FSLeafQueue(queueName, scheduler, null);
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1548386&r1=1548385&r2=1548386&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Dec  6 06:57:15 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -86,7 +85,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
@@ -121,6 +119,7 @@ public class TestFairScheduler {
 
   private FairScheduler scheduler;
   private ResourceManager resourceManager;
+  private Configuration conf;
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   private int APP_ID = 1; // Incrementing counter for schedling apps
@@ -130,7 +129,7 @@ public class TestFairScheduler {
   @Before
   public void setUp() throws IOException {
     scheduler = new FairScheduler();
-    Configuration conf = createConfiguration();
+    conf = createConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
     conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
       1024);
@@ -145,7 +144,6 @@ public class TestFairScheduler {
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
     resourceManager.getRMContext().getStateStore().start();
 
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
     // to initialize the master key
     resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
   }
@@ -291,7 +289,6 @@ public class TestFairScheduler {
 
   @Test(timeout=2000)
   public void testLoadConfigurationOnInitialize() throws IOException {
-    Configuration conf = createConfiguration();
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
     conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 3);
     conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
@@ -362,6 +359,8 @@ public class TestFairScheduler {
   
   @Test
   public void testAggregateCapacityTracking() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -384,7 +383,9 @@ public class TestFairScheduler {
   }
 
   @Test
-  public void testSimpleFairShareCalculation() {
+  public void testSimpleFairShareCalculation() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
         MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024), 1,
@@ -409,7 +410,9 @@ public class TestFairScheduler {
   }
   
   @Test
-  public void testSimpleHierarchicalFairShareCalculation() {
+  public void testSimpleHierarchicalFairShareCalculation() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add one big node (only care about aggregate capacity)
     int capacity = 10 * 24;
     RMNode node1 =
@@ -440,7 +443,9 @@ public class TestFairScheduler {
   }
 
   @Test
-  public void testHierarchicalQueuesSimilarParents() {
+  public void testHierarchicalQueuesSimilarParents() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     QueueManager queueManager = scheduler.getQueueManager();
     FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child", true);
     Assert.assertEquals(2, queueManager.getLeafQueues().size());
@@ -462,8 +467,9 @@ public class TestFairScheduler {
   }
 
   @Test
-  public void testSchedulerRootQueueMetrics() throws InterruptedException {
-	  
+  public void testSchedulerRootQueueMetrics() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add a node
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
@@ -500,7 +506,9 @@ public class TestFairScheduler {
   }
 
   @Test (timeout = 5000)
-  public void testSimpleContainerAllocation() {
+  public void testSimpleContainerAllocation() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -546,7 +554,9 @@ public class TestFairScheduler {
   }
 
   @Test (timeout = 5000)
-  public void testSimpleContainerReservation() throws InterruptedException {
+  public void testSimpleContainerReservation() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -598,7 +608,6 @@ public class TestFairScheduler {
 
   @Test
   public void testUserAsDefaultQueue() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     RMContext rmContext = resourceManager.getRMContext();
@@ -617,14 +626,24 @@ public class TestFairScheduler {
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
         .getRunnableAppSchedulables().size());
     assertEquals("root.user1", rmApp.getQueue());
-
+  }
+  
+  @Test
+  public void testNotUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    scheduler.getQueueManager().initialize();
+    RMContext rmContext = resourceManager.getRMContext();
+    Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
+    ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
+    RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
+        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
+            null, null, null, false, false, 0, null, null), null, null, 0, null);
+    appsMap.put(appAttemptId.getApplicationId(), rmApp);
+    
     AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
-        createAppAttemptId(2, 1), "default", "user2");
+        appAttemptId, "default", "user2");
     scheduler.handle(appAddedEvent2);
-    assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
+    assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getRunnableAppSchedulables().size());
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
         .getRunnableAppSchedulables().size());
@@ -634,7 +653,7 @@ public class TestFairScheduler {
 
   @Test
   public void testEmptyQueueName() throws Exception {
-    Configuration conf = createConfiguration();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // only default queue
     assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
@@ -653,7 +672,6 @@ public class TestFairScheduler {
 
   @Test
   public void testAssignToQueue() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     
@@ -672,9 +690,10 @@ public class TestFairScheduler {
   
   @Test
   public void testQueuePlacementWithPolicy() throws Exception {
-    Configuration conf = createConfiguration();
     conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
         SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     ApplicationAttemptId appId;
     Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
 
@@ -684,10 +703,10 @@ public class TestFairScheduler {
     rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
     rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
     rules.add(new QueuePlacementRule.Default().initialize(true, null));
-    Set<String> queues = Sets.newHashSet("root.user1", "root.user3group", 
+    Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
         "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
-    scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
-        rules, queues, conf);
+    scheduler.getAllocationConfiguration().placementPolicy =
+        new QueuePlacementPolicy(rules, queues, conf);
     appId = createSchedulingRequest(1024, "somequeue", "user1");
     assertEquals("root.somequeue", apps.get(appId).getQueueName());
     appId = createSchedulingRequest(1024, "default", "user1");
@@ -706,8 +725,8 @@ public class TestFairScheduler {
     rules.add(new QueuePlacementRule.User().initialize(false, null));
     rules.add(new QueuePlacementRule.Specified().initialize(true, null));
     rules.add(new QueuePlacementRule.Default().initialize(true, null));
-    scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
-        rules, queues, conf);
+    scheduler.getAllocationConfiguration().placementPolicy =
+        new QueuePlacementPolicy(rules, queues, conf);
     appId = createSchedulingRequest(1024, "somequeue", "user1");
     assertEquals("root.user1", apps.get(appId).getQueueName());
     appId = createSchedulingRequest(1024, "somequeue", "otheruser");
@@ -718,9 +737,7 @@ public class TestFairScheduler {
 
   @Test
   public void testFairShareWithMinAlloc() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -733,9 +750,8 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
@@ -767,6 +783,8 @@ public class TestFairScheduler {
    */
   @Test
   public void testQueueDemandCalculation() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     scheduler.addApplication(id11, "root.queue1", "user1");
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
@@ -812,6 +830,8 @@ public class TestFairScheduler {
 
   @Test
   public void testAppAdditionAndRemoval() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
         createAppAttemptId(1, 1), "default", "user1");
     scheduler.handle(appAddedEvent1);
@@ -835,132 +855,9 @@ public class TestFairScheduler {
   }
 
   @Test
-  public void testAllocationFileParsing() throws Exception {
-    Configuration conf = createConfiguration();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    // Give queue A a minimum of 1024 M
-    out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    // Give queue B a minimum of 2048 M
-    out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
-    out.println("<schedulingPolicy>fair</schedulingPolicy>");
-    out.println("</queue>");
-    // Give queue C no minimum
-    out.println("<queue name=\"queueC\">");
-    out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
-    out.println("</queue>");
-    // Give queue D a limit of 3 running apps
-    out.println("<queue name=\"queueD\">");
-    out.println("<maxRunningApps>3</maxRunningApps>");
-    out.println("</queue>");
-    // Give queue E a preemption timeout of one minute
-    out.println("<queue name=\"queueE\">");
-    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
-    out.println("</queue>");
-    // Set default limit of apps per queue to 15
-    out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
-    // Set default limit of apps per user to 5
-    out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
-    // Give user1 a limit of 10 jobs
-    out.println("<user name=\"user1\">");
-    out.println("<maxRunningApps>10</maxRunningApps>");
-    out.println("</user>");
-    // Set default min share preemption timeout to 2 minutes
-    out.println("<defaultMinSharePreemptionTimeout>120"
-        + "</defaultMinSharePreemptionTimeout>");
-    // Set fair share preemption timeout to 5 minutes
-    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
-    // Set default scheduling policy to DRF
-    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
-    out.println("</allocations>");
-    out.close();
-
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
-
-    assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
-
-    assertEquals(Resources.createResource(1024, 0),
-        queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048, 0),
-        queueManager.getMinResources("root.queueB"));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root.queueC"));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root.queueD"));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root.queueE"));
-
-    assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
-    assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
-    assertEquals(10, queueManager.getUserMaxApps("user1"));
-    assertEquals(5, queueManager.getUserMaxApps("user2"));
-
-    // Root should get * ACL
-    assertEquals("*",queueManager.getQueueAcl("root",
-        QueueACL.ADMINISTER_QUEUE).getAclString());
-    assertEquals("*", queueManager.getQueueAcl("root",
-        QueueACL.SUBMIT_APPLICATIONS).getAclString());
-
-    // Unspecified queues should get default ACL
-    assertEquals(" ",queueManager.getQueueAcl("root.queueA",
-        QueueACL.ADMINISTER_QUEUE).getAclString());
-    assertEquals(" ", queueManager.getQueueAcl("root.queueA",
-        QueueACL.SUBMIT_APPLICATIONS).getAclString());
-
-    // Queue B ACL
-    assertEquals("alice,bob admins",queueManager.getQueueAcl("root.queueB",
-        QueueACL.ADMINISTER_QUEUE).getAclString());
-
-    // Queue C ACL
-    assertEquals("alice,bob admins",queueManager.getQueueAcl("root.queueC",
-        QueueACL.SUBMIT_APPLICATIONS).getAclString());
-
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." + 
-        YarnConfiguration.DEFAULT_QUEUE_NAME));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
-    assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
-    assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
-    
-    // Verify existing queues have default scheduling policy
-    assertEquals(DominantResourceFairnessPolicy.NAME,
-        queueManager.getQueue("root").getPolicy().getName());
-    assertEquals(DominantResourceFairnessPolicy.NAME,
-        queueManager.getQueue("root.queueA").getPolicy().getName());
-    // Verify default is overriden if specified explicitly
-    assertEquals(FairSharePolicy.NAME,
-        queueManager.getQueue("root.queueB").getPolicy().getName());
-    // Verify new queue gets default scheduling policy
-    assertEquals(DominantResourceFairnessPolicy.NAME,
-        queueManager.getLeafQueue("root.newqueue", true).getPolicy().getName());
-  }
-
-  @Test
   public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException, 
       AllocationConfigurationException, ParserConfigurationException {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -980,9 +877,9 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
-    
     Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
     Assert.assertEquals(4, leafQueues.size());
     Assert.assertNotNull(queueManager.getLeafQueue("queueA", false));
@@ -995,9 +892,7 @@ public class TestFairScheduler {
   
   @Test
   public void testConfigureRootQueue() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1014,9 +909,9 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-
+    
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
     
     FSQueue root = queueManager.getRootQueue();
     assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
@@ -1025,136 +920,9 @@ public class TestFairScheduler {
     assertNotNull(queueManager.getLeafQueue("child2", false));
   }
   
-  /**
-   * Verify that you can't place queues at the same level as the root queue in
-   * the allocations file.
-   */
-  @Test (expected = AllocationConfigurationException.class)
-  public void testQueueAlongsideRoot() throws Exception {
-    Configuration conf = createConfiguration();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"root\">");
-    out.println("</queue>");
-    out.println("<queue name=\"other\">");
-    out.println("</queue>");
-    out.println("</allocations>");
-    out.close();
-
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
-  }
-  
-  @Test
-  public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
-    Configuration conf = createConfiguration();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    // Give queue A a minimum of 1024 M
-    out.println("<pool name=\"queueA\">");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</pool>");
-    // Give queue B a minimum of 2048 M
-    out.println("<pool name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
-    out.println("</pool>");
-    // Give queue C no minimum
-    out.println("<pool name=\"queueC\">");
-    out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
-    out.println("</pool>");
-    // Give queue D a limit of 3 running apps
-    out.println("<pool name=\"queueD\">");
-    out.println("<maxRunningApps>3</maxRunningApps>");
-    out.println("</pool>");
-    // Give queue E a preemption timeout of one minute
-    out.println("<pool name=\"queueE\">");
-    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
-    out.println("</pool>");
-    // Set default limit of apps per queue to 15
-    out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
-    // Set default limit of apps per user to 5
-    out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
-    // Give user1 a limit of 10 jobs
-    out.println("<user name=\"user1\">");
-    out.println("<maxRunningApps>10</maxRunningApps>");
-    out.println("</user>");
-    // Set default min share preemption timeout to 2 minutes
-    out.println("<defaultMinSharePreemptionTimeout>120"
-        + "</defaultMinSharePreemptionTimeout>");
-    // Set fair share preemption timeout to 5 minutes
-    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
-
-    assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
-
-    assertEquals(Resources.createResource(1024, 0),
-        queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048, 0),
-        queueManager.getMinResources("root.queueB"));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root.queueC"));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root.queueD"));
-    assertEquals(Resources.createResource(0),
-        queueManager.getMinResources("root.queueE"));
-
-    assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
-    assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
-    assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
-    assertEquals(10, queueManager.getUserMaxApps("user1"));
-    assertEquals(5, queueManager.getUserMaxApps("user2"));
-
-    // Unspecified queues should get default ACL
-    assertEquals(" ", queueManager.getQueueAcl("root.queueA",
-        QueueACL.ADMINISTER_QUEUE).getAclString());
-    assertEquals(" ", queueManager.getQueueAcl("root.queueA",
-        QueueACL.SUBMIT_APPLICATIONS).getAclString());
-
-    // Queue B ACL
-    assertEquals("alice,bob admins", queueManager.getQueueAcl("root.queueB",
-        QueueACL.ADMINISTER_QUEUE).getAclString());
-
-    // Queue C ACL
-    assertEquals("alice,bob admins", queueManager.getQueueAcl("root.queueC",
-        QueueACL.SUBMIT_APPLICATIONS).getAclString());
-
-
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
-        YarnConfiguration.DEFAULT_QUEUE_NAME));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
-    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
-    assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
-    assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
-  }
-
   @Test (timeout = 5000)
   public void testIsStarvedForMinShare() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1168,8 +936,7 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
@@ -1212,9 +979,7 @@ public class TestFairScheduler {
 
   @Test (timeout = 5000)
   public void testIsStarvedForFairShare() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1228,9 +993,8 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
-
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
         MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
@@ -1277,13 +1041,9 @@ public class TestFairScheduler {
    * now this means decreasing order of priority.
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
-    Configuration conf = createConfiguration();
-    
     conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
     conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); 
-    
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     MockClock clock = new MockClock();
     scheduler.setClock(clock);
@@ -1305,9 +1065,8 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Create four nodes
     RMNode node1 =
@@ -1443,15 +1202,16 @@ public class TestFairScheduler {
    * Tests the timing of decision to preempt tasks.
    */
   public void testPreemptionDecision() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     MockClock clock = new MockClock();
     scheduler.setClock(clock);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
     out.println("<queue name=\"queueA\">");
     out.println("<weight>.25</weight>");
     out.println("<minResources>1024mb,0vcores</minResources>");
@@ -1473,8 +1233,7 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Create four nodes
     RMNode node1 =
@@ -1570,7 +1329,9 @@ public class TestFairScheduler {
   }
   
   @Test (timeout = 5000)
-  public void testMultipleContainersWaitingForReservation() {
+  public void testMultipleContainersWaitingForReservation() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -1600,9 +1361,7 @@ public class TestFairScheduler {
   @Test (timeout = 5000)
   public void testUserMaxRunningApps() throws Exception {
     // Set max running apps
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1613,8 +1372,7 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     // Add a node
     RMNode node1 =
@@ -1654,7 +1412,9 @@ public class TestFairScheduler {
   }
   
   @Test (timeout = 5000)
-  public void testReservationWhileMultiplePriorities() {
+  public void testReservationWhileMultiplePriorities() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -1717,9 +1477,7 @@ public class TestFairScheduler {
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1735,8 +1493,7 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
         "norealuserhasthisname", 1);
@@ -1751,6 +1508,8 @@ public class TestFairScheduler {
   
   @Test (timeout = 5000)
   public void testMultipleNodesSingleRackRequest() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 =
         MockNodes
             .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1797,6 +1556,8 @@ public class TestFairScheduler {
   
   @Test (timeout = 5000)
   public void testFifoWithinQueue() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 =
         MockNodes
             .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
@@ -1837,11 +1598,9 @@ public class TestFairScheduler {
   }
 
   @Test(timeout = 3000)
-  public void testMaxAssign() throws AllocationConfigurationException {
-    // set required scheduler configs
-    scheduler.assignMultiple = true;
-    scheduler.getQueueManager().getLeafQueue("root.default", true)
-        .setPolicy(SchedulingPolicy.getDefault());
+  public void testMaxAssign() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node =
         MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
@@ -1884,6 +1643,8 @@ public class TestFairScheduler {
    */
   @Test(timeout = 5000)
   public void testAssignContainer() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     final String user = "user1";
     final String fifoQueue = "fifo";
     final String fairParent = "fairParent";
@@ -1951,9 +1712,7 @@ public class TestFairScheduler {
   @Test
   public void testNotAllowSubmitApplication() throws Exception {
     // Set acl's
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
@@ -1967,8 +1726,8 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     int appId = this.APP_ID++;
     String user = "usernotallow";
@@ -2017,7 +1776,9 @@ public class TestFairScheduler {
   }
   
   @Test
-  public void testReservationThatDoesntFit() {
+  public void testReservationThatDoesntFit() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 =
         MockNodes
             .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -2043,7 +1804,9 @@ public class TestFairScheduler {
   }
   
   @Test
-  public void testRemoveNodeUpdatesRootQueueMetrics() {
+  public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
 	assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
     
@@ -2069,7 +1832,9 @@ public class TestFairScheduler {
 }
 
   @Test
-  public void testStrictLocality() {
+  public void testStrictLocality() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -2107,7 +1872,9 @@ public class TestFairScheduler {
   }
   
   @Test
-  public void testCancelStrictLocality() {
+  public void testCancelStrictLocality() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -2155,7 +1922,9 @@ public class TestFairScheduler {
    * a reservation on another.
    */
   @Test
-  public void testReservationsStrictLocality() {
+  public void testReservationsStrictLocality() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
     RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
@@ -2193,7 +1962,9 @@ public class TestFairScheduler {
   }
   
   @Test
-  public void testNoMoreCpuOnNode() {
+  public void testNoMoreCpuOnNode() throws IOException {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
         1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
@@ -2213,6 +1984,8 @@ public class TestFairScheduler {
 
   @Test
   public void testBasicDRFAssignment() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
     NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
     scheduler.handle(nodeEvent);
@@ -2251,6 +2024,8 @@ public class TestFairScheduler {
    */
   @Test
   public void testBasicDRFWithQueues() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
         1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
@@ -2285,6 +2060,8 @@ public class TestFairScheduler {
   
   @Test
   public void testDRFHierarchicalQueues() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
         1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
@@ -2349,9 +2126,9 @@ public class TestFairScheduler {
 
   @Test(timeout = 30000)
   public void testHostPortNodeName() throws Exception {
-    scheduler.getConf().setBoolean(YarnConfiguration
+    conf.setBoolean(YarnConfiguration
         .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
-    scheduler.reinitialize(scheduler.getConf(), 
+    scheduler.reinitialize(conf, 
         resourceManager.getRMContext());
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
         1, "127.0.0.1", 1);
@@ -2426,9 +2203,7 @@ public class TestFairScheduler {
   
   @Test
   public void testUserAndQueueMaxRunningApps() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -2442,8 +2217,7 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
     
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     // exceeds no limits
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
@@ -2479,9 +2253,7 @@ public class TestFairScheduler {
   
   @Test
   public void testMaxRunningAppsHierarchicalQueues() throws Exception {
-    Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
     MockClock clock = new MockClock();
     scheduler.setClock(clock);
 
@@ -2499,8 +2271,7 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
     
-    QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     // exceeds no limits
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
@@ -2629,10 +2400,8 @@ public class TestFairScheduler {
   
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
-    Configuration conf = createConfiguration();
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -2642,8 +2411,8 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
     QueueManager queueManager = scheduler.getQueueManager();
-    queueManager.initialize();
     
     FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false);
     FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false);
@@ -2672,6 +2441,8 @@ public class TestFairScheduler {
   @SuppressWarnings("resource")
   @Test
   public void testBlacklistNodes() throws Exception {
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
     final int GB = 1024;
     String host = "127.0.0.1";
     RMNode node =



Mime
View raw message