hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r885145 [20/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java
(original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java
Sat Nov 28 20:26:01 2009
@@ -17,28 +17,20 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
 /**
  *  Class to encapsulate Queue ACLs for a particular
  *  user.
- * 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.QueueAclsInfo} instead
  */
-class QueueAclsInfo implements Writable {
+@Deprecated
+class QueueAclsInfo extends org.apache.hadoop.mapreduce.QueueAclsInfo {
 
-  private String queueName;
-  private String[] operations;
   /**
    * Default constructor for QueueAclsInfo.
    * 
    */
   QueueAclsInfo() {
-    
+    super();
   }
 
   /**
@@ -50,31 +42,11 @@
    * 
    */
   QueueAclsInfo(String queueName, String[] operations) {
-    this.queueName = queueName;
-    this.operations = operations;    
-  }
-
-  String getQueueName() {
-    return queueName;
-  }
-
-  void setQueueName(String queueName) {
-    this.queueName = queueName;
+    super(queueName, operations);
   }
-
-  String[] getOperations() {
-    return operations;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    queueName = Text.readString(in);
-    operations = WritableUtils.readStringArray(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Text.writeString(out, queueName);
-    WritableUtils.writeStringArray(out, operations);
+  
+  public static QueueAclsInfo downgrade(
+      org.apache.hadoop.mapreduce.QueueAclsInfo acl) {
+    return new QueueAclsInfo(acl.getQueueName(), acl.getOperations());
   }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java
(original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java
Sat Nov 28 20:26:01 2009
@@ -18,171 +18,260 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.List;
+import java.net.URL;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Queue.QueueState;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Class that exposes information about queues maintained by the Hadoop
  * Map/Reduce framework.
- * 
+ * <p/>
  * The Map/Reduce framework can be configured with one or more queues,
- * depending on the scheduler it is configured with. While some 
- * schedulers work only with one queue, some schedulers support multiple 
- * queues.
- *  
+ * depending on the scheduler it is configured with. While some
+ * schedulers work only with one queue, some schedulers support multiple
+ * queues. Some schedulers also support the notion of queues within
+ * queues - a feature called hierarchical queues.
+ * <p/>
+ * Queue names are unique, and used as a key to lookup queues. Hierarchical
+ * queues are named by a 'fully qualified name' such as q1:q2:q3, where
+ * q2 is a child queue of q1 and q3 is a child queue of q2.
+ * <p/>
+ * Leaf level queues are queues that contain no queues within them. Jobs
+ * can be submitted only to leaf level queues.
+ * <p/>
  * Queues can be configured with various properties. Some of these
  * properties are common to all schedulers, and those are handled by this
- * class. Schedulers might also associate several custom properties with 
- * queues. Where such a case exists, the queue name must be used to link 
- * the common properties with the scheduler specific ones.  
+ * class. Schedulers might also associate several custom properties with
+ * queues. These properties are parsed and maintained per queue by the
+ * framework. If schedulers need more complicated structure to maintain
+ * configuration per queue, they are free to not use the facilities
+ * provided by the framework, but define their own mechanisms. In such cases,
+ * it is likely that the name of the queue will be used to relate the
+ * common properties of a queue with scheduler specific properties.
+ * <p/>
+ * Information related to a queue, such as its name, properties, scheduling
+ * information and children are exposed by this class via a serializable
+ * class called {@link JobQueueInfo}.
+ * <p/>
+ * Queues are configured in the configuration file mapred-queues.xml.
+ * To support backwards compatibility, queues can also be configured
+ * in mapred-site.xml. However, when configured in the latter, there is
+ * no support for hierarchical queues.
  */
+
 class QueueManager {
-  
-  private static final Log LOG = LogFactory.getLog(QueueManager.class);
 
-  // Configured queues this is backed by queues Map , mentioned below
-  private Set<String> queueNames;
+  private static final Log LOG = LogFactory.getLog(QueueManager.class);
 
   // Map of a queue name and Queue object
-  private HashMap<String, Queue> queues;
-
-  // Whether ACLs are enabled in the system or not.
-  private boolean aclsEnabled;
-
-  static final String QUEUE_STATE_SUFFIX = "state";
-
+  private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
+  private Map<String, Queue> allQueues = new HashMap<String, Queue>();
   static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
 
   // Prefix in configuration for queue related keys
   static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
-                          = "mapred.queue.";//Resource in which queue acls are configured.
+    = "mapred.queue.";
+  //Resource in which queue acls are configured.
+
+  private Queue root = null;
+  private boolean isAclEnabled = false;
+
+  /**
+   * Factory method to create an appropriate instance of a queue
+   * configuration parser.
+   * <p/>
+   * Returns a parser that can parse either the deprecated property
+   * style queue configuration in mapred-site.xml, or one that can
+   * parse hierarchical queues in mapred-queues.xml. First preference
+   * is given to configuration in mapred-site.xml. If no queue
+   * configuration is found there, then a parser that can parse
+   * configuration in mapred-queues.xml is created.
+   *
+   * @param conf Configuration instance that determines which parser
+   *             to use.
+   * @return Queue configuration parser
+   */
+  static QueueConfigurationParser getQueueConfigurationParser(
+    Configuration conf, boolean reloadConf) {
+    if (conf != null && conf.get(
+      DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
+      if (reloadConf) {
+        conf.reloadConfiguration();
+      }
+      return new DeprecatedQueueConfigurationParser(conf);
+    } else {
+      URL filePath =
+        Thread.currentThread().getContextClassLoader()
+          .getResource(QUEUE_CONF_FILE_NAME);
+      return new QueueConfigurationParser(filePath.getPath());
+    }
+  }
+
+  public QueueManager() {
+    initialize(getQueueConfigurationParser(null, false));
+  }
 
   /**
    * Construct a new QueueManager using configuration specified in the passed
    * in {@link org.apache.hadoop.conf.Configuration} object.
-   * 
+   * <p/>
+   * This instance supports queue configuration specified in mapred-site.xml,
+   * but without support for hierarchical queues. If no queue configuration
+   * is found in mapred-site.xml, it will then look for site configuration
+   * in mapred-queues.xml supporting hierarchical queues.
+   *
    * @param conf Configuration object where queue configuration is specified.
    */
   public QueueManager(Configuration conf) {
-    checkDeprecation(conf);
-    conf.addResource(QUEUE_CONF_FILE_NAME);
-
-    queues = new HashMap<String, Queue>();
+    initialize(getQueueConfigurationParser(conf, false));
+  }
 
-    // First get the queue names 
-    String[] queueNameValues = conf.getStrings("mapred.queue.names",
-        new String[]{JobConf.DEFAULT_QUEUE_NAME});
-
-    // Get configured ACLs and state for each queue
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
-    for (String name : queueNameValues) {
-      try {
-        Map<String, AccessControlList> acls = getQueueAcls(name, conf);
-        QueueState state = getQueueState(name, conf);
-        queues.put(name, new Queue(name, acls, state));
-      } catch (Throwable t) {
-        LOG.warn("Not able to initialize queue " + name);
-      }
-    }
-    // Sync queue names with the configured queues.
-    queueNames = queues.keySet();
+  /**
+   * Create an instance that supports hierarchical queues, defined in
+   * the passed in configuration file.
+   * <p/>
+   * This is mainly used for testing purposes and should not called from
+   * production code.
+   *
+   * @param confFile File where the queue configuration is found.
+   */
+  QueueManager(String confFile) {
+    QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+    initialize(cp);
   }
-  
+
   /**
-   * Return the set of queues configured in the system.
+   * Initialize the queue-manager with the queue hierarchy specified by the
+   * given {@link QueueConfigurationParser}.
    * 
-   * The number of queues configured should be dependent on the Scheduler 
+   * @param cp
+   */
+  private void initialize(QueueConfigurationParser cp) {
+    this.root = cp.getRoot();
+    leafQueues.clear();
+    allQueues.clear();
+    //At this point we have root populated
+    //update data structures leafNodes.
+    leafQueues = getRoot().getLeafQueues();
+    allQueues.putAll(getRoot().getInnerQueues());
+    allQueues.putAll(leafQueues);
+
+    LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
+    this.isAclEnabled = cp.isAclsEnabled();
+  }
+
+
+  /**
+   * Return the set of leaf level queues configured in the system to
+   * which jobs are submitted.
+   * <p/>
+   * The number of queues configured should be dependent on the Scheduler
    * configured. Note that some schedulers work with only one queue, whereas
    * others can support multiple queues.
-   *  
+   *
    * @return Set of queue names.
    */
-  public synchronized Set<String> getQueues() {
-    return queueNames;
+  public synchronized Set<String> getLeafQueueNames() {
+    return leafQueues.keySet();
   }
-  
+
   /**
-   * Return true if the given {@link Queue.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be
    * performed by the specified user on the given queue.
-   * 
+   * <p/>
    * An operation is allowed if all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
-   * 
-   * @param queueName Queue on which the operation needs to be performed. 
-   * @param oper The operation to perform
-   * @param ugi The user and groups who wish to perform the operation.
-   * 
+   *
+   * @param queueName Queue on which the operation needs to be performed.
+   * @param oper      The operation to perform
+   * @param ugi       The user and groups who wish to perform the operation.
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, 
-                                Queue.QueueOperation oper,
-                                UserGroupInformation ugi) {
+  public synchronized boolean hasAccess(
+    String queueName,
+    Queue.QueueOperation oper,
+    UserGroupInformation ugi) {
     return hasAccess(queueName, null, oper, ugi);
   }
-  
+
   /**
-   * Return true if the given {@link Queue.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be
    * performed by the specified user on the specified job in the given queue.
-   * 
-   * An operation is allowed either if the owner of the job is the user 
+   * <p/>
+   * An operation is allowed either if the owner of the job is the user
    * performing the task, all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
-   * 
-   * If the {@link Queue.QueueOperation} is not job specific then the 
+   * <p/>
+   * If the {@link Queue.QueueOperation} is not job specific then the
    * job parameter is ignored.
-   * 
+   *
    * @param queueName Queue on which the operation needs to be performed.
-   * @param job The {@link JobInProgress} on which the operation is being
-   *            performed. 
-   * @param oper The operation to perform
-   * @param ugi The user and groups who wish to perform the operation.
-   * 
+   * @param job       The {@link JobInProgress} on which the operation is being
+   *                  performed.
+   * @param oper      The operation to perform
+   * @param ugi       The user and groups who wish to perform the operation.
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, JobInProgress job, 
-                                Queue.QueueOperation oper, 
-                                UserGroupInformation ugi) {
-    if (!aclsEnabled) {
-      return true;
-    }
+  public synchronized boolean hasAccess(
+    String queueName, JobInProgress job,
+    Queue.QueueOperation oper,
+    UserGroupInformation ugi) {
     
-    Queue q = queues.get(queueName);
+    Queue q = leafQueues.get(queueName);
+
     if (q == null) {
       LOG.info("Queue " + queueName + " is not present");
       return false;
     }
-    
+
+    if(q.getChildren() != null && !q.getChildren().isEmpty()) {
+      LOG.info("Cannot submit job to parent queue " + q.getName());
+      return false;
+    }
+
+    if (!isAclsEnabled()) {
+      return true;
+    }
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("checking access for : " 
+      LOG.debug(
+        "checking access for : "
           + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
     }
 
     if (oper.isJobOwnerAllowed()) {
-      if (job != null 
-          && job.getJobConf().getUser().equals(ugi.getUserName())) {
+      if (job != null
+        && job.getJobConf().getUser().equals(ugi.getUserName())) {
         return true;
       }
     }
-    
+
     AccessControlList acl = q.getAcls().get(
-                                toFullPropertyName(queueName, 
-                                    oper.getAclName()));
+      toFullPropertyName(
+        queueName,
+        oper.getAclName()));
     if (acl == null) {
       return false;
     }
@@ -210,210 +299,200 @@
 
   /**
    * Checks whether the given queue is running or not.
+   *
    * @param queueName name of the queue
    * @return true, if the queue is running.
    */
   synchronized boolean isRunning(String queueName) {
-    Queue q = queues.get(queueName);
+    Queue q = leafQueues.get(queueName);
     if (q != null) {
       return q.getState().equals(QueueState.RUNNING);
     }
     return false;
   }
-  
+
   /**
    * Set a generic Object that represents scheduling information relevant
    * to a queue.
-   * 
+   * <p/>
    * A string representation of this Object will be used by the framework
    * to display in user facing applications like the JobTracker web UI and
    * the hadoop CLI.
-   * 
-   * @param queueName queue for which the scheduling information is to be set. 
+   *
+   * @param queueName queue for which the scheduling information is to be set.
    * @param queueInfo scheduling information for this queue.
    */
-  public synchronized void setSchedulerInfo(String queueName, 
-                                              Object queueInfo) {
-    if (queues.get(queueName) != null)
-      queues.get(queueName).setSchedulingInfo(queueInfo);
+  public synchronized void setSchedulerInfo(
+    String queueName,
+    Object queueInfo) {
+    if (allQueues.get(queueName) != null) {
+      allQueues.get(queueName).setSchedulingInfo(queueInfo);
+    }
   }
-  
+
   /**
    * Return the scheduler information configured for this queue.
-   * 
+   *
    * @param queueName queue for which the scheduling information is required.
    * @return The scheduling information for this queue.
-   * 
-   * @see #setSchedulingInfo(String, Object)
    */
   public synchronized Object getSchedulerInfo(String queueName) {
-    if (queues.get(queueName) != null)
-      return queues.get(queueName).getSchedulingInfo();
+    if (allQueues.get(queueName) != null) {
+      return allQueues.get(queueName).getSchedulingInfo();
+    }
     return null;
   }
-  
+
+  static final String MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY =
+      "Unable to refresh queues because queue-hierarchy changed. "
+          + "Retaining existing configuration. ";
+
+  static final String MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE =
+      "Scheduler couldn't refresh it's queues with the new"
+          + " configuration properties. "
+          + "Retaining existing configuration throughout the system.";
+
   /**
-   * Refresh the acls and state for the configured queues in the system 
-   * by reading it from mapred-queues.xml.
-   * 
-   * Previously configured queues and if or not acls are disabled is retained.
+   * Refresh acls, state and scheduler properties for the configured queues.
+   * <p/>
+   * This method reloads configuration related to queues, but does not
+   * support changes to the list of queues or hierarchy. The expected usage
+   * is that an administrator can modify the queue configuration file and
+   * fire an admin command to reload queue configuration. If there is a
+   * problem in reloading configuration, then this method guarantees that
+   * existing queue configuration is untouched and in a consistent state.
    * 
+   * @param schedulerRefresher
    * @throws IOException when queue configuration file is invalid.
    */
-  synchronized void refreshQueues(Configuration conf) throws IOException {
-
-    // First check if things are configured in mapred-site.xml,
-    // so we can print out a deprecation warning. 
-    // This check is needed only until we support the configuration
-    // in mapred-site.xml
-    checkDeprecation(conf);
-    
-    // Add the queue configuration file. Values from mapred-site.xml
-    // will be overridden.
-    conf.addResource(QUEUE_CONF_FILE_NAME);
-    
-    // Now we refresh the properties of the queues. Note that we
-    // do *not* refresh the queue names or the acls flag. Instead
-    // we use the older values configured for them.
-    LOG.info("Refreshing acls and state for configured queues.");
-    try {
-      Iterator<String> itr = queueNames.iterator();
-      while(itr.hasNext()) {
-        String name = itr.next();
-        Queue q = queues.get(name);
-        Map<String, AccessControlList> newAcls = getQueueAcls(name, conf);
-        QueueState newState = getQueueState(name, conf);
-        q.setAcls(newAcls);
-        q.setState(newState);
-      }
-    } catch (Throwable t) {
-      String exceptionString = StringUtils.stringifyException(t);
-      LOG.warn("Queues could not be refreshed because there was an " +
-      		"exception in parsing the configuration: "+ exceptionString +
-      		". Existing ACLs/state is retained.");
-      throw new IOException(exceptionString);
-    }
-  }
-  
-  // Check if queue properties are configured in the passed in
-  // configuration. If yes, print out deprecation warning messages.
-  private void checkDeprecation(Configuration conf) {
-
-    // check if queues are defined.
-    String[] queues = null;
-    String queueNameValues = conf.get("mapred.queue.names");
-    if (queueNameValues != null) {
-      LOG.warn("Configuring \"mapred.queue.names\" in mapred-site.xml or " +
-          		"hadoop-site.xml is deprecated. Configure " +
-              "\"mapred.queue.names\" in " +
-          		QUEUE_CONF_FILE_NAME);
-      // store queues so we can check if ACLs are also configured
-      // in the deprecated files.
-      queues = conf.getStrings("mapred.queue.names");
-    }
-    
-    // check if the acls flag is defined
-    String aclsEnable = conf.get("mapred.acls.enabled");
-    if (aclsEnable != null) {
-      LOG.warn("Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
-          		"hadoop-site.xml is deprecated. Configure " +
-              "\"mapred.acls.enabled\" in " +
-          		QUEUE_CONF_FILE_NAME);
-    }
-    
-    // check if acls are defined
-    if (queues != null) {
-      for (String queue : queues) {
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = toFullPropertyName(queue, oper.getAclName());
-          String aclString = conf.get(key);
-          if (aclString != null) {
-            LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
-          	  "hadoop-site.xml is deprecated. Configure queue ACLs in " + 
-          		QUEUE_CONF_FILE_NAME);
-            // even if one string is configured, it is enough for printing
-            // the warning. so we can return from here.
-            return;
-          }
-        }
+  synchronized void refreshQueues(Configuration conf,
+      QueueRefresher schedulerRefresher)
+      throws IOException {
+
+    // Create a new configuration parser using the passed conf object.
+    QueueConfigurationParser cp =
+        QueueManager.getQueueConfigurationParser(conf, true);
+
+    /*
+     * (1) Validate the refresh of properties owned by QueueManager. As of now,
+     * while refreshing queue properties, we only check that the hierarchy is
+     * the same w.r.t queue names, ACLs and state for each queue and don't
+     * support adding new queues or removing old queues
+     */
+    if (!root.isHierarchySameAs(cp.getRoot())) {
+      LOG.warn(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
+      throw new IOException(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
+    }
+
+    /*
+     * (2) QueueManager owned properties are validated. Now validate and
+     * refresh the properties of scheduler in a single step.
+     */
+    if (schedulerRefresher != null) {
+      try {
+        schedulerRefresher.refreshQueues(cp.getRoot().getJobQueueInfo().getChildren());
+      } catch (Throwable e) {
+        StringBuilder msg =
+            new StringBuilder(
+                "Scheduler's refresh-queues failed with the exception : "
+                    + StringUtils.stringifyException(e));
+        msg.append("\n");
+        msg.append(MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE);
+        LOG.error(msg.toString());
+        throw new IOException(msg.toString());
       }
     }
-  }
-  
-  // Parse ACLs for the queue from the configuration.
-  private Map<String, AccessControlList> getQueueAcls(String name,
-                                                        Configuration conf) {
-    HashMap<String, AccessControlList> map = 
-        new HashMap<String, AccessControlList>();
-    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-      String aclKey = toFullPropertyName(name, oper.getAclName());
-      map.put(aclKey, new AccessControlList(conf.get(aclKey, "*")));
-    }
-    return map;
-  }
-
-  // Parse ACLs for the queue from the configuration.
-  private QueueState getQueueState(String name, Configuration conf) {
-    QueueState retState = QueueState.RUNNING;
-    String stateVal = conf.get(toFullPropertyName(name,
-                                                  QueueManager.QUEUE_STATE_SUFFIX),
-                               QueueState.RUNNING.getStateName());
-    for (QueueState state : QueueState.values()) {
-      if (state.getStateName().equalsIgnoreCase(stateVal)) {
-        retState = state;
-        break;
-      }
-    }
-    return retState;
-  }
- 
-  public static final String toFullPropertyName(String queue,
-      String property) {
+
+    /*
+     * (3) Scheduler has validated and refreshed its queues successfully, now
+     * refresh the properties owned by QueueManager
+     */
+
+    // First copy the scheduling information recursively into the new
+    // queue-hierarchy. This is done to retain old scheduling information. This
+    // is done after scheduler refresh and not before it because during refresh,
+    // schedulers may wish to change their scheduling info objects too.
+    cp.getRoot().copySchedulingInfo(this.root);
+
+    // Now switch roots.
+    initialize(cp);
+
+    LOG.info("Queue configuration is refreshed successfully.");
+  }
+
+  static final String toFullPropertyName(
+    String queue,
+    String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
 
+  /**
+   * Return an array of {@link JobQueueInfo} objects for all the
+   * queues configurated in the system.
+   *
+   * @return array of JobQueueInfo objects.
+   */
   synchronized JobQueueInfo[] getJobQueueInfos() {
     ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
-    for (String queue : queueNames) {
+    for (String queue : allQueues.keySet()) {
       JobQueueInfo queueInfo = getJobQueueInfo(queue);
       if (queueInfo != null) {
-        queueInfoList.add(getJobQueueInfo(queue));  
+        queueInfoList.add(queueInfo);
       }
     }
-    return  queueInfoList.toArray(
-            new JobQueueInfo[queueInfoList.size()]);
+    return queueInfoList.toArray(
+      new JobQueueInfo[queueInfoList.size()]);
   }
 
+
+  /**
+   * Return {@link JobQueueInfo} for a given queue.
+   *
+   * @param queue name of the queue
+   * @return JobQueueInfo for the queue, null if the queue is not found.
+   */
   synchronized JobQueueInfo getJobQueueInfo(String queue) {
-    if (queues.get(queue) != null) {
-      Object schedulingInfo = queues.get(queue).getSchedulingInfo();
-      JobQueueInfo qInfo;
-      if (schedulingInfo != null) {
-        qInfo = new JobQueueInfo(queue, schedulingInfo.toString());
-      } else {
-        qInfo = new JobQueueInfo(queue, null);
-      }
-      qInfo.setQueueState(queues.get(queue).getState().getStateName());
-      return qInfo;
+    if (allQueues.containsKey(queue)) {
+      return allQueues.get(queue).getJobQueueInfo();
     }
+
     return null;
   }
 
   /**
-   * Generates the array of QueueAclsInfo object. 
-   * 
+   * JobQueueInfo for all the queues.
+   * <p/>
+   * Contribs can use this data structure to either create a hierarchy or for
+   * traversing.
+   * They can also use this to refresh properties in case of refreshQueues
+   *
+   * @return a map for easy navigation.
+   */
+  synchronized Map<String, JobQueueInfo> getJobQueueInfoMapping() {
+    Map<String, JobQueueInfo> m = new HashMap<String, JobQueueInfo>();
+
+    for (String key : allQueues.keySet()) {
+      m.put(key, allQueues.get(key).getJobQueueInfo());
+    }
+
+    return m;
+  }
+
+  /**
+   * Generates the array of QueueAclsInfo object.
+   * <p/>
    * The array consists of only those queues for which user has acls.
    *
    * @return QueueAclsInfo[]
    * @throws java.io.IOException
    */
   synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
-                                            throws IOException {
+    throws IOException {
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
-            new ArrayList<QueueAclsInfo>();
+      new ArrayList<QueueAclsInfo>();
     Queue.QueueOperation[] operations = Queue.QueueOperation.values();
-    for (String queueName : queueNames) {
+    for (String queueName : leafQueues.keySet()) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
       for (Queue.QueueOperation operation : operations) {
@@ -427,33 +506,229 @@
       if (operationsAllowed != null) {
         //There is atleast 1 operation supported for queue <queueName>
         //, hence initialize queueAclsInfo
-        queueAclsInfo = new QueueAclsInfo(queueName, operationsAllowed.toArray
-                (new String[operationsAllowed.size()]));
+        queueAclsInfo = new QueueAclsInfo(
+          queueName, operationsAllowed.toArray
+            (new String[operationsAllowed.size()]));
         queueAclsInfolist.add(queueAclsInfo);
       }
     }
-    return queueAclsInfolist.toArray(new QueueAclsInfo[
-            queueAclsInfolist.size()]);
+    return queueAclsInfolist.toArray(
+      new QueueAclsInfo[
+        queueAclsInfolist.size()]);
   }
 
-  // ONLY FOR TESTING - Do not use in production code.
+  /**
+   * ONLY FOR TESTING - Do not use in production code.
+   * This method is used for setting up of leafQueues only.
+   * We are not setting the hierarchy here.
+   *
+   * @param queues
+   */
   synchronized void setQueues(Queue[] queues) {
+    root.getChildren().clear();
+    leafQueues.clear();
+    allQueues.clear();
+
     for (Queue queue : queues) {
-      this.queues.put(queue.getName(), queue);
+      root.addChild(queue);
+    }
+    //At this point we have root populated
+    //update data structures leafNodes.
+    leafQueues = getRoot().getLeafQueues();
+    allQueues.putAll(getRoot().getInnerQueues());
+    allQueues.putAll(leafQueues);
+  }
+
+  /**
+   * Return an array of {@link JobQueueInfo} objects for the root
+   * queues configured in the system.
+   * <p/>
+   * Root queues are queues that are at the top-most level in the
+   * hierarchy of queues in mapred-queues.xml, or they are the queues
+   * configured in the mapred.queue.names key in mapred-site.xml.
+   *
+   * @return array of JobQueueInfo objects for root level queues.
+   */
+
+  JobQueueInfo[] getRootQueues() {
+    List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
+    return list.toArray(new JobQueueInfo[list.size()]);
+  }
+
+  /**
+   * Get the complete hierarchy of children for queue
+   * queueName
+   *
+   * @param queueName
+   * @return
+   */
+  JobQueueInfo[] getChildQueues(String queueName) {
+    List<JobQueueInfo> list =
+      allQueues.get(queueName).getJobQueueInfo().getChildren();
+    if (list != null) {
+      return list.toArray(new JobQueueInfo[list.size()]);
+    } else {
+      return new JobQueueInfo[0];
     }
   }
 
   /**
-   * prints the configuration of QueueManager in Json format.
-   * The method should be modified accordingly whenever
-   * QueueManager(Configuration) constructor is modified.
-   * @param writer {@link}Writer object to which the configuration properties 
-   * are printed in json format
+   * Used only for testing purposes .
+   * This method is unstable as refreshQueues would leave this
+   * data structure in unstable state.
+   *
+   * @param queueName
+   * @return
+   */
+  Queue getQueue(String queueName) {
+    return this.allQueues.get(queueName);
+  }
+
+
+  /**
+   * Return if ACLs are enabled for the Map/Reduce system
+   *
+   * @return true if ACLs are enabled.
+   */
+  boolean isAclsEnabled() {
+    return isAclEnabled;
+  }
+
+  /**
+   * Used only for test.
+   *
+   * @return
+   */
+  Queue getRoot() {
+    return root;
+  }
+  
+  /**
+   * Dumps the configuration of hierarchy of queues
+   * @param out the writer object to which dump is written
+   * @throws IOException
+   */
+  static void dumpConfiguration(Writer out,Configuration conf) throws IOException {
+    dumpConfiguration(out, null,conf);
+  }
+  
+  /***
+   * Dumps the configuration of hierarchy of queues with 
+   * the xml file path given. It is to be used directly ONLY FOR TESTING.
+   * @param out the writer object to which dump is written to.
+   * @param configFile the filename of xml file
+   * @throws IOException
+   */
+  static void dumpConfiguration(Writer out, String configFile,
+      Configuration conf) throws IOException {
+    if (conf != null && conf.get(DeprecatedQueueConfigurationParser.
+        MAPRED_QUEUE_NAMES_KEY) != null) {
+      return;
+    }
+    JsonFactory dumpFactory = new JsonFactory();
+    JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
+    QueueConfigurationParser parser;
+    if (configFile != null && !"".equals(configFile)) {
+      parser = new QueueConfigurationParser(configFile);
+    }
+    else {
+      parser = QueueManager.getQueueConfigurationParser(null, false);
+    }
+    dumpGenerator.writeStartObject();
+    dumpGenerator.writeBooleanField("acls_enabled", parser.isAclsEnabled());
+    dumpGenerator.writeFieldName("queues");
+    dumpGenerator.writeStartArray();
+    dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
+    dumpGenerator.writeEndArray();
+    dumpGenerator.writeEndObject();
+    dumpGenerator.flush();
+  }
+
+  /**
+   * method to perform depth-first search and write the parameters of every 
+   * queue in JSON format.
+   * @param dumpGenerator JsonGenerator object which takes the dump and flushes
+   *  to a writer object
+   * @param rootQueues the top-level queues
+   * @throws JsonGenerationException
    * @throws IOException
    */
-  static void dumpConfiguration(Writer writer) throws IOException {
-    Configuration conf = new Configuration(false);
-    conf.addResource(QUEUE_CONF_FILE_NAME);
-    Configuration.dumpConfiguration(conf, writer);
+  private static void dumpConfiguration(JsonGenerator dumpGenerator,
+      Set<Queue> rootQueues) throws JsonGenerationException, IOException {
+    for (Queue queue : rootQueues) {
+      dumpGenerator.writeStartObject();
+      dumpGenerator.writeStringField("name", queue.getName());
+      dumpGenerator.writeStringField("state", queue.getState().toString());
+      AccessControlList submitJobList = null;
+      AccessControlList administerJobsList = null;
+      if (queue.getAcls() != null) {
+        submitJobList =
+          queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
+              Queue.QueueOperation.SUBMIT_JOB.getAclName()));
+        administerJobsList =
+          queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
+              Queue.QueueOperation.ADMINISTER_JOBS.getAclName()));
+      }
+      StringBuilder aclsSubmitJobValue = new StringBuilder();
+      if (submitJobList != null ) {
+        aclsSubmitJobValue = getAclsInfo(submitJobList);
+      }
+      dumpGenerator.writeStringField("acl_submit_job",
+          aclsSubmitJobValue.toString());
+      StringBuilder aclsAdministerValue = new StringBuilder();
+      if (administerJobsList != null) {
+        aclsAdministerValue = getAclsInfo(administerJobsList);
+      }
+      dumpGenerator.writeStringField("acl_administer_jobs",
+          aclsAdministerValue.toString());
+      dumpGenerator.writeFieldName("properties");
+      dumpGenerator.writeStartArray();
+      if (queue.getProperties() != null) {
+        for (Map.Entry<Object, Object>property :
+          queue.getProperties().entrySet()) {
+          dumpGenerator.writeStartObject();
+          dumpGenerator.writeStringField("key", (String)property.getKey());
+          dumpGenerator.writeStringField("value", (String)property.getValue());
+          dumpGenerator.writeEndObject();
+        }
+      }
+      dumpGenerator.writeEndArray();
+      Set<Queue> childQueues = queue.getChildren();
+      dumpGenerator.writeFieldName("children");
+      dumpGenerator.writeStartArray();
+      if (childQueues != null && childQueues.size() > 0) {
+        dumpConfiguration(dumpGenerator, childQueues);
+      }
+      dumpGenerator.writeEndArray();
+      dumpGenerator.writeEndObject();
+    }
+  }
+
+  private static StringBuilder getAclsInfo(AccessControlList accessControlList) {
+    StringBuilder sb = new StringBuilder();
+    if (accessControlList.getUsers() != null &&
+        accessControlList.getUsers().size() > 0) {
+      Set<String> users = accessControlList.getUsers();
+      Iterator<String> iterator = users.iterator();
+      while (iterator.hasNext()) {
+        sb.append(iterator.next());
+        if (iterator.hasNext()) {
+          sb.append(",");
+        }
+      }
+    }
+    if (accessControlList.getGroups() != null &&
+        accessControlList.getGroups().size() > 0) {
+      sb.append(" ");
+      Set<String> groups = accessControlList.getGroups();
+      Iterator<String> iterator = groups.iterator();
+      while (iterator.hasNext()) {
+        sb.append(iterator.next());
+        if (iterator.hasNext()) {
+          sb.append(",");
+        }
+      }
+    }
+    return sb;
   }
 }



Mime
View raw message