hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r815605 [2/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/java/org/apache/hadoo...
Date Wed, 16 Sep 2009 04:49:20 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java?rev=815605&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java Wed Sep 16 04:49:18 2009
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import static org.apache.hadoop.mapred.QueueManager.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+/**
+ * Class to build queue hierarchy using deprecated conf(mapred-site.xml).
+ * Generates a single level of queue hierarchy. 
+ * 
+ */
+class DeprecatedQueueConfigurationParser extends QueueConfigurationParser {
+  private static final Log LOG =
+    LogFactory.getLog(DeprecatedQueueConfigurationParser.class);
+  static final String MAPRED_QUEUE_NAMES_KEY = "mapred.queue.names";
+
+  DeprecatedQueueConfigurationParser(Configuration conf) {
+    //If not configuration done return immediately.
+    if(!deprecatedConf(conf)) {
+      return;
+    }
+    List<Queue> listq = createQueues(conf);
+    this.setAclsEnabled(conf.getBoolean("mapred.acls.enabled", false));
+    root = new Queue();
+    root.setName("");
+    for (Queue q : listq) {
+      root.addChild(q);
+    }
+  }
+
+  private List<Queue> createQueues(Configuration conf) {
+    String[] queueNameValues = conf.getStrings(
+      MAPRED_QUEUE_NAMES_KEY);
+    List<Queue> list = new ArrayList<Queue>();
+    for (String name : queueNameValues) {
+      try {
+        Map<String, SecurityUtil.AccessControlList> acls = getQueueAcls(
+          name, conf);
+        Queue.QueueState state = getQueueState(name, conf);
+        Queue q = new Queue(name, acls, state);
+        list.add(q);
+      } catch (Throwable t) {
+        LOG.warn("Not able to initialize queue " + name);
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Only applicable to leaf level queues
+   * Parse ACLs for the queue from the configuration.
+   */
+  private Queue.QueueState getQueueState(String name, Configuration conf) {
+    String stateVal = conf.get(
+      QueueManager.toFullPropertyName(
+        name,"state"),
+      Queue.QueueState.RUNNING.getStateName());
+    return QueueConfigurationParser.getQueueState(stateVal);
+  }
+
+  /**
+   * Check if queue properties are configured in the passed in
+   * configuration. If yes, print out deprecation warning messages.
+   */
+  private boolean deprecatedConf(Configuration conf) {
+    String[] queues = null;
+    String queueNameValues = getQueueNames(conf);
+    if (queueNameValues == null) {
+      return false;
+    } else {
+      LOG.warn(
+        "Configuring \"mapred.queue.names\" in mapred-site.xml or " +
+          "hadoop-site.xml is deprecated. Configure " +
+          "queue hierarchy  in " +
+          QUEUE_CONF_FILE_NAME);
+      // store queues so we can check if ACLs are also configured
+      // in the deprecated files.
+      queues = conf.getStrings(MAPRED_QUEUE_NAMES_KEY);
+    }
+
+    // check if the acls flag is defined
+    String aclsEnable = conf.get("mapred.acls.enabled");
+    if (aclsEnable != null) {
+      LOG.warn(
+        "Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
+          "hadoop-site.xml is deprecated. Configure " +
+          "queue hierarchy in " +
+          QUEUE_CONF_FILE_NAME);
+    }
+
+    // check if acls are defined
+    if (queues != null) {
+      for (String queue : queues) {
+        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+          String key = toFullPropertyName(queue, oper.getAclName());
+          String aclString = conf.get(key);
+          if (aclString != null) {
+            LOG.warn(
+              "Configuring queue ACLs in mapred-site.xml or " +
+                "hadoop-site.xml is deprecated. Configure queue ACLs in " +
+                QUEUE_CONF_FILE_NAME);
+            // even if one string is configured, it is enough for printing
+            // the warning. so we can return from here.
+            return true;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  private String getQueueNames(Configuration conf) {
+    String queueNameValues = conf.get(MAPRED_QUEUE_NAMES_KEY);
+    return queueNameValues;
+  }
+
+  /**
+   * Parse ACLs for the queue from the configuration.
+   */
+  private Map<String, SecurityUtil.AccessControlList> getQueueAcls(
+    String name,
+    Configuration conf) {
+    HashMap<String, SecurityUtil.AccessControlList> map =
+      new HashMap<String, SecurityUtil.AccessControlList>();
+    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
+      String aclKey = toFullPropertyName(name, oper.getAclName());
+      map.put(
+        aclKey, new SecurityUtil.AccessControlList(
+          conf.get(
+            aclKey, "*")));
+    }
+    return map;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Sep 16 04:49:18 2009
@@ -1953,7 +1953,29 @@
     }
     return sysDir;
   }
-  
+
+  /**
+   * Returns an array of queue information objects about root level queues
+   * configured
+   *
+   * @return the array of root level JobQueueInfo objects
+   * @throws IOException
+   */
+  public JobQueueInfo[] getRootQueues() throws IOException {
+    return jobSubmitClient.getRootQueues();
+  }
+
+  /**
+   * Returns an array of queue information objects about immediate children
+   * of queue queueName.
+   * 
+   * @param queueName
+   * @return the array of immediate children JobQueueInfo objects
+   * @throws IOException
+   */
+  public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
+    return jobSubmitClient.getChildQueues(queueName);
+  }
   
   /**
    * Return an array of queue information objects about all the Job Queues

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueInfo.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueInfo.java Wed Sep 16 04:49:18 2009
@@ -20,6 +20,9 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -38,13 +41,17 @@
   private String schedulingInfo; 
   
   private String queueState;
-  
+
+  private List<JobQueueInfo> children;
+
+  private Properties props;
+
   /**
    * Default constructor for Job Queue Info.
    * 
    */
   public JobQueueInfo() {
-    
+    children = new ArrayList<JobQueueInfo>();
   }
   /**
    * Construct a new JobQueueInfo object using the queue name and the
@@ -59,6 +66,7 @@
     this.schedulingInfo = schedulingInfo;
     // make it running by default.
     this.queueState = Queue.QueueState.RUNNING.getStateName();
+    children = new ArrayList<JobQueueInfo>();
   }
   
   
@@ -118,22 +126,49 @@
   public String getQueueState() {
     return queueState;
   }
-  
+
+  public List<JobQueueInfo> getChildren() {
+    return children;
+  }
+
+  public void setChildren(List<JobQueueInfo> children) {
+    this.children =  children; 
+  }
+
+  Properties getProperties() {
+    return props;
+  }
+
+  void setProperties(Properties props) {
+    this.props = props;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     queueName = Text.readString(in);
     queueState = Text.readString(in);
     schedulingInfo = Text.readString(in);
+    int count = in.readInt();
+    children.clear();
+    for (int i = 0; i < count; i++) {
+      JobQueueInfo childQueueInfo = new JobQueueInfo();
+      childQueueInfo.readFields(in);
+      children.add(childQueueInfo);
+    }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, queueName);
     Text.writeString(out, queueState);
-    if(schedulingInfo!= null) {
+    if (schedulingInfo != null) {
       Text.writeString(out, schedulingInfo);
-    }else {
+    } else {
       Text.writeString(out, "N/A");
     }
+    out.writeInt(children.size());
+    for(JobQueueInfo childQueueInfo : children) {
+      childQueueInfo.write(out);
+    }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Wed Sep 16 04:49:18 2009
@@ -66,8 +66,12 @@
    * Version 24: Modified ClusterStatus to include BlackListInfo class which 
    *             encapsulates reasons and report for blacklisted node.          
    * Version 25: Added fields to JobStatus for HADOOP-817.
+   *
+   * Version 26: Added properties to JobQueueInfo as part of MAPREDUCE-861.
+   *             added new api's getRootQueues and
+   *             getChildQueues(String queueName)
    */
-  public static final long versionID = 25L;
+  public static final long versionID = 26L;
 
   /**
    * Allocate a name for the job.
@@ -227,4 +231,20 @@
    * @throws IOException
    */
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException;
+
+  /**
+   * Gets the root level queues.
+   * @return array of JobQueueInfo object.
+   * @throws IOException
+   */
+  public JobQueueInfo[] getRootQueues() throws IOException;
+  
+
+  /**
+   * Returns immediate children of queueName.
+   * @param queueName
+   * @return array of JobQueueInfo which are children of queueName
+   * @throws IOException
+   */
+  public JobQueueInfo[] getChildQueues(String queueName) throws IOException;
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Sep 16 04:49:18 2009
@@ -2753,7 +2753,7 @@
     JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount);
     
     String queue = job.getProfile().getQueueName();
-    if(!(queueManager.getQueues().contains(queue))) {      
+    if(!(queueManager.getLeafQueueNames().contains(queue))) {
       new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
@@ -3589,9 +3589,29 @@
   private static void dumpConfiguration(Writer writer) throws IOException {
     Configuration.dumpConfiguration(new JobConf(), writer);
     writer.write("\n");
-    // get the QueueManager configuration properties
-    QueueManager.dumpConfiguration(writer);
-    writer.write("\n");
+  }
+
+  /**
+   * Gets the root level queues.
+   *
+   * @return array of JobQueueInfo object.
+   * @throws java.io.IOException
+   */
+   @Override
+  public JobQueueInfo[] getRootQueues() throws IOException {
+    return queueManager.getRootQueues();
+  }
+ 
+  /**
+   * Returns immediate children of queueName.
+   *
+   * @param queueName
+   * @return array of JobQueueInfo which are children of queueName
+   * @throws java.io.IOException
+   */
+  @Override
+  public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
+     return queueManager.getChildQueues(queueName);
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Sep 16 04:49:18 2009
@@ -531,6 +531,17 @@
     return fs.makeQualified(sysDir).toString();
   }
 
+
+  @Override
+  public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public JobQueueInfo[] getRootQueues() throws IOException {
+    return null;
+  }
+
   @Override
   public JobStatus[] getJobsFromQueue(String queue) throws IOException {
     return null;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java Wed Sep 16 04:49:18 2009
@@ -17,32 +17,34 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * A class for storing the properties of a job queue.
  */
-class Queue {
+class Queue implements Comparable<Queue>{
 
   private static final Log LOG = LogFactory.getLog(Queue.class);
 
   //Queue name
-  private String name;
+  private String name = null;
 
   //acls list
   private Map<String, AccessControlList> acls;
 
   //Queue State
-  private QueueState state;
+  private QueueState state = QueueState.RUNNING;
 
   // An Object that can be used by schedulers to fill in
   // arbitrary scheduling information. The toString method
@@ -50,6 +52,10 @@
   // get a String that can be displayed on UI.
   private Object schedulingInfo;
 
+  private Set<Queue> children;
+
+  private Properties props;
+
   /**
    * Enum representing queue state
    */
@@ -69,6 +75,11 @@
       return stateName;
     }
 
+    @Override
+    public String toString() {
+      return stateName;
+    }
+
   }
 
   /**
@@ -100,6 +111,14 @@
   }
 
   /**
+   * Default constructor is useful in creating the hierarchy.
+   * The variables are populated using mutator methods.
+   */
+  Queue() {
+    
+  }
+
+  /**
    * Create a job queue
    * @param name name of the queue
    * @param acls ACLs for the queue
@@ -182,4 +201,202 @@
   void setSchedulingInfo(Object schedulingInfo) {
     this.schedulingInfo = schedulingInfo;
   }
+
+  /**
+   *
+   */
+  void addChild(Queue child) {
+    if(children == null) {
+      children = new TreeSet<Queue>();
+    }
+
+    children.add(child);
+  }
+
+  /**
+   *
+   * @return
+   */
+  Set<Queue> getChildren() {
+    return children;
+  }
+
+  /**
+   * 
+   * @param props
+   */
+  void setProperties(Properties props) {
+     this.props = props;
+  }
+
+  /**
+   *
+   * @return
+   */
+  Properties getProperties() {
+    return this.props;
+  }
+
+  /**
+   * This methods helps in traversing the
+   * tree hierarchy.
+   *
+   * Returns list of all inner queues.i.e nodes which has children.
+   * below this level.
+   *
+   * Incase of children being null , returns an empty map.
+   * This helps in case of creating union of inner and leaf queues.
+   * @return
+   */
+  Map<String,Queue> getInnerQueues() {
+    Map<String,Queue> l = new HashMap<String,Queue>();
+
+    //If no children , return empty set.
+    //This check is required for root node.
+    if(children == null) {
+      return l;
+    }
+
+    //check for children if they are parent.
+    for(Queue child:children) {
+      //check if children are themselves parent add them
+      if(child.getChildren() != null && child.getChildren().size() > 0) {
+        l.put(child.getName(),child);
+        l.putAll(child.getInnerQueues());
+      }
+    }
+    return l;
+  }
+
+  /**
+   * This method helps in maintaining the single
+   * data structure across QueueManager.
+   *
+   * Now if we just maintain list of root queues we
+   * should be done.
+   *
+   * Doesn't return null .
+   * Adds itself if this is leaf node.
+   *  
+   * @return
+   */
+  Map<String,Queue> getLeafQueues() {
+    Map<String,Queue> l = new HashMap<String,Queue>();
+    if(children == null) {
+      l.put(name,this);
+      return l;
+    }
+
+    for(Queue child:children) {
+      l.putAll(child.getLeafQueues());
+    }
+    return l;
+  }
+
+
+  @Override
+  public int compareTo(Queue queue) {
+    return name.compareTo(queue.getName());
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if(o == this) {
+      return true;
+    }
+    if(! (o instanceof Queue)) {
+      return false;
+    }
+    
+    return ((Queue)o).getName().equals(name);
+  }
+
+  @Override
+  public String toString() {
+    return this.getName();
+  }
+
+  @Override
+  public int hashCode() {
+    return this.getName().hashCode();
+  }
+
+  /**
+   * Return hierarchy of {@link JobQueueInfo} objects
+   * under this Queue.
+   *
+   * @return JobQueueInfo[]
+   */
+  JobQueueInfo getJobQueueInfo() {
+    JobQueueInfo queueInfo = new JobQueueInfo();
+    queueInfo.setQueueName(name);
+    queueInfo.setQueueState(state.getStateName());
+    if (schedulingInfo != null) {
+      queueInfo.setSchedulingInfo(schedulingInfo.toString());
+    }
+
+    if (props != null) {
+      //Create deep copy of properties.
+      Properties newProps = new Properties();
+      for (Object key : props.keySet()) {
+        newProps.setProperty(key.toString(), props.getProperty(key.toString()));
+      }
+      queueInfo.setProperties(newProps);
+    }
+
+    if (children != null && children.size() > 0) {
+      List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
+      for (Queue child : children) {
+        list.add(child.getJobQueueInfo());
+      }
+      queueInfo.setChildren(list);
+    }
+    return queueInfo;
+  }
+
+  /**
+   * For each node validate if current node hierarchy is same newState.
+   * recursively check for child nodes.
+   * 
+   * @param newState
+   * @return
+   */
+  boolean isHierarchySameAs(Queue newState) {
+    if(newState == null) {
+      return false;
+    }
+    //First check if names are equal
+    if(!(name.equals(newState.getName())) ) {
+      LOG.info(" current name " + name + " not equal to " + newState.getName());
+      return false;
+    }
+
+    if (children == null || children.size() == 0) {
+      if(newState.getChildren() != null && newState.getChildren().size() > 0) {
+        LOG.info( newState + " has added children in refresh ");
+        return false;
+      }
+    } else if(children.size() > 0) {
+
+      //check for the individual children and then see if all of them
+      //are updated.
+      if(children.size() != newState.getChildren().size()) {
+        return false;
+      }
+      //children are pre sorted as they are stored in treeset.
+      //hence order shold be the same.
+      Iterator<Queue> itr1 = children.iterator();
+      Iterator<Queue> itr2 = newState.getChildren().iterator();
+
+      while(itr1.hasNext()) {
+        Queue q = itr1.next();
+        Queue newq = itr2.next();
+        if(! (q.isHierarchySameAs(newq)) ) {
+          LOG.info(" Queue " + q.getName() + " not equal to " + newq.getName());
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=815605&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Wed Sep 16 04:49:18 2009
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
+import org.xml.sax.SAXException;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.DOMException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.DocumentBuilder;
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * Class for parsing mapred-queues.xml.
+ *    The format consists nesting of
+ *    queues within queues - a feature called hierarchical queues.
+ *    The parser expects that queues are
+ *    defined within the 'queues' tag which is the top level element for
+ *    XML document.
+ * 
+ * Creates the complete queue hieararchy
+ */
+class QueueConfigurationParser {
+  private static final Log LOG =
+    LogFactory.getLog(QueueConfigurationParser.class);
+  
+  private boolean aclsEnabled = false;
+
+  //Default root.
+  protected Queue root = null;
+
+  //xml tags for mapred-queues.xml
+  static final String NAME_SEPARATOR = ":";
+  static final String QUEUE_TAG = "queue";
+  static final String ACL_SUBMIT_JOB_TAG = "acl-submit-job";
+  static final String ACL_ADMINISTER_JOB_TAG = "acl-administer-jobs";
+  static final String PROPERTIES_TAG = "properties";
+  static final String STATE_TAG = "state";
+  static final String QUEUE_NAME_TAG = "name";
+  static final String QUEUES_TAG = "queues";
+  static final String ACLS_ENABLED_TAG = "aclsEnabled";
+  static final String PROPERTY_TAG = "property";
+  static final String KEY_TAG = "key";
+  static final String VALUE_TAG = "value";
+
+  /**
+   * Default constructor for DeperacatedQueueConfigurationParser
+   */
+  QueueConfigurationParser() {
+    
+  }
+
+  QueueConfigurationParser(String file) {
+    try {
+      this.root = loadResource(file);
+    } catch (ParserConfigurationException e) {
+      throw new RuntimeException(e);
+    } catch (SAXException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  void setAclsEnabled(boolean aclsEnabled) {
+    this.aclsEnabled = aclsEnabled;
+  }
+
+  static Queue.QueueState getQueueState(String stateVal) {
+    Queue.QueueState retState = null;
+    for (Queue.QueueState state : Queue.QueueState.values()) {
+      if (state.getStateName().equalsIgnoreCase(stateVal)) {
+        retState = state;
+        break;
+      }
+    }
+
+    if(retState == null) {
+      LOG.error("Improper state value , " +
+        "setting the state to " + Queue.QueueState.RUNNING.getStateName());
+      retState = Queue.QueueState.RUNNING;
+    }
+    return retState;
+  }
+
+  boolean isAclsEnabled() {
+    return aclsEnabled;
+  }
+
+  Queue getRoot() {
+    return root;
+  }
+
+  void setRoot(Queue root) {
+    this.root = root;
+  }
+
+  /**
+   * Method to load the resource file.
+   * generates the root.
+   * 
+   * @param confFile
+   * @return
+   * @throws ParserConfigurationException
+   * @throws SAXException
+   * @throws IOException
+   */
+  protected Queue loadResource(String confFile)
+    throws ParserConfigurationException, SAXException, IOException {
+    DocumentBuilderFactory docBuilderFactory
+      = DocumentBuilderFactory.newInstance();
+    //ignore all comments inside the xml file
+    docBuilderFactory.setIgnoringComments(true);
+
+    //allow includes in the xml file
+    docBuilderFactory.setNamespaceAware(true);
+    try {
+      docBuilderFactory.setXIncludeAware(true);
+    } catch (UnsupportedOperationException e) {
+      LOG.info(
+        "Failed to set setXIncludeAware(true) for parser "
+          + docBuilderFactory
+          + NAME_SEPARATOR + e);
+    }
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = null;
+    Element queuesNode = null;
+    File file = new File(confFile).getAbsoluteFile();
+    if (file.exists()) {
+      InputStream in = new BufferedInputStream(new FileInputStream(file));
+      try {
+        doc = builder.parse(in);
+      } finally {
+        in.close();
+      }
+    }
+
+    if (doc == null) {
+      throw new RuntimeException(file.getAbsolutePath() + " not found");
+    }
+    queuesNode = doc.getDocumentElement();
+    return this.parseResource(queuesNode);
+  }
+
+  private Queue parseResource(Element queuesNode) {
+    Queue rootNode = null;
+    try {
+      if (!QUEUES_TAG.equals(queuesNode.getTagName())) {
+        LOG.info("Bad conf file: top-level element not <queues>");
+        throw new RuntimeException("No queues defined ");
+      }
+      NamedNodeMap nmp = queuesNode.getAttributes();
+      Node acls = nmp.getNamedItem(ACLS_ENABLED_TAG);
+
+      if (acls != null && acls.getTextContent().equalsIgnoreCase("true")) {
+        setAclsEnabled(true);
+      }
+
+      NodeList props = queuesNode.getChildNodes();
+      if (props == null || props.getLength() <= 0) {
+        LOG.info(" Bad configuration no queues defined ");
+        throw new RuntimeException(" No queues defined ");
+      }
+
+      //We have root level nodes.
+      for (int i = 0; i < props.getLength(); i++) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element)) {
+          continue;
+        }
+
+        if (!propNode.getNodeName().equals(QUEUE_TAG)) {
+          LOG.info("At root level only \" queue \" tags are allowed ");
+          throw
+            new RuntimeException("Malformed xml document no queue defined ");
+        }
+
+        Element prop = (Element) propNode;
+        //Add children to root.
+        Queue q = createHierarchy("", prop);
+        if(rootNode == null) {
+          rootNode = new Queue();
+          rootNode.setName("");
+        }
+        rootNode.addChild(q);
+      }
+      return rootNode;
+    } catch (DOMException e) {
+      LOG.info("Error parsing conf file: " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * @param parent Name of the parent queue
+   * @param queueNode
+   * @return
+   */
+  private Queue createHierarchy(String parent, Element queueNode) {
+
+    if (queueNode == null) {
+      return null;
+    }
+    //Name of the current queue.
+    //Complete qualified queue name.
+    String name = "";
+    Queue newQueue = new Queue();
+    Map<String, AccessControlList> acls =
+      new HashMap<String, AccessControlList>();
+
+    NodeList fields = queueNode.getChildNodes();
+    validate(queueNode);
+    List<Element> subQueues = new ArrayList<Element>();
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+      Element field = (Element) fieldNode;
+      if (QUEUE_NAME_TAG.equals(field.getTagName())) {
+        String nameValue = field.getTextContent();
+        if (field.getTextContent() == null ||
+          field.getTextContent().trim().equals("") ||
+          field.getTextContent().contains(NAME_SEPARATOR)) {
+          throw new RuntimeException("Improper queue name : " + nameValue);
+        }
+
+        if (!parent.equals("")) {
+          name += parent + NAME_SEPARATOR;
+        }
+        //generate the complete qualified name
+        //parent.child
+        name += nameValue;
+        newQueue.setName(name);
+      }
+
+      if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
+        subQueues.add(field);
+      }
+      if(isAclsEnabled()) {
+        if (ACL_SUBMIT_JOB_TAG.equals(field.getTagName())) {
+          String submitList = field.getTextContent();
+          String aclKey = toFullPropertyName(
+            name, Queue.QueueOperation.SUBMIT_JOB.getAclName());
+          acls.put(aclKey, new AccessControlList(submitList));
+        }
+
+        if (ACL_ADMINISTER_JOB_TAG.equals(field.getTagName())) {
+          String administerList = field.getTextContent();
+          String aclKey = toFullPropertyName(
+            name, Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
+          acls.put(aclKey, new AccessControlList(administerList));
+        }
+      }
+
+      if (PROPERTIES_TAG.equals(field.getTagName())) {
+        Properties properties = populateProperties(field);
+        newQueue.setProperties(properties);
+      }
+
+      if (STATE_TAG.equals(field.getTagName())) {
+        String state = field.getTextContent();
+        newQueue.setState(getQueueState(state));
+      }
+    }
+    //Set acls
+    newQueue.setAcls(acls);
+    //At this point we have the queue ready at current height level.
+    //so we have parent name available.
+
+    for(Element field:subQueues) {
+      newQueue.addChild(createHierarchy(newQueue.getName(), field));
+    }
+    return newQueue;
+  }
+
+  /**
+   * Populate the properties for Queue
+   *
+   * @param field
+   * @return
+   */
+  private Properties populateProperties(Element field) {
+    Properties props = new Properties();
+
+    NodeList propfields = field.getChildNodes();
+
+    for (int i = 0; i < propfields.getLength(); i++) {
+      Node prop = propfields.item(i);
+
+      //If this node is not of type element
+      //skip this.
+      if (!(prop instanceof Element)) {
+        continue;
+      }
+
+      if (PROPERTY_TAG.equals(prop.getNodeName())) {
+        if (prop.hasAttributes()) {
+          NamedNodeMap nmp = prop.getAttributes();
+          if (nmp.getNamedItem(KEY_TAG) != null && nmp.getNamedItem(
+            VALUE_TAG) != null) {
+            props.setProperty(
+              nmp.getNamedItem(KEY_TAG).getTextContent(), nmp.getNamedItem(
+                VALUE_TAG).getTextContent());
+          }
+        }
+      }
+    }
+    return props;
+  }
+
+  /**
+   *
+   * Checks if there is NAME_TAG for queues.
+   *
+   * Checks if (queue has children)
+   *  then it shouldnot have acls-* or state
+   *   else
+   *  throws an Exception.
+   * @param node
+   */
+  private void validate(Node node) {
+
+    NodeList fields = node.getChildNodes();
+
+    //Check if <queue> & (<acls-*> || <state>) are not siblings
+    //if yes throw an IOException.
+    Set<String> siblings = new HashSet<String>();
+    for (int i = 0; i < fields.getLength(); i++) {
+      if (!(fields.item(i) instanceof Element)) {
+        continue;
+      }
+      siblings.add((fields.item(i)).getNodeName());
+    }
+
+    if(! siblings.contains(QUEUE_NAME_TAG)) {
+      throw new RuntimeException(
+        " Malformed xml formation queue name not specified ");
+    }
+
+    if (siblings.contains(QUEUE_TAG) && (
+      siblings.contains(ACL_ADMINISTER_JOB_TAG) ||
+        siblings.contains(ACL_SUBMIT_JOB_TAG) ||
+        siblings.contains(STATE_TAG)
+    )) {
+      throw new RuntimeException(
+        " Malformed xml formation queue tag and acls " +
+          "tags or state tags are siblings ");
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java Wed Sep 16 04:49:18 2009
@@ -18,171 +18,247 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Queue.QueueState;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Writer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.List;
+import java.net.URL;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Queue.QueueState;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Class that exposes information about queues maintained by the Hadoop
  * Map/Reduce framework.
- * 
+ * <p/>
  * The Map/Reduce framework can be configured with one or more queues,
- * depending on the scheduler it is configured with. While some 
- * schedulers work only with one queue, some schedulers support multiple 
- * queues.
- *  
+ * depending on the scheduler it is configured with. While some
+ * schedulers work only with one queue, some schedulers support multiple
+ * queues. Some schedulers also support the notion of queues within
+ * queues - a feature called hierarchical queues.
+ * <p/>
+ * Queue names are unique, and used as a key to lookup queues. Hierarchical
+ * queues are named by a 'fully qualified name' such as q1:q2:q3, where
+ * q2 is a child queue of q1 and q3 is a child queue of q2.
+ * <p/>
+ * Leaf level queues are queues that contain no queues within them. Jobs
+ * can be submitted only to leaf level queues.
+ * <p/>
  * Queues can be configured with various properties. Some of these
  * properties are common to all schedulers, and those are handled by this
- * class. Schedulers might also associate several custom properties with 
- * queues. Where such a case exists, the queue name must be used to link 
- * the common properties with the scheduler specific ones.  
+ * class. Schedulers might also associate several custom properties with
+ * queues. These properties are parsed and maintained per queue by the
+ * framework. If schedulers need more complicated structure to maintain
+ * configuration per queue, they are free to not use the facilities
+ * provided by the framework, but define their own mechanisms. In such cases,
+ * it is likely that the name of the queue will be used to relate the
+ * common properties of a queue with scheduler specific properties.
+ * <p/>
+ * Information related to a queue, such as its name, properties, scheduling
+ * information and children are exposed by this class via a serializable
+ * class called {@link JobQueueInfo}.
+ * <p/>
+ * Queues are configured in the configuration file mapred-queues.xml.
+ * To support backwards compatibility, queues can also be configured
+ * in mapred-site.xml. However, when configured in the latter, there is
+ * no support for hierarchical queues.
  */
+
 class QueueManager {
-  
-  private static final Log LOG = LogFactory.getLog(QueueManager.class);
 
-  // Configured queues this is backed by queues Map , mentioned below
-  private Set<String> queueNames;
+  private static final Log LOG = LogFactory.getLog(QueueManager.class);
 
   // Map of a queue name and Queue object
-  private HashMap<String, Queue> queues;
-
-  // Whether ACLs are enabled in the system or not.
-  private boolean aclsEnabled;
-
-  static final String QUEUE_STATE_SUFFIX = "state";
-
+  private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
+  private Map<String, Queue> allQueues = new HashMap<String, Queue>();
   static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
 
   // Prefix in configuration for queue related keys
   static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
-                          = "mapred.queue.";//Resource in which queue acls are configured.
+    = "mapred.queue.";
+  //Resource in which queue acls are configured.
+
+  private Queue root = null;
+  private boolean isAclEnabled = false;
+
+  /**
+   * Factory method to create an appropriate instance of a queue
+   * configuration parser.
+   * <p/>
+   * Returns a parser that can parse either the deprecated property
+   * style queue configuration in mapred-site.xml, or one that can
+   * parse hierarchical queues in mapred-queues.xml. First preference
+   * is given to configuration in mapred-site.xml. If no queue
+   * configuration is found there, then a parser that can parse
+   * configuration in mapred-queues.xml is created.
+   *
+   * @param conf Configuration instance that determines which parser
+   *             to use.
+   * @return Queue configuration parser
+   */
+  static QueueConfigurationParser getQueueConfigurationParser(
+    Configuration conf, boolean reloadConf) {
+    if (conf != null && conf.get(
+      DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
+      if (reloadConf) {
+        conf.reloadConfiguration();
+      }
+      return new DeprecatedQueueConfigurationParser(conf);
+    } else {
+      URL filePath =
+        ClassLoader.getSystemClassLoader().getResource(QUEUE_CONF_FILE_NAME);
+      return new QueueConfigurationParser(filePath.getPath());
+    }
+  }
+
+  public QueueManager() {
+    initialize(getQueueConfigurationParser(null, false));
+  }
 
   /**
    * Construct a new QueueManager using configuration specified in the passed
    * in {@link org.apache.hadoop.conf.Configuration} object.
-   * 
+   * <p/>
+   * This instance supports queue configuration specified in mapred-site.xml,
+   * but without support for hierarchical queues. If no queue configuration
+   * is found in mapred-site.xml, it will then look for site configuration
+   * in mapred-queues.xml supporting hierarchical queues.
+   *
    * @param conf Configuration object where queue configuration is specified.
    */
   public QueueManager(Configuration conf) {
-    checkDeprecation(conf);
-    conf.addResource(QUEUE_CONF_FILE_NAME);
-
-    queues = new HashMap<String, Queue>();
+    initialize(getQueueConfigurationParser(conf, false));
+  }
 
-    // First get the queue names 
-    String[] queueNameValues = conf.getStrings("mapred.queue.names",
-        new String[]{JobConf.DEFAULT_QUEUE_NAME});
-
-    // Get configured ACLs and state for each queue
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
-    for (String name : queueNameValues) {
-      try {
-        Map<String, AccessControlList> acls = getQueueAcls(name, conf);
-        QueueState state = getQueueState(name, conf);
-        queues.put(name, new Queue(name, acls, state));
-      } catch (Throwable t) {
-        LOG.warn("Not able to initialize queue " + name);
-      }
-    }
-    // Sync queue names with the configured queues.
-    queueNames = queues.keySet();
+  /**
+   * Create an instance that supports hierarchical queues, defined in
+   * the passed in configuration file.
+   * <p/>
+   * This is mainly used for testing purposes and should not called from
+   * production code.
+   *
+   * @param confFile File where the queue configuration is found.
+   */
+  QueueManager(String confFile) {
+    QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+    initialize(cp);
+  }
+
+  private void initialize(QueueConfigurationParser cp) {
+    this.root = cp.getRoot();
+    leafQueues.clear();
+    allQueues.clear();
+    //At this point we have root populated
+    //update data structures leafNodes.
+    leafQueues = getRoot().getLeafQueues();
+    allQueues.putAll(getRoot().getInnerQueues());
+    allQueues.putAll(leafQueues);
+
+    LOG.info(
+      "Leaf queues and allQueues " + allQueues + " " +
+        "leafQueues " + leafQueues);
+    this.isAclEnabled = cp.isAclsEnabled();
   }
-  
+
+
   /**
-   * Return the set of queues configured in the system.
-   * 
-   * The number of queues configured should be dependent on the Scheduler 
+   * Return the set of leaf level queues configured in the system to
+   * which jobs are submitted.
+   * <p/>
+   * The number of queues configured should be dependent on the Scheduler
    * configured. Note that some schedulers work with only one queue, whereas
    * others can support multiple queues.
-   *  
+   *
    * @return Set of queue names.
    */
-  public synchronized Set<String> getQueues() {
-    return queueNames;
+  public synchronized Set<String> getLeafQueueNames() {
+    return leafQueues.keySet();
   }
-  
+
   /**
-   * Return true if the given {@link Queue.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be
    * performed by the specified user on the given queue.
-   * 
+   * <p/>
    * An operation is allowed if all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
-   * 
-   * @param queueName Queue on which the operation needs to be performed. 
-   * @param oper The operation to perform
-   * @param ugi The user and groups who wish to perform the operation.
-   * 
+   *
+   * @param queueName Queue on which the operation needs to be performed.
+   * @param oper      The operation to perform
+   * @param ugi       The user and groups who wish to perform the operation.
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, 
-                                Queue.QueueOperation oper,
-                                UserGroupInformation ugi) {
+  public synchronized boolean hasAccess(
+    String queueName,
+    Queue.QueueOperation oper,
+    UserGroupInformation ugi) {
     return hasAccess(queueName, null, oper, ugi);
   }
-  
+
   /**
-   * Return true if the given {@link Queue.QueueOperation} can be 
+   * Return true if the given {@link Queue.QueueOperation} can be
    * performed by the specified user on the specified job in the given queue.
-   * 
-   * An operation is allowed either if the owner of the job is the user 
+   * <p/>
+   * An operation is allowed either if the owner of the job is the user
    * performing the task, all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
-   * 
-   * If the {@link Queue.QueueOperation} is not job specific then the 
+   * <p/>
+   * If the {@link Queue.QueueOperation} is not job specific then the
    * job parameter is ignored.
-   * 
+   *
    * @param queueName Queue on which the operation needs to be performed.
-   * @param job The {@link JobInProgress} on which the operation is being
-   *            performed. 
-   * @param oper The operation to perform
-   * @param ugi The user and groups who wish to perform the operation.
-   * 
+   * @param job       The {@link JobInProgress} on which the operation is being
+   *                  performed.
+   * @param oper      The operation to perform
+   * @param ugi       The user and groups who wish to perform the operation.
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, JobInProgress job, 
-                                Queue.QueueOperation oper, 
-                                UserGroupInformation ugi) {
-    if (!aclsEnabled) {
-      return true;
-    }
+  public synchronized boolean hasAccess(
+    String queueName, JobInProgress job,
+    Queue.QueueOperation oper,
+    UserGroupInformation ugi) {
     
-    Queue q = queues.get(queueName);
+    Queue q = leafQueues.get(queueName);
+
     if (q == null) {
       LOG.info("Queue " + queueName + " is not present");
       return false;
     }
-    
+
+    if(q.getChildren() != null && !q.getChildren().isEmpty()) {
+      LOG.info("Cannot submit job to parent queue " + q.getName());
+      return false;
+    }
+
+    if (!isAclsEnabled()) {
+      return true;
+    }
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("checking access for : " 
+      LOG.debug(
+        "checking access for : "
           + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
     }
 
     if (oper.isJobOwnerAllowed()) {
-      if (job != null 
-          && job.getJobConf().getUser().equals(ugi.getUserName())) {
+      if (job != null
+        && job.getJobConf().getUser().equals(ugi.getUserName())) {
         return true;
       }
     }
-    
+
     AccessControlList acl = q.getAcls().get(
-                                toFullPropertyName(queueName, 
-                                    oper.getAclName()));
+      toFullPropertyName(
+        queueName,
+        oper.getAclName()));
     if (acl == null) {
       return false;
     }
@@ -210,210 +286,146 @@
 
   /**
    * Checks whether the given queue is running or not.
+   *
    * @param queueName name of the queue
    * @return true, if the queue is running.
    */
   synchronized boolean isRunning(String queueName) {
-    Queue q = queues.get(queueName);
+    Queue q = leafQueues.get(queueName);
     if (q != null) {
       return q.getState().equals(QueueState.RUNNING);
     }
     return false;
   }
-  
+
   /**
    * Set a generic Object that represents scheduling information relevant
    * to a queue.
-   * 
+   * <p/>
    * A string representation of this Object will be used by the framework
    * to display in user facing applications like the JobTracker web UI and
    * the hadoop CLI.
-   * 
-   * @param queueName queue for which the scheduling information is to be set. 
+   *
+   * @param queueName queue for which the scheduling information is to be set.
    * @param queueInfo scheduling information for this queue.
    */
-  public synchronized void setSchedulerInfo(String queueName, 
-                                              Object queueInfo) {
-    if (queues.get(queueName) != null)
-      queues.get(queueName).setSchedulingInfo(queueInfo);
+  public synchronized void setSchedulerInfo(
+    String queueName,
+    Object queueInfo) {
+    if (allQueues.get(queueName) != null) {
+      allQueues.get(queueName).setSchedulingInfo(queueInfo);
+    }
   }
-  
+
   /**
    * Return the scheduler information configured for this queue.
-   * 
+   *
    * @param queueName queue for which the scheduling information is required.
    * @return The scheduling information for this queue.
-   * 
-   * @see #setSchedulingInfo(String, Object)
    */
   public synchronized Object getSchedulerInfo(String queueName) {
-    if (queues.get(queueName) != null)
-      return queues.get(queueName).getSchedulingInfo();
+    if (allQueues.get(queueName) != null) {
+      return allQueues.get(queueName).getSchedulingInfo();
+    }
     return null;
   }
-  
+
   /**
-   * Refresh the acls and state for the configured queues in the system 
-   * by reading it from mapred-queues.xml.
-   * 
-   * Previously configured queues and if or not acls are disabled is retained.
-   * 
+   * Refresh acls, state and scheduler properties for the configured queues.
+   * <p/>
+   * This method reloads configuration related to queues, but does not
+   * support changes to the list of queues or hierarchy. The expected usage
+   * is that an administrator can modify the queue configuration file and
+   * fire an admin command to reload queue configuration. If there is a
+   * problem in reloading configuration, then this method guarantees that
+   * existing queue configuration is untouched and in a consistent state.
+   *
    * @throws IOException when queue configuration file is invalid.
    */
   synchronized void refreshQueues(Configuration conf) throws IOException {
-
-    // First check if things are configured in mapred-site.xml,
-    // so we can print out a deprecation warning. 
-    // This check is needed only until we support the configuration
-    // in mapred-site.xml
-    checkDeprecation(conf);
-    
-    // Add the queue configuration file. Values from mapred-site.xml
-    // will be overridden.
-    conf.addResource(QUEUE_CONF_FILE_NAME);
-    
-    // Now we refresh the properties of the queues. Note that we
-    // do *not* refresh the queue names or the acls flag. Instead
-    // we use the older values configured for them.
-    LOG.info("Refreshing acls and state for configured queues.");
-    try {
-      Iterator<String> itr = queueNames.iterator();
-      while(itr.hasNext()) {
-        String name = itr.next();
-        Queue q = queues.get(name);
-        Map<String, AccessControlList> newAcls = getQueueAcls(name, conf);
-        QueueState newState = getQueueState(name, conf);
-        q.setAcls(newAcls);
-        q.setState(newState);
-      }
-    } catch (Throwable t) {
-      String exceptionString = StringUtils.stringifyException(t);
-      LOG.warn("Queues could not be refreshed because there was an " +
-      		"exception in parsing the configuration: "+ exceptionString +
-      		". Existing ACLs/state is retained.");
-      throw new IOException(exceptionString);
-    }
-  }
-  
-  // Check if queue properties are configured in the passed in
-  // configuration. If yes, print out deprecation warning messages.
-  private void checkDeprecation(Configuration conf) {
-
-    // check if queues are defined.
-    String[] queues = null;
-    String queueNameValues = conf.get("mapred.queue.names");
-    if (queueNameValues != null) {
-      LOG.warn("Configuring \"mapred.queue.names\" in mapred-site.xml or " +
-          		"hadoop-site.xml is deprecated. Configure " +
-              "\"mapred.queue.names\" in " +
-          		QUEUE_CONF_FILE_NAME);
-      // store queues so we can check if ACLs are also configured
-      // in the deprecated files.
-      queues = conf.getStrings("mapred.queue.names");
-    }
-    
-    // check if the acls flag is defined
-    String aclsEnable = conf.get("mapred.acls.enabled");
-    if (aclsEnable != null) {
-      LOG.warn("Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
-          		"hadoop-site.xml is deprecated. Configure " +
-              "\"mapred.acls.enabled\" in " +
-          		QUEUE_CONF_FILE_NAME);
-    }
-    
-    // check if acls are defined
-    if (queues != null) {
-      for (String queue : queues) {
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = toFullPropertyName(queue, oper.getAclName());
-          String aclString = conf.get(key);
-          if (aclString != null) {
-            LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
-          	  "hadoop-site.xml is deprecated. Configure queue ACLs in " + 
-          		QUEUE_CONF_FILE_NAME);
-            // even if one string is configured, it is enough for printing
-            // the warning. so we can return from here.
-            return;
-          }
-        }
-      }
-    }
-  }
-  
-  // Parse ACLs for the queue from the configuration.
-  private Map<String, AccessControlList> getQueueAcls(String name,
-                                                        Configuration conf) {
-    HashMap<String, AccessControlList> map = 
-        new HashMap<String, AccessControlList>();
-    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-      String aclKey = toFullPropertyName(name, oper.getAclName());
-      map.put(aclKey, new AccessControlList(conf.get(aclKey, "*")));
-    }
-    return map;
-  }
-
-  // Parse ACLs for the queue from the configuration.
-  private QueueState getQueueState(String name, Configuration conf) {
-    QueueState retState = QueueState.RUNNING;
-    String stateVal = conf.get(toFullPropertyName(name,
-                                                  QueueManager.QUEUE_STATE_SUFFIX),
-                               QueueState.RUNNING.getStateName());
-    for (QueueState state : QueueState.values()) {
-      if (state.getStateName().equalsIgnoreCase(stateVal)) {
-        retState = state;
-        break;
-      }
-    }
-    return retState;
+    QueueConfigurationParser cp
+      = QueueManager.getQueueConfigurationParser(conf, true);
+    if (!root.isHierarchySameAs(cp.getRoot())) {
+      throw new IOException(
+        "unable to refresh queues , queue hierarchy changed " +
+          "retaining existing configuration ");
+    }
+    initialize(cp);
   }
- 
-  public static final String toFullPropertyName(String queue,
-      String property) {
+
+  static final String toFullPropertyName(
+    String queue,
+    String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
 
+  /**
+   * Return an array of {@link JobQueueInfo} objects for all the
+   * queues configurated in the system.
+   *
+   * @return array of JobQueueInfo objects.
+   */
   synchronized JobQueueInfo[] getJobQueueInfos() {
     ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
-    for (String queue : queueNames) {
+    for (String queue : allQueues.keySet()) {
       JobQueueInfo queueInfo = getJobQueueInfo(queue);
       if (queueInfo != null) {
-        queueInfoList.add(getJobQueueInfo(queue));  
+        queueInfoList.add(queueInfo);
       }
     }
-    return  queueInfoList.toArray(
-            new JobQueueInfo[queueInfoList.size()]);
+    return queueInfoList.toArray(
+      new JobQueueInfo[queueInfoList.size()]);
   }
 
+
+  /**
+   * Return {@link JobQueueInfo} for a given queue.
+   *
+   * @param queue name of the queue
+   * @return JobQueueInfo for the queue, null if the queue is not found.
+   */
   synchronized JobQueueInfo getJobQueueInfo(String queue) {
-    if (queues.get(queue) != null) {
-      Object schedulingInfo = queues.get(queue).getSchedulingInfo();
-      JobQueueInfo qInfo;
-      if (schedulingInfo != null) {
-        qInfo = new JobQueueInfo(queue, schedulingInfo.toString());
-      } else {
-        qInfo = new JobQueueInfo(queue, null);
-      }
-      qInfo.setQueueState(queues.get(queue).getState().getStateName());
-      return qInfo;
+    if (allQueues.containsKey(queue)) {
+      return allQueues.get(queue).getJobQueueInfo();
     }
+
     return null;
   }
 
   /**
-   * Generates the array of QueueAclsInfo object. 
-   * 
+   * JobQueueInfo for all the queues.
+   * <p/>
+   * Contribs can use this data structure to either create a hierarchy or for
+   * traversing.
+   * They can also use this to refresh properties in case of refreshQueues
+   *
+   * @return a map for easy navigation.
+   */
+  synchronized Map<String, JobQueueInfo> getJobQueueInfoMapping() {
+    Map<String, JobQueueInfo> m = new HashMap<String, JobQueueInfo>();
+
+    for (String key : allQueues.keySet()) {
+      m.put(key, allQueues.get(key).getJobQueueInfo());
+    }
+
+    return m;
+  }
+
+  /**
+   * Generates the array of QueueAclsInfo object.
+   * <p/>
    * The array consists of only those queues for which user has acls.
    *
    * @return QueueAclsInfo[]
    * @throws java.io.IOException
    */
   synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
-                                            throws IOException {
+    throws IOException {
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
-            new ArrayList<QueueAclsInfo>();
+      new ArrayList<QueueAclsInfo>();
     Queue.QueueOperation[] operations = Queue.QueueOperation.values();
-    for (String queueName : queueNames) {
+    for (String queueName : leafQueues.keySet()) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
       for (Queue.QueueOperation operation : operations) {
@@ -427,33 +439,100 @@
       if (operationsAllowed != null) {
         //There is atleast 1 operation supported for queue <queueName>
         //, hence initialize queueAclsInfo
-        queueAclsInfo = new QueueAclsInfo(queueName, operationsAllowed.toArray
-                (new String[operationsAllowed.size()]));
+        queueAclsInfo = new QueueAclsInfo(
+          queueName, operationsAllowed.toArray
+            (new String[operationsAllowed.size()]));
         queueAclsInfolist.add(queueAclsInfo);
       }
     }
-    return queueAclsInfolist.toArray(new QueueAclsInfo[
-            queueAclsInfolist.size()]);
+    return queueAclsInfolist.toArray(
+      new QueueAclsInfo[
+        queueAclsInfolist.size()]);
   }
 
-  // ONLY FOR TESTING - Do not use in production code.
+  /**
+   * ONLY FOR TESTING - Do not use in production code.
+   * This method is used for setting up of leafQueues only.
+   * We are not setting the hierarchy here.
+   *
+   * @param queues
+   */
   synchronized void setQueues(Queue[] queues) {
+    root.getChildren().clear();
+    leafQueues.clear();
+    allQueues.clear();
+
     for (Queue queue : queues) {
-      this.queues.put(queue.getName(), queue);
+      root.addChild(queue);
+    }
+    //At this point we have root populated
+    //update data structures leafNodes.
+    leafQueues = getRoot().getLeafQueues();
+    allQueues.putAll(getRoot().getInnerQueues());
+    allQueues.putAll(leafQueues);
+  }
+
+  /**
+   * Return an array of {@link JobQueueInfo} objects for the root
+   * queues configured in the system.
+   * <p/>
+   * Root queues are queues that are at the top-most level in the
+   * hierarchy of queues in mapred-queues.xml, or they are the queues
+   * configured in the mapred.queue.names key in mapred-site.xml.
+   *
+   * @return array of JobQueueInfo objects for root level queues.
+   */
+
+  JobQueueInfo[] getRootQueues() {
+    List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
+    return list.toArray(new JobQueueInfo[list.size()]);
+  }
+
+  /**
+   * Get the complete hierarchy of children for queue
+   * queueName
+   *
+   * @param queueName
+   * @return
+   */
+  JobQueueInfo[] getChildQueues(String queueName) {
+    List<JobQueueInfo> list =
+      allQueues.get(queueName).getJobQueueInfo().getChildren();
+    if (list != null) {
+      return list.toArray(new JobQueueInfo[list.size()]);
+    } else {
+      return new JobQueueInfo[0];
     }
   }
 
   /**
-   * prints the configuration of QueueManager in Json format.
-   * The method should be modified accordingly whenever
-   * QueueManager(Configuration) constructor is modified.
-   * @param writer {@link}Writer object to which the configuration properties 
-   * are printed in json format
-   * @throws IOException
+   * Used only for testing purposes .
+   * This method is unstable as refreshQueues would leave this
+   * data structure in unstable state.
+   *
+   * @param queueName
+   * @return
+   */
+  Queue getQueue(String queueName) {
+    return this.allQueues.get(queueName);
+  }
+
+
+  /**
+   * Return if ACLs are enabled for the Map/Reduce system
+   *
+   * @return true if ACLs are enabled.
+   */
+  boolean isAclsEnabled() {
+    return isAclEnabled;
+  }
+
+  /**
+   * Used only for test.
+   *
+   * @return
    */
-  static void dumpConfiguration(Writer writer) throws IOException {
-    Configuration conf = new Configuration(false);
-    conf.addResource(QUEUE_CONF_FILE_NAME);
-    Configuration.dumpConfiguration(conf, writer);
+  Queue getRoot() {
+    return root;
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/commit-tests?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/commit-tests (original)
+++ hadoop/mapreduce/trunk/src/test/commit-tests Wed Sep 16 04:49:18 2009
@@ -38,4 +38,6 @@
 **/TestTaskTrackerBlacklisting.java
 **/TestTaskTrackerLocalization
 **/TestTrackerDistributedCacheManager
-
+**/TestQueueManagerForHierarchialQueues
+**/TestContainerQueue
+**/TestCapacityScheduler

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java?rev=815605&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java Wed Sep 16 04:49:18 2009
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import static org.apache.hadoop.mapred.Queue.*;
+import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.dom.DOMSource;
+import java.util.Properties;
+import java.util.Set;
+import java.io.File;
+
+public class QueueManagerTestUtils {
+  final static String CONFIG = new File("./test-mapred-queues.xml")
+    .getAbsolutePath();
+
+  //methods to create hierarchy.
+  public static Document createDocument() throws Exception {
+    Document doc = DocumentBuilderFactory
+      .newInstance().newDocumentBuilder().newDocument();
+    return doc;
+  }
+
+  public static void createSimpleDocument(
+    Document doc) throws Exception {
+    Element queues = createQueuesNode(doc, "true");
+
+    //Create parent level queue q1.
+    Element q1 = createQueue(doc, "q1");
+    Properties props = new Properties();
+    props.setProperty("capacity", "10");
+    props.setProperty("maxCapacity", "35");
+    q1.appendChild(createProperties(doc, props));
+    queues.appendChild(q1);
+
+    //Create another parent level p1
+    Element p1 = createQueue(doc, "p1");
+
+    //append child p11 to p1
+    p1.appendChild(createQueue(doc, "p11"));
+
+    Element p12 = createQueue(doc, "p12");
+
+    p12.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+    p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
+    p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+
+    //append p12 to p1.
+    p1.appendChild(p12);
+
+
+    queues.appendChild(p1);
+  }
+
+  public static void refreshSimpleDocument(
+    Document doc) throws Exception {
+    Element queues = createQueuesNode(doc, "true");
+
+    //Create parent level queue q1.
+    Element q1 = createQueue(doc, "q1");
+    Properties props = new Properties();
+    props.setProperty("capacity", "70");
+    props.setProperty("maxCapacity", "35");
+    q1.appendChild(createProperties(doc, props));
+    queues.appendChild(q1);
+
+    //Create another parent level p1
+    Element p1 = createQueue(doc, "p1");
+
+    //append child p11 to p1
+    Element p11 = createQueue(doc, "p11");
+    p11.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+    p1.appendChild(p11);
+
+    Element p12 = createQueue(doc, "p12");
+
+    p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+    p12.appendChild(createAcls(doc, "acl-submit-job", "u3"));
+    p12.appendChild(createAcls(doc, "acl-administer-jobs", "u4"));
+
+    //append p12 to p1.
+    p1.appendChild(p12);
+
+
+    queues.appendChild(p1);
+  }
+
+  /**
+   * Adding a new child to q1
+   *
+   * @param doc
+   * @throws Exception
+   */
+  public static void addMoreChildToSimpleDocumentStructure(Document doc)
+    throws Exception {
+    Element queues = createQueuesNode(doc, "true");
+
+    //Create parent level queue q1.
+    Element q1 = createQueue(doc, "q1");
+    Properties props = new Properties();
+    props.setProperty("capacity", "70");
+    props.setProperty("maxCapacity", "35");
+    q1.appendChild(createProperties(doc, props));
+    queues.appendChild(q1);
+
+    //Adding q11 to existing simple document
+    q1.appendChild(createQueue(doc, "q11"));
+
+    //Create another parent level p1
+    Element p1 = createQueue(doc, "p1");
+
+    //append child p11 to p1
+    Element p11 = createQueue(doc, "p11");
+    p11.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+    p1.appendChild(p11);
+
+    Element p12 = createQueue(doc, "p12");
+
+    p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+    p12.appendChild(createAcls(doc, ACL_SUBMIT_JOB_TAG, "u3"));
+    p12.appendChild(createAcls(doc, ACL_ADMINISTER_JOB_TAG, "u4"));
+
+    //append p12 to p1.
+    p1.appendChild(p12);
+
+
+    queues.appendChild(p1);
+  }
+
+  public static Element createQueuesNode(Document doc, String enable) {
+    Element queues = doc.createElement("queues");
+    doc.appendChild(queues);
+    queues.setAttribute("aclsEnabled", enable);
+    return queues;
+  }
+
+  public static void writeToFile(Document doc, String filePath)
+    throws TransformerException {
+    Transformer trans = TransformerFactory.newInstance().newTransformer();
+    trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+    trans.setOutputProperty(OutputKeys.INDENT, "yes");
+    DOMSource source = new DOMSource(doc);
+    trans.transform(source, new StreamResult(new File(filePath)));
+  }
+
+  public static Element createQueue(Document doc, String name) {
+    Element queue = doc.createElement("queue");
+    Element nameNode = doc.createElement("name");
+    nameNode.setTextContent(name);
+    queue.appendChild(nameNode);
+    return queue;
+  }
+
+  public static Element createAcls(
+    Document doc, String aclName, String listNames) {
+    Element acls = doc.createElement(aclName);
+    acls.setTextContent(listNames);
+    return acls;
+  }
+
+  public static Element createState(Document doc, String state) {
+    Element stateElement = doc.createElement("state");
+    stateElement.setTextContent(state);
+    return stateElement;
+  }
+
+  public static Element createProperties(Document doc, Properties props) {
+    Element propsElement = doc.createElement("properties");
+    if (props != null) {
+      Set<String> propList = props.stringPropertyNames();
+      for (String prop : propList) {
+        Element property = doc.createElement("property");
+        property.setAttribute("key", prop);
+        property.setAttribute("value", (String) props.get(prop));
+        propsElement.appendChild(property);
+      }
+    }
+    return propsElement;
+  }
+
+  public static void checkForConfigFile() {
+    if (new File(CONFIG).exists()) {
+      new File(CONFIG).delete();
+    }
+  }
+}



Mime
View raw message