hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r883621 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/ src/test/mapred/org/apache/hadoop/mapred/
Date Tue, 24 Nov 2009 08:22:39 GMT
Author: sharad
Date: Tue Nov 24 08:22:38 2009
New Revision: 883621

URL: http://svn.apache.org/viewvc?rev=883621&view=rev
Log:
MAPREDUCE-28. Refactor TestQueueManager and fix default ACLs. Contributed by V.V.Chaitanya Krishna and Rahul K Singh.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
Removed:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerForHierarchialQueues.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
    hadoop/mapreduce/trunk/src/test/commit-tests
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=883621&r1=883620&r2=883621&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Nov 24 08:22:38 2009
@@ -910,3 +910,6 @@
     MAPREDUCE-1007. Fix NPE in CapacityTaskScheduler.getJobs(). 
     (V.V.Chaitanya Krishna via sharad)
 
+    MAPREDUCE-28. Refactor TestQueueManager and fix default ACLs.
+    (V.V.Chaitanya Krishna and Rahul K Singh via sharad)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=883621&r1=883620&r2=883621&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Tue Nov 24 08:22:38 2009
@@ -19,6 +19,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Queue.QueueOperation;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
@@ -232,6 +233,9 @@
     validate(queueNode);
     List<Element> subQueues = new ArrayList<Element>();
 
+    String submitKey = "";
+    String adminKey = "";
+    
     for (int j = 0; j < fields.getLength(); j++) {
       Node fieldNode = fields.item(j);
       if (!(fieldNode instanceof Element)) {
@@ -253,6 +257,10 @@
         //parent.child
         name += nameValue;
         newQueue.setName(name);
+        submitKey = toFullPropertyName(name,
+            Queue.QueueOperation.SUBMIT_JOB.getAclName());
+        adminKey = toFullPropertyName(name,
+            Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
       }
 
       if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
@@ -260,17 +268,11 @@
       }
       if(isAclsEnabled()) {
         if (ACL_SUBMIT_JOB_TAG.equals(field.getTagName())) {
-          String submitList = field.getTextContent();
-          String aclKey = toFullPropertyName(
-            name, Queue.QueueOperation.SUBMIT_JOB.getAclName());
-          acls.put(aclKey, new AccessControlList(submitList));
+          acls.put(submitKey, new AccessControlList(field.getTextContent()));
         }
 
         if (ACL_ADMINISTER_JOB_TAG.equals(field.getTagName())) {
-          String administerList = field.getTextContent();
-          String aclKey = toFullPropertyName(
-            name, Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
-          acls.put(aclKey, new AccessControlList(administerList));
+          acls.put(adminKey, new AccessControlList(field.getTextContent()));
         }
       }
 
@@ -284,6 +286,15 @@
         newQueue.setState(QueueState.getState(state));
       }
     }
+    
+    if (!acls.containsKey(submitKey)) {
+      acls.put(submitKey, new AccessControlList("*"));
+    }
+    
+    if (!acls.containsKey(adminKey)) {
+      acls.put(adminKey, new AccessControlList("*"));
+    }
+    
     //Set acls
     newQueue.setAcls(acls);
     //At this point we have the queue ready at current height level.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=883621&r1=883620&r2=883621&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java Tue Nov 24 08:22:38 2009
@@ -121,7 +121,8 @@
       return new DeprecatedQueueConfigurationParser(conf);
     } else {
       URL filePath =
-        ClassLoader.getSystemClassLoader().getResource(QUEUE_CONF_FILE_NAME);
+        Thread.currentThread().getContextClassLoader()
+          .getResource(QUEUE_CONF_FILE_NAME);
       return new QueueConfigurationParser(filePath.getPath());
     }
   }

Modified: hadoop/mapreduce/trunk/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/commit-tests?rev=883621&r1=883620&r2=883621&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/commit-tests (original)
+++ hadoop/mapreduce/trunk/src/test/commit-tests Tue Nov 24 08:22:38 2009
@@ -38,7 +38,7 @@
 **/TestTaskTrackerBlacklisting.java
 **/TestTaskTrackerLocalization
 **/TestTrackerDistributedCacheManager
-**/TestQueueManagerForHierarchialQueues
+**/TestQueueManager
 **/TestContainerQueue
 **/TestCapacityScheduler
 **/TestRefreshOfQueues

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java?rev=883621&r1=883620&r2=883621&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java Tue Nov 24 08:22:38 2009
@@ -19,11 +19,25 @@
 package org.apache.hadoop.mapred;
 
 //import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import static org.apache.hadoop.mapred.Queue.*;
 import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
 
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.transform.TransformerException;
@@ -35,11 +49,15 @@
 import java.util.Properties;
 import java.util.Set;
 import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
 
 //@Private
 public class QueueManagerTestUtils {
   final static String CONFIG = new File("test-mapred-queues.xml")
-    .getAbsolutePath();
+      .getAbsolutePath();
+  private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
 
   /**
    * Create and return a new instance of a DOM Document object to build a queue
@@ -49,16 +67,15 @@
    * @throws Exception
    */
   public static Document createDocument() throws Exception {
-    Document doc = DocumentBuilderFactory
-      .newInstance().newDocumentBuilder().newDocument();
+    Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+        .newDocument();
     return doc;
   }
 
-  public static void createSimpleDocument(
-    Document doc) throws Exception {
+  public static void createSimpleDocument(Document doc) throws Exception {
     Element queues = createQueuesNode(doc, "true");
 
-    //Create parent level queue q1.
+    // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
     Properties props = new Properties();
     props.setProperty("capacity", "10");
@@ -66,30 +83,74 @@
     q1.appendChild(createProperties(doc, props));
     queues.appendChild(q1);
 
-    //Create another parent level p1
+    // Create another parent level p1
     Element p1 = createQueue(doc, "p1");
 
-    //append child p11 to p1
+    // append child p11 to p1
     p1.appendChild(createQueue(doc, "p11"));
 
     Element p12 = createQueue(doc, "p12");
 
     p12.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
-    p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
-    p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+    p12.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
+    p12.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
 
-    //append p12 to p1.
+    // append p12 to p1.
     p1.appendChild(p12);
 
+    queues.appendChild(p1);
+  }
+
+  static void createSimpleDocumentWithAcls(Document doc, String aclsEnabled) {
+    Element queues = createQueuesNode(doc, aclsEnabled);
+
+    // Create parent level queue q1.
+    Element q1 = createQueue(doc, "q1");
+    Properties props = new Properties();
+    props.setProperty("capacity", "10");
+    props.setProperty("maxCapacity", "35");
+    q1.appendChild(createProperties(doc, props));
+    queues.appendChild(q1);
+
+    // Create another parent level p1
+    Element p1 = createQueue(doc, "p1");
+
+    // append child p11 to p1
+    Element p11 = createQueue(doc, "p11");
+    p11.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
+    p11.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+    p1.appendChild(p11);
+
+    // append child p12 to p1
+    Element p12 = createQueue(doc, "p12");
+    p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+    p12.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "*"));
+    p12.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "*"));
+    p1.appendChild(p12);
+
+    // append child p13 to p1
+    Element p13 = createQueue(doc, "p13");
+    p13.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+    p1.appendChild(p13);
+
+    // append child p14 to p1
+    Element p14 = createQueue(doc, "p14");
+    p14.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+    p1.appendChild(p14);
 
     queues.appendChild(p1);
   }
 
-  public static void refreshSimpleDocument(
-    Document doc) throws Exception {
+  public static void refreshSimpleDocument(Document doc) throws Exception {
     Element queues = createQueuesNode(doc, "true");
 
-    //Create parent level queue q1.
+    // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
     Properties props = new Properties();
     props.setProperty("capacity", "70");
@@ -97,10 +158,10 @@
     q1.appendChild(createProperties(doc, props));
     queues.appendChild(q1);
 
-    //Create another parent level p1
+    // Create another parent level p1
     Element p1 = createQueue(doc, "p1");
 
-    //append child p11 to p1
+    // append child p11 to p1
     Element p11 = createQueue(doc, "p11");
     p11.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
     p1.appendChild(p11);
@@ -111,10 +172,9 @@
     p12.appendChild(createAcls(doc, "acl-submit-job", "u3"));
     p12.appendChild(createAcls(doc, "acl-administer-jobs", "u4"));
 
-    //append p12 to p1.
+    // append p12 to p1.
     p1.appendChild(p12);
 
-
     queues.appendChild(p1);
   }
 
@@ -134,7 +194,7 @@
   }
 
   public static void writeToFile(Document doc, String filePath)
-    throws TransformerException {
+      throws TransformerException {
     Transformer trans = TransformerFactory.newInstance().newTransformer();
     trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
     trans.setOutputProperty(OutputKeys.INDENT, "yes");
@@ -150,8 +210,8 @@
     return queue;
   }
 
-  public static Element createAcls(
-    Document doc, String aclName, String listNames) {
+  public static Element createAcls(Document doc, String aclName,
+      String listNames) {
     Element acls = doc.createElement(aclName);
     acls.setTextContent(listNames);
     return acls;
@@ -192,8 +252,7 @@
    * @throws Exception
    */
   public static void writeQueueConfigurationFile(String filePath,
-      JobQueueInfo[] rootQueues)
-      throws Exception {
+      JobQueueInfo[] rootQueues) throws Exception {
     Document doc = createDocument();
     Element queueElements = createQueuesNode(doc, String.valueOf(true));
     for (JobQueueInfo rootQ : rootQueues) {
@@ -202,4 +261,67 @@
     }
     writeToFile(doc, filePath);
   }
+
+  static class QueueManagerConfigurationClassLoader extends ClassLoader {
+    @Override
+    public URL getResource(String name) {
+      if (!name.equals(QueueManager.QUEUE_CONF_FILE_NAME)) {
+        return super.getResource(name);
+      } else {
+        File resourceFile = new File(CONFIG);
+        if (!resourceFile.exists()) {
+          throw new IllegalStateException(
+              "Queue Manager configuration file not found");
+        }
+        try {
+          return resourceFile.toURL();
+        } catch (MalformedURLException e) {
+          LOG.fatal("Unable to form URL for the resource file : ");
+        }
+        return super.getResource(name);
+      }
+    }
+  }
+
+  static Job submitSleepJob(int numMappers, int numReducers, long mapSleepTime,
+      long reduceSleepTime, boolean shouldComplete, String userInfo,
+      String queueName, Configuration clientConf) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    clientConf.set(JTConfig.JT_IPC_ADDRESS, "localhost:"
+        + miniMRCluster.getJobTrackerPort());
+    if (userInfo != null) {
+      clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+    }
+    if (queueName != null) {
+      clientConf.set(JobContext.QUEUE_NAME, queueName);
+    }
+    SleepJob sleep = new SleepJob();
+    sleep.setConf(clientConf);
+    Job job = sleep.createJob(numMappers, numReducers, mapSleepTime,
+        (int) mapSleepTime, reduceSleepTime, (int) reduceSleepTime);
+    if (shouldComplete) {
+      job.waitForCompletion(false);
+    } else {
+      job.submit();
+      // miniMRCluster.getJobTrackerRunner().getJobTracker().jobsToComplete()[]
+      Cluster cluster = new Cluster(miniMRCluster.createJobConf());
+      JobStatus[] status = miniMRCluster.getJobTrackerRunner().getJobTracker()
+          .jobsToComplete();
+      JobID id = status[status.length -1].getJobID();
+      Job newJob = cluster.getJob(id);
+      cluster.close();
+      return newJob;
+    }
+    return job;
+  }
+
+  static MiniMRCluster miniMRCluster;
+
+  static void setUpCluster(Configuration conf) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    String namenode = "file:///";
+    Thread.currentThread().setContextClassLoader(
+        new QueueManagerTestUtils.QueueManagerConfigurationClassLoader());
+    miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=883621&r1=883620&r2=883621&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Tue Nov 24 08:22:38 2009
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,685 +18,564 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Collection;
+import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.TreeSet;
+import java.util.Map.Entry;
 
-import javax.security.auth.login.LoginException;
 
-import junit.framework.TestCase;
+public class TestQueueManager {
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.QueueState;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation;
-import static org.apache.hadoop.mapred.DeprecatedQueueConfigurationParser.*;
+  private static final Log LOG = LogFactory.getLog(
+    TestQueueManager.class);
 
-public class TestQueueManager extends TestCase {
+  @After
+  public void tearDown() throws Exception {
+    new File(CONFIG).delete();
+  }
+
+  @Test
+  public void testDefault() throws Exception {
+    QueueManager qm = new QueueManager();
+    Queue root = qm.getRoot();
+    assertEquals(root.getChildren().size(), 1);
+    assertEquals(root.getChildren().iterator().next().getName(), "default");
+    assertFalse(qm.isAclsEnabled());
+    assertNull(root.getChildren().iterator().next().getChildren());
+  }
+
+  @Test
+  public void testXMLParsing() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+    Set<Queue> rootQueues = qm.getRoot().getChildren();
+    List<String> names = new ArrayList<String>();
+    for (Queue q : rootQueues) {
+      names.add(q.getName());
+    }
+
+    //Size of root.
+    assertEquals(rootQueues.size(), 2);
+
+    //check root level queues
+    assertTrue(names.contains("q1"));
+    assertTrue(names.contains("p1"));
+
+
+    //check for leaf names
+    Set<String> leafNames = qm.getLeafQueueNames();
+    Queue p = qm.getQueue("p1");
+    Set<Queue> children = p.getChildren();
+    assertTrue(children.size() == 2);
+
+    //check leaf level queues
+    assertTrue(
+      leafNames.contains(
+        "p1" + NAME_SEPARATOR + "p11"));
+    assertTrue(
+      leafNames.contains(
+        "p1" + NAME_SEPARATOR + "p12"));
+
+
+    Queue q = qm.getQueue(
+      "p1" + NAME_SEPARATOR + "p12");
+
+    assertTrue(
+      q.getAcls().get(
+        QueueManager.toFullPropertyName(
+          q.getName(), ACL_SUBMIT_JOB_TAG)).getUsers().contains(
+        "u1"));
+
+    assertTrue(
+      q.getAcls().get(
+        QueueManager.toFullPropertyName(
+          q.getName(),
+          ACL_ADMINISTER_JOB_TAG))
+        .getUsers().contains("u2"));
+    assertTrue(q.getState().equals(QueueState.STOPPED));
+  }
+
+  @Test
+  public void testhasAccess() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocumentWithAcls(doc,"true");
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+
+    UserGroupInformation ugi;
+    // test for acls access when acls are set with *
+    ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
+        Queue.QueueOperation.SUBMIT_JOB, ugi));
+    ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
+        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+    
+    // test for acls access when acls are not set with *
+    ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
+        Queue.QueueOperation.SUBMIT_JOB, ugi));
+    ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
+        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+    
+    // test for acls access when acls are not specified but acls is enabled
+    ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+        Queue.QueueOperation.SUBMIT_JOB, ugi));
+    ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+    
+    assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
+  }
+  
+  @Test
+  public void testQueueView() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+    
+    for (Queue queue : qm.getRoot().getChildren()) {
+      checkHierarchy(queue, qm);
+    }
+  }
+
+  private void checkHierarchy(Queue queue, QueueManager queueManager) {
+    JobQueueInfo jobQueueInfo = queueManager.getJobQueueInfo(queue.getName());
+    assertEquals(queue.getName(),jobQueueInfo.getQueueName());
+    assertEquals(queue.getState(),jobQueueInfo.getState());
+    if (queue.getChildren() !=null && queue.getChildren().size() > 0) {
+      for (Queue childQueue : queue.getChildren()) {
+        checkHierarchy(childQueue, queueManager);
+      }
+    }
+  }
+
+  @Test
+  public void testhasAccessForParent() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+
+    UserGroupInformation ugi =
+      new UnixUserGroupInformation("u1", new String[]{" "});
+    assertFalse(
+      qm.hasAccess(
+        "p1",
+        Queue.QueueOperation.SUBMIT_JOB, ugi));
+  }
+
+  @Test
+  public void testValidation() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    Element queues = createQueuesNode(doc, "false");
+    Element q1 = createQueue(doc, "q1");
+
+    q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
+    q1.appendChild(createAcls(doc, "acl-administer-jobs", "u2"));
+    q1.appendChild(createQueue(doc, "p15"));
+    q1.appendChild(createQueue(doc, "p16"));
 
-  static final Log LOG = LogFactory.getLog(TestQueueManager.class);
-  
-  private MiniDFSCluster miniDFSCluster;
-  private MiniMRCluster miniMRCluster;
-  
-  public void testMultipleQueues() {
-    JobConf conf = new JobConf();
-    conf.set("mapred.queue.names", "q1,q2,Q3");
-    QueueManager qMgr = new QueueManager(conf);
-    Set<String> expQueues = new TreeSet<String>();
-    expQueues.add("q1");
-    expQueues.add("q2");
-    expQueues.add("Q3");
-    verifyQueues(expQueues, qMgr.getLeafQueueNames());
-  }
-
-  public void testSchedulerInfo() {
-    JobConf conf = new JobConf();
-    conf.set("mapred.queue.names", "qq1,qq2");
-    QueueManager qMgr = new QueueManager(conf);
-    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
-    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
-    assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
-    assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
-  }
-
-  public void testAllEnabledACLForJobSubmission()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
-    verifyJobSubmission(conf, true);
-  }
-
-  public void testAllDisabledACLForJobSubmission()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
-    verifyJobSubmission(conf, false);
-  }
-
-  public void testUserDisabledACLForJobSubmission()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
-                                "3698-non-existent-user");
-    verifyJobSubmission(conf, false);
-  }
-
-  public void testDisabledACLForNonDefaultQueue()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    // allow everyone in default queue
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
-    // setup a different queue
-    conf.set("mapred.queue.names", "default,q1");
-    // setup a different acl for this queue.
-    conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
-    // verify job submission to other queue fails.
-    verifyJobSubmission(conf, false, "q1");
-  }
-
-  public void testSubmissionToInvalidQueue()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = new JobConf();
-    conf.set("mapred.queue.names","default");
-    setUpCluster(conf);
-    String queueName = "q1";
+    queues.appendChild(q1);
+    writeToFile(doc, CONFIG);
     try {
-      Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
-    } catch (IOException ioe) {
-       assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does " +
-         "not exist"));
-       return;
-    } finally {
-      tearDownCluster();
-    }
-    fail("Job submission to invalid queue job shouldnot complete " +
-      ", it should fail with proper exception ");
-  }
-
-  public void testEnabledACLForNonDefaultQueue() throws IOException,
-      LoginException, InterruptedException, ClassNotFoundException {
-    // login as self...
-    UserGroupInformation ugi = UnixUserGroupInformation.login();
-    String userName = ugi.getUserName();
-    // allow everyone in default queue
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
-    // setup a different queue
-    conf.set("mapred.queue.names", "default,q2");
-    // setup a different acl for this queue.
-    conf.set("mapred.queue.q2.acl-submit-job", userName);
-    // verify job submission to other queue fails.
-    verifyJobSubmission(conf, true, "q2");
-  }
-
-  public void testUserEnabledACLForJobSubmission()
-      throws IOException, LoginException,
-             InterruptedException, ClassNotFoundException {
-    // login as self...
-    UserGroupInformation ugi = UnixUserGroupInformation.login();
-    String userName = ugi.getUserName();
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
-                                  "3698-junk-user," + userName
-                                    + " 3698-junk-group1,3698-junk-group2");
-    verifyJobSubmission(conf, true);
-  }
-
-  public void testGroupsEnabledACLForJobSubmission()
-      throws IOException, LoginException,
-             InterruptedException, ClassNotFoundException {
-    // login as self, get one group, and add in allowed list.
-    UserGroupInformation ugi = UnixUserGroupInformation.login();
-    String[] groups = ugi.getGroupNames();
-    assertTrue(groups.length > 0);
-    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
-                                "3698-junk-user1,3698-junk-user2 "
-                                  + groups[groups.length-1]
-                                           + ",3698-junk-group");
-    verifyJobSubmission(conf, true);
-  }
-
-  public void testAllEnabledACLForJobKill()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
-    verifyJobKill(conf, true);
-  }
-
-  public void testAllDisabledACLForJobKill()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
-    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
-  }
-
-  public void testOwnerAllowedForJobKill()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
-                                              "junk-user");
-    verifyJobKill(conf, true);
-  }
-
-  public void testUserDisabledACLForJobKill()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    //setup a cluster allowing a user to submit
-    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
-                                              "dummy-user");
-    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
-  }
-
-  public void testUserEnabledACLForJobKill() throws IOException,
-      LoginException, InterruptedException, ClassNotFoundException {
-    // login as self...
-    UserGroupInformation ugi = UnixUserGroupInformation.login();
-    String userName = ugi.getUserName();
-    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
-                                              "dummy-user,"+userName);
-    verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
-  }
-
-  public void testUserDisabledForJobPriorityChange()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
-                              "junk-user");
-    verifyJobPriorityChangeAsOtherUser(conf, false,
-                              "junk-user,junk-user-group");
+      new QueueManager(CONFIG);
+      fail("Should throw an exception as configuration is wrong ");
+    } catch (RuntimeException re) {
+      LOG.info(re.getMessage());
+    }
   }
-  
-  /**
-   * Test to verify refreshing of queue properties by using MRAdmin tool.
-   *
-   * @throws Exception
-   */
 
-  public void testACLRefresh() throws Exception {
-    String queueConfigPath =
-        System.getProperty("test.build.extraconf", "build/test/extraconf");
-    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+  @Test
+  public void testInvalidName() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    Element queues = createQueuesNode(doc, "false");
+    Element q1 = createQueue(doc, "");
+    queues.appendChild(q1);
+    writeToFile(doc, CONFIG);
     try {
-      //Setting up default mapred-site.xml
-      Properties hadoopConfProps = new Properties();
-      //these properties should be retained.
-      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
-      hadoopConfProps.put("mapred.acls.enabled", "true");
-      //These property should always be overridden
-      hadoopConfProps.put("mapred.queue.default.acl-submit-job", "u1");
-      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u2");
-      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "u1");
-      //Actual property which would be used.
-      hadoopConfProps.put("mapred.queue.default.acl-submit-job", " ");
-      //Writing out the queue configuration file.
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
-      //Create a new configuration to be used with QueueManager
-      JobConf conf = new JobConf();
-      QueueManager queueManager = new QueueManager(conf);
-      UserGroupInformation ugi =
-        new UnixUserGroupInformation("unknownUser",new String[]{" "});
-      //Job Submission should fail because ugi to be used is set to blank.
-      assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("default", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q1", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-
-      //Test job submission as alternate user.
-      Configuration alternateUserConfig = new Configuration();
-      alternateUserConfig.set("hadoop.job.ugi","u1,users");
-      UserGroupInformation alternateUgi =
-        UserGroupInformation.readFrom(alternateUserConfig);
-      assertTrue("Alternate User Job Submission failed before refresh.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
-              SUBMIT_JOB, alternateUgi));
-
-      //Set acl for the current user.
-      hadoopConfProps.put(MAPRED_QUEUE_NAMES_KEY, "default,q1,q2");
-      hadoopConfProps.put("mapred.acls.enabled", "true");
-      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
-      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
-      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-      //refresh configuration
-      queueManager.refreshQueues(conf, null);
-      //Submission should succeed
-      assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("default", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q1", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertFalse("Alternate User Job Submission succeeded after refresh.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
-              SUBMIT_JOB, alternateUgi));
-      //rewrite the mapred-site.xml
-      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");        
-      hadoopConfProps.put("mapred.acls.enabled", "true");
-      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-      queueManager.refreshQueues(conf, null);
-      assertTrue("User Job Submission failed after refresh and no queue acls file.",
-          queueManager.hasAccess("default", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-    } finally{
-      if(hadoopConfigFile.exists()) {
-        hadoopConfigFile.delete();
-      }
+      new QueueManager(CONFIG);
+      fail("Should throw an exception as configuration is wrong ");
+    } catch (Exception re) {
+      re.printStackTrace();
+      LOG.info(re.getMessage());
+    }
+    checkForConfigFile();
+    doc = createDocument();
+    queues = createQueuesNode(doc, "false");
+    q1 = doc.createElement("queue");
+    queues.appendChild(q1);
+    writeToFile(doc, CONFIG);
+    try {
+      new QueueManager(CONFIG);
+      fail("Should throw an exception as configuration is wrong ");
+    } catch (RuntimeException re) {
+      re.printStackTrace();
+      LOG.info(re.getMessage());
     }
   }
 
-  /**
-   * Test to verify refreshing of queue properties by using MRAdmin tool.
-   *
-   * @throws Exception
-   */
-  public void testStateRefresh() throws Exception {
-    String queueConfigPath =
-        System.getProperty("test.build.extraconf", "build/test/extraconf");
-    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
-
+  @Test
+  public void testEmptyProperties() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    Element queues = createQueuesNode(doc, "false");
+    Element q1 = createQueue(doc, "q1");
+    Element p = createProperties(doc, null);
+    q1.appendChild(p);
+    queues.appendChild(q1);
+  }
+
+  @Test
+  public void testEmptyFile() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    writeToFile(doc, CONFIG);
     try {
-      //Setting up default mapred-site.xml
-      Properties queueConfProps = new Properties();
-      //these properties should be retained.
-      queueConfProps.put("mapred.queue.names", "default,qu1");
-      queueConfProps.put("mapred.acls.enabled", "true");
-      //These property should always be overridden
-      queueConfProps.put("mapred.queue.default.state", "running");
-      queueConfProps.put("mapred.queue.qu1.state", "stopped");
-      UtilsForTests.setUpConfigFile(queueConfProps, hadoopConfigFile);
-
-      //Create a new configuration to be used with QueueManager
-      JobConf conf = new JobConf();
-      setUpCluster(conf);
-      QueueManager queueManager =
-        this.miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();
-
-      try{
-        Job job = submitSleepJob(10, 2, 10, 10, true,null, "default" );
-        assert(job.isSuccessful());
-      }catch(Exception e){
-        fail("submit job in default queue should be sucessful ");
-      }
-
-      try{
-        submitSleepJob(10, 2, 10, 10, true,null, "qu1" );
-        fail("submit job in default queue should be failed ");
-      }catch(Exception e){
-        assert(e.getMessage().contains("Queue \"" + "qu1" + "\" is not running"));
-      }
-
-      // verify state of queues before refresh
-      JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
-      assertEquals(QueueState.RUNNING.getStateName(),
-                    queueInfo.getQueueState());
-      queueInfo = queueManager.getJobQueueInfo("qu1");
-      assertEquals(QueueState.STOPPED.getStateName(),
-                    queueInfo.getQueueState());
-      queueConfProps.put("mapred.queue.names", "default,qu1");
-      queueConfProps.put("mapred.acls.enabled", "true");
-      queueConfProps.put("mapred.queue.default.state", "stopped");
-      queueConfProps.put("mapred.queue.qu1.state", "running");
-      UtilsForTests.setUpConfigFile(queueConfProps, hadoopConfigFile);
-
-      //refresh configuration
-      queueManager.refreshQueues(conf, null);
-
-      //Job Submission should pass now because ugi to be used is set to blank.
-      try{
-        submitSleepJob(10, 2, 10, 10, true,null,"qu1");
-      }catch(Exception e){
-        fail("submit job in qu1 queue should be sucessful ");
-      }
-
-      try{
-        submitSleepJob(10, 2, 10, 10, true,null, "default" );
-        fail("submit job in default queue should be failed ");
-      }catch(Exception e){
-        assert(e.getMessage().contains("Queue \"" + "default" + "\" is not running"));
-      }
+      new QueueManager(CONFIG);
+      fail("Should throw an exception as configuration is wrong ");
+    } catch (Exception re) {
+      re.printStackTrace();
+      LOG.info(re.getMessage());
+    }
+  }
 
-      // verify state of queues after refresh
-      queueInfo = queueManager.getJobQueueInfo("default");
-      assertEquals(QueueState.STOPPED.getStateName(),
-                    queueInfo.getQueueState());
-      queueInfo = queueManager.getJobQueueInfo("qu1");
-      assertEquals(QueueState.RUNNING.getStateName(),
-                    queueInfo.getQueueState());
-    } finally{
-      if(hadoopConfigFile.exists()) {
-        hadoopConfigFile.delete();
+  @Test
+  public void testJobQueueInfoGeneration() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+
+    List<JobQueueInfo> rootQueues =
+      qm.getRoot().getJobQueueInfo().getChildren();
+    assertEquals(rootQueues.size(), 2);
+    List<String> names = new ArrayList<String>();
+    for (JobQueueInfo q : rootQueues) {
+      names.add(q.getQueueName());
+      if (q.getQueueName().equals("q1")) {
+        Properties p = q.getProperties();
+        assertEquals(p.getProperty("capacity"), "10");
+        assertEquals(p.getProperty("maxCapacity"), "35");
+
+        assertTrue(q.getChildren().isEmpty());
+      } else if (q.getQueueName().equals("p1")) {
+        List<JobQueueInfo> children = q.getChildren();
+        assertEquals(children.size(), 2);
+        for (JobQueueInfo child : children) {
+          if (child.getQueueName().equals(
+            "p1" + NAME_SEPARATOR + "p12")) {
+            assertEquals(
+              child.getQueueState(), QueueState.STOPPED.getStateName());
+          } else if (child.getQueueName().equals(
+            "p1" + NAME_SEPARATOR + "p11")) {
+            assertEquals(
+              child.getQueueState(), QueueState.RUNNING.getStateName());
+          } else {
+            fail("Only 2 children");
+          }
+        }
+      } else {
+        fail("Only 2 queues with q1 and p1 ");
       }
-      this.tearDownCluster();
     }
   }
 
-  public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
-    String queueConfigPath =
-      System.getProperty("test.build.extraconf", "build/test/extraconf");
-
-    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
-    try {
-      // queue properties with which the cluster is started.
-      Properties hadoopConfProps = new Properties();
-      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
-      hadoopConfProps.put("mapred.acls.enabled", "true");
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
-      //properties for mapred-queue-acls.xml
-      UserGroupInformation ugi =
-        new UnixUserGroupInformation("unknownUser",new String[]{" "});
-      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
-      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
-      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
-      Configuration conf = new JobConf();
-      QueueManager queueManager = new QueueManager(conf);
-      //Testing access to queue.
-      assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("default", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q1", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-
-      //Write out a new incomplete invalid configuration file.
-      PrintWriter writer = new PrintWriter(new FileOutputStream(hadoopConfigFile));
-      writer.println("<configuration>");
-      writer.println("<property>");
-      writer.flush();
-      writer.close();
-      try {
-        //Exception to be thrown by queue manager because configuration passed
-        //is invalid.
-        queueManager.refreshQueues(conf, null);
-        fail("Refresh of ACLs should have failed with invalid conf file.");
-      } catch (Exception e) {
+  /**
+   * Test the refresh of queues.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRefresh() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+    Queue beforeRefreshRoot = qm.getRoot();
+    //remove the file and create new one.
+    Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
+    for (Queue qs : rootQueues) {
+      if (qs.getName().equals("q1")) {
+
+        assertEquals(qs.getProperties().getProperty("capacity"), "10");
+        assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
+
+      } else if (qs.getName().equals("p1")) {
+
+        Set<Queue> children = qs.getChildren();
+        for (Queue child : children) {
+          if (child.getName().equals(
+            "p1" + NAME_SEPARATOR + "p12")) {
+            assertTrue(
+              child.getAcls().get(
+                QueueManager.toFullPropertyName(
+                  child.getName(), ACL_SUBMIT_JOB_TAG))
+                .getUsers().contains("u1"));
+
+            assertTrue(
+              child.getAcls().get(
+                QueueManager.toFullPropertyName(
+                  child.getName(),
+                  ACL_ADMINISTER_JOB_TAG))
+                .getUsers().contains("u2"));
+            assertTrue(child.getState().equals(QueueState.STOPPED));
+          } else {
+            assertTrue(child.getState().equals(QueueState.RUNNING));
+          }
+        }
       }
-      assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("default", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q1", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q2", Queue.QueueOperation.
-              SUBMIT_JOB, ugi));
-    } finally {
-      //Cleanup the configuration files in all cases
-      if(hadoopConfigFile.exists()) {
-        hadoopConfigFile.delete();
+    }
+    checkForConfigFile();
+    doc = createDocument();
+    refreshSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+    qm.getRoot().isHierarchySameAs(cp.getRoot());
+    qm.setQueues(
+      cp.getRoot().getChildren().toArray(
+        new Queue[cp.getRoot().getChildren().size()]));
+    Queue afterRefreshRoot = qm.getRoot();
+    //remove the file and create new one.
+    rootQueues = afterRefreshRoot.getChildren();
+    for (Queue qs : rootQueues) {
+      if (qs.getName().equals("q1")) {
+
+        assertEquals(qs.getProperties().getProperty("capacity"), "70");
+        assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
+
+      } else if (qs.getName().equals("p1")) {
+
+        Set<Queue> children = qs.getChildren();
+        for (Queue child : children) {
+          if (child.getName().equals(
+            "p1" + NAME_SEPARATOR + "p12")) {
+            assertTrue(
+              child.getAcls().get(
+                QueueManager.toFullPropertyName(
+                  child.getName(),
+                  ACL_SUBMIT_JOB_TAG))
+                .getUsers().contains("u3"));
+
+            assertTrue(
+              child.getAcls().get(
+                QueueManager.toFullPropertyName(
+                  child.getName(),
+                  ACL_ADMINISTER_JOB_TAG))
+                .getUsers().contains("u4"));
+            assertTrue(child.getState().equals(QueueState.RUNNING));
+          } else {
+            assertTrue(child.getState().equals(QueueState.STOPPED));
+          }
+        }
       }
     }
   }
 
-  private JobConf setupConf(String aclName, String aclValue) {
-    JobConf conf = new JobConf();
-    if(conf.get("mapred.queue.names") == null) {
-      conf.set("mapred.queue.names","default");
-    }
-    conf.setBoolean("mapred.acls.enabled", true);
-    conf.set(aclName, aclValue);
-    return conf;
-  }
-  
-  private void verifyQueues(Set<String> expectedQueues, 
-                                          Set<String> actualQueues) {
-    assertEquals(expectedQueues.size(), actualQueues.size());
-    for (String queue : expectedQueues) {
-      assertTrue(actualQueues.contains(queue));
+  @Test
+  public void testRefreshWithInvalidFile() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);
+    writeToFile(doc, CONFIG);
+    QueueManager qm = new QueueManager(CONFIG);
+
+    checkForConfigFile();
+    doc = createDocument();
+    Element queues = createQueuesNode(doc, "false");
+    Element q1 = createQueue(doc, "");
+    queues.appendChild(q1);
+    writeToFile(doc, CONFIG);
+    try {
+      QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+
+      fail("Should throw an exception as configuration is wrong ");
+    } catch (Throwable re) {
+      re.printStackTrace();
+      LOG.info(re.getMessage());
     }
   }
-  
-  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    verifyJobSubmission(conf, shouldSucceed, "default");
-  }
 
-  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
-                                   String queue) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    if(conf.get("mapred.queue.names") == null) {
-      conf.set("mapred.queue.names","default");
-    }
-    setUpCluster(conf);
-    try {
-      runAndVerifySubmission(conf, shouldSucceed, queue, null);
-    } finally {
-      tearDownCluster();
+  /**
+   * Class to store the array of queues retrieved by parsing the string 
+   * that is dumped in Json format
+   */
+  static class JsonQueueTree {
+    boolean acls_enabled;
+    
+    JsonQueue[] queues;
+
+    public JsonQueue[] getQueues() {
+      return queues;
     }
-  }
 
-  private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
-      String queue, String userInfo)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    try {
-      Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
-      if (shouldSucceed) {
-        assertTrue(rjob.isSuccessful());
-      } else {
-        fail("Job submission should have failed.");
-      }
-    } catch (IOException ioe) {
-      if (shouldSucceed) {
-        throw ioe;
-      } else {
-        LOG.info("exception while submitting job: " + ioe.getMessage());
-        assertTrue(ioe.getMessage().
-            contains("cannot perform operation " +
-            "SUBMIT_JOB on queue " + queue));
-        // check if the system directory gets cleaned up or not
-        JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
-        Path sysDir = new Path(jobtracker.getSystemDir());
-        FileSystem fs = sysDir.getFileSystem(conf);
-        int size = fs.listStatus(sysDir).length;
-        while (size > 1) { // ignore the jobtracker.info file
-          System.out.println("Waiting for the job files in sys directory to be cleaned up");
-          UtilsForTests.waitFor(100);
-          size = fs.listStatus(sysDir).length;
-        }
-      }
-    } finally {
-      tearDownCluster();
+    public void setQueues(JsonQueue[] queues) {
+      this.queues = queues;
     }
-}
 
-  private void verifyJobKill(JobConf conf, boolean shouldSucceed) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    setUpCluster(conf);
-    try {
-      Job rjob = submitSleepJob(1, 1, 1000, 1000, false);
-      assertFalse(rjob.isComplete());
-      while(rjob.mapProgress() == 0.0f) {
-        try {
-          Thread.sleep(10);  
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-      rjob.killJob();
-      while (!rjob.isComplete()) {
-        try {
-          Thread.sleep(10);  
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-      if (shouldSucceed) {
-        assertTrue(!rjob.isSuccessful());
-      } else {
-        fail("Job kill should have failed.");
-      }
-    } catch (IOException ioe) {
-      if (shouldSucceed) {
-        throw ioe;
-      } else {
-        LOG.info("exception while submitting job: " + ioe.getMessage());
-        assertTrue(ioe.getMessage().
-                        contains("cannot perform operation " +
-                                    "ADMINISTER_JOBS on queue default"));
-      }
-    } finally {
-      tearDownCluster();
+    public boolean isAcls_enabled() {
+      return acls_enabled;
     }
-  }
 
-  
-  private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
-                                        String otherUserInfo) 
-  throws IOException, InterruptedException, ClassNotFoundException {
-    setUpCluster(conf);
-    try {
-      // submit a job as another user.
-      String userInfo = otherUserInfo;
-      Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
-      assertFalse(job.isComplete());
-
-      //try to kill as self
-      try {
-        JobClient jc = new JobClient(miniMRCluster.createJobConf());
-        RunningJob rjob = jc.getJob((JobID)job.getID());
-        rjob.killJob();
-        if (!shouldSucceed) {
-          fail("should fail kill operation");  
-        }
-      } catch (IOException ioe) {
-        if (shouldSucceed) {
-          throw ioe;
-        }
-        //verify it fails
-        LOG.info("exception while submitting job: " + ioe.getMessage());
-        assertTrue(ioe.getMessage().
-                        contains("cannot perform operation " +
-                                    "ADMINISTER_JOBS on queue default"));
-      }
-      //wait for job to complete on its own
-      while (!job.isComplete()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-    } finally {
-      tearDownCluster();
+    public void setAcls_enabled(boolean aclsEnabled) {
+      acls_enabled = aclsEnabled;
     }
   }
   
-  private void verifyJobPriorityChangeAsOtherUser(JobConf conf, 
-                          boolean shouldSucceed, String otherUserInfo)
-  throws IOException, InterruptedException, ClassNotFoundException {
-    setUpCluster(conf);
-    try {
-      // submit job as another user.
-      String userInfo = otherUserInfo;
-      Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
-      assertFalse(job.isComplete());
-      
-      // try to change priority as self
-      try {
-        JobClient jc = new JobClient(miniMRCluster.createJobConf());
-        RunningJob rjob = jc.getJob((JobID)job.getID());
-        rjob.setJobPriority("VERY_LOW");
-        if (!shouldSucceed) {
-          fail("changing priority should fail.");
-        }
-      } catch (IOException ioe) {
-        //verify it fails
-        LOG.info("exception while submitting job: " + ioe.getMessage());
-        assertTrue(ioe.getMessage().
-                        contains("cannot perform operation " +
-                                    "ADMINISTER_JOBS on queue default"));
-      }
-      //wait for job to complete on its own
-      while (!job.isComplete()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-    } finally {
-      tearDownCluster();
+  /**
+   * Class to store the contents of each queue that is dumped in JSON format.
+   */
+  static class JsonQueue {
+    String name;
+    String state;
+    String acl_submit_job;
+    String acl_administer_jobs;
+    JsonProperty[] properties;
+    JsonQueue[] children;
+    public String getName() {
+      return name;
+    }
+    public String getState() {
+      return state;
+    }
+    public JsonProperty[] getProperties() {
+      return properties;
+    }
+    public JsonQueue[] getChildren() {
+      return children;
+    }
+    public void setName(String name) {
+      this.name = name;
+    }
+    public void setState(String state) {
+      this.state = state;
+    }
+    public void setProperties(JsonProperty[] properties) {
+      this.properties = properties;
+    }
+    public void setChildren(JsonQueue[] children) {
+      this.children = children;
+    }
+    public String getAcl_submit_job() {
+      return acl_submit_job;
+    }
+    public void setAcl_submit_job(String aclSubmitJob) {
+      acl_submit_job = aclSubmitJob;
+    }
+    public String getAcl_administer_jobs() {
+      return acl_administer_jobs;
+    }
+    public void setAcl_administer_jobs(String aclAdministerJobs) {
+      acl_administer_jobs = aclAdministerJobs;
     }
   }
   
-  private void setUpCluster(JobConf conf) throws IOException {
-    if(conf.get("mapred.queue.names") == null)
-      conf.set("mapred.queue.names","default");
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
-                      null, null, conf);
-  }
-  
-  private void tearDownCluster() throws IOException {
-    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
-    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
-  }
-  
-  private Job submitSleepJob(int numMappers, int numReducers, 
-                             long mapSleepTime, long reduceSleepTime,
-                             boolean shouldComplete) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    return submitSleepJob(numMappers, numReducers, mapSleepTime,
-                          reduceSleepTime, shouldComplete, null);
-  }
-  
-  private Job submitSleepJob(int numMappers, int numReducers, 
-                             long mapSleepTime, long reduceSleepTime,
-                             boolean shouldComplete, String userInfo) 
-  throws IOException, InterruptedException, ClassNotFoundException {
-    return submitSleepJob(numMappers, numReducers, mapSleepTime, 
-                          reduceSleepTime, shouldComplete, userInfo, null);
-  }
-
-  private Job submitSleepJob(int numMappers, int numReducers, 
-                             long mapSleepTime, long reduceSleepTime,
-                             boolean shouldComplete, String userInfo,
-                             String queueName) 
-  throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration clientConf = new Configuration();
-    clientConf.set(JTConfig.JT_IPC_ADDRESS, "localhost:"
-        + miniMRCluster.getJobTrackerPort());
-    if (userInfo != null) {
-      clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
-    }
-    if (queueName != null) {
-      clientConf.set(JobContext.QUEUE_NAME, queueName);
-    }
-    SleepJob sleep = new SleepJob();
-    sleep.setConf(clientConf);
-    Job job = sleep.createJob(numMappers, numReducers, 
-        mapSleepTime, (int)mapSleepTime/100,
-        reduceSleepTime, (int)reduceSleepTime/100);
-    if (shouldComplete) {
-      job.waitForCompletion(false);  
-    } else {
-      job.submit();
+  /**
+   * Class to store the contents of attribute "properties" in Json dump
+   */
+  static class JsonProperty {
+    String key;
+    String value;
+    public String getKey() {
+      return key;
+    }
+    public void setKey(String key) {
+      this.key = key;
+    }
+    public String getValue() {
+      return value;
+    }
+    public void setValue(String value) {
+      this.value = value;
     }
-    return job;
   }
 
+  /**
+   * checks the format of the dump in JSON format when 
+   * QueueManager.dumpConfiguration(Writer) is called.
+   * @throws Exception
+   */
+  @Test
+  public void testDumpConfiguration() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocument(doc);    
+    writeToFile(doc, CONFIG);
+    StringWriter out = new StringWriter();
+    QueueManager.dumpConfiguration(out,CONFIG,null);
+    ObjectMapper mapper = new ObjectMapper();
+    // parse the Json dump
+    JsonQueueTree queueTree =
+      mapper.readValue(out.toString(), JsonQueueTree.class);
+    
+    // check if acls_enabled is correct
+    assertEquals(true, queueTree.isAcls_enabled());
+    // check for the number of top-level queues
+    assertEquals(2, queueTree.getQueues().length);
+    
+    HashMap<String, JsonQueue> topQueues = new HashMap<String, JsonQueue>();
+    for (JsonQueue topQueue : queueTree.getQueues()) {
+      topQueues.put(topQueue.getName(), topQueue);
+    }
+    
+    // check for consistency in number of children
+    assertEquals(2, topQueues.get("p1").getChildren().length);
+    
+    HashMap<String, JsonQueue> childQueues = new HashMap<String, JsonQueue>();
+    for (JsonQueue child : topQueues.get("p1").getChildren()) {
+      childQueues.put(child.getName(), child);
+    }
+    
+    // check for consistency in state
+    assertEquals("stopped", childQueues.get("p1:p12").getState());
+     
+    // check for consistency in properties
+    HashMap<String, JsonProperty> q1_properties =
+      new HashMap<String, JsonProperty>();
+    for (JsonProperty prop : topQueues.get("q1").getProperties()) {
+      q1_properties.put(prop.getKey(), prop);
+    }
+    assertEquals("10", q1_properties.get("capacity").getValue());
+    assertEquals("35", q1_properties.get("maxCapacity").getValue());
+    
+    // check for acls
+    assertEquals("u1", childQueues.get("p1:p12").getAcl_submit_job());
+    assertEquals("u2", childQueues.get("p1:p12").getAcl_administer_jobs());
+  }
 }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java?rev=883621&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java Tue Nov 24 08:22:38 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.security.auth.login.LoginException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import static org.apache.hadoop.mapred.DeprecatedQueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+
+/**
+ * Tests {@link QueueManager} with queues and acls configured through the
+ * {@code mapred.queue.*} properties.
+ */
+public class TestQueueManagerWithDeprecatedConf extends TestCase {
+
+  static final Log LOG = LogFactory.getLog(TestQueueManagerWithDeprecatedConf.class);
+
+  /** Checks that every configured queue name is reported as a leaf queue. */
+  public void testMultipleQueues() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.queue.names", "q1,q2,Q3");
+    QueueManager qMgr = new QueueManager(conf);
+    Set<String> expQueues = new TreeSet<String>();
+    expQueues.add("q1");
+    expQueues.add("q2");
+    expQueues.add("Q3");
+    verifyQueues(expQueues, qMgr.getLeafQueueNames());
+  }
+
+  /** Checks that scheduler info set on a queue is returned for that queue. */
+  public void testSchedulerInfo() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.queue.names", "qq1,qq2");
+    QueueManager qMgr = new QueueManager(conf);
+    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
+    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
+    // JUnit expects (expected, actual); the reverse garbles failure reports.
+    assertEquals("queueInfoForqq2", qMgr.getSchedulerInfo("qq2"));
+    assertEquals("queueInfoForqq1", qMgr.getSchedulerInfo("qq1"));
+  }
+
+  /**
+   * Checks the submit-job and administer-jobs acls that QueueManager derives
+   * from per-queue properties written to a temporary mapred-site.xml.
+   */
+  public void testQueueManagerWithDeprecatedConf() throws IOException {
+    String queueConfigPath =
+      System.getProperty("test.build.extraconf", "build/test/extraconf");
+
+    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+    try {
+      // queue properties with which the cluster is started.
+      Properties hadoopConfProps = new Properties();
+      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+
+      // per-queue acl properties; written to the same mapred-site.xml below
+      UserGroupInformation ugi =
+        new UnixUserGroupInformation("unknownUser",new String[]{" "});
+      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u1");
+      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "*");
+      hadoopConfProps.put("mapred.queue.default.acl-administer-jobs", ugi.getUserName());
+      hadoopConfProps.put("mapred.queue.q1.acl-administer-jobs", "u1");
+      hadoopConfProps.put("mapred.queue.q2.acl-administer-jobs", "*");
+
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+
+      Configuration conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      //Testing access to queue.
+      assertTrue("user in the submit acl of queue default was denied.",
+          queueManager.hasAccess("default", Queue.QueueOperation.SUBMIT_JOB, ugi));
+      assertFalse("user not in the submit acl of queue q1 was allowed.",
+          queueManager.hasAccess("q1", Queue.QueueOperation.SUBMIT_JOB, ugi));
+      assertTrue("queue q2 has submit acl '*' but denied the user.",
+          queueManager.hasAccess("q2", Queue.QueueOperation.SUBMIT_JOB, ugi));
+      //Testing the admin acls
+      assertTrue("user in the admin acl of queue default was denied.",
+          queueManager.hasAccess("default", Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+      assertFalse("user not in the admin acl of queue q1 was allowed.",
+          queueManager.hasAccess("q1", Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+      assertTrue("queue q2 has admin acl '*' but denied the user.",
+          queueManager.hasAccess("q2", Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+    } finally {
+      //Cleanup the configuration files in all cases
+      if(hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+    }
+  }
+
+  /** Asserts that both sets have the same size and every expected name is present. */
+  private void verifyQueues(Set<String> expectedQueues, Set<String> actualQueues) {
+    assertEquals(expectedQueues.size(), actualQueues.size());
+    for (String queue : expectedQueues) {
+      assertTrue("missing queue: " + queue, actualQueues.contains(queue));
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java?rev=883621&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java Tue Nov 24 08:22:38 2009
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.apache.hadoop.mapred.QueueConfigurationParser.NAME_SEPARATOR;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.submitSleepJob;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+public class TestQueueManagerWithJobTracker {
+
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocumentWithAcls(doc, "true");
+    writeToFile(doc, CONFIG);
+    conf = new Configuration();
+    conf.addResource(CONFIG);
+    conf.set("mapred.committer.job.setup.cleanup.needed", "false");
+    setUpCluster(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    new File(CONFIG).delete();
+  }
+
+  /**
+   * Test to check that jobs cannot be submitted to a queue in STOPPED state
+   * @throws Exception
+   */
+  @Test(expected = IOException.class)
+  public void testSubmitJobForStoppedQueue() throws Exception {
+    submitSleepJob(10, 10, 100, 100, false, null,
+        "p1" + NAME_SEPARATOR + "p14", conf);
+    fail("queue p1:p14 is in stopped state and should not accept jobs");
+  }
+
+  /**
+   * Test to check that jobs cannot be submitted to a container queue
+   * @throws Exception
+   */
+  @Test(expected = IOException.class)
+  public void testSubmitJobForContainerQueue() throws Exception {
+      submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
+      fail("queue p1 is a container queue and cannot have jobs");
+  }
+
+  /**
+   * Tests the submission of job with specified acls
+   * @throws Exception
+   */
+  @Test
+  public void testAclsForSubmitJob() throws Exception {
+    Job job;
+    UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login());
+    // submit job to queue p1:p13 with unspecified acls 
+    job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+        + "p13", conf);
+    assertTrue("Job submission for u1 failed in queue : p1:p13.",
+        job.isSuccessful());
+    // check for access to submit the job
+    try {
+      job = submitSleepJob(0, 0, 0, 0, false, "u2,g1", "p1" + NAME_SEPARATOR
+          + "p11", conf);
+      fail("user u2 cannot submit jobs to queue p1:p11");
+    } catch (Exception e) {
+    }
+    // submit job to queue p1:p11 with acls-submit-job as u1
+    job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1"
+        + NAME_SEPARATOR + "p11", conf);
+    assertTrue("Job submission for u1 failed in queue : p1:p11.",
+        job.isSuccessful());
+  }
+
+  /**
+   * Tests the accessibility to kill a job
+   * @throws Exception
+   */
+  @Test
+  public void testAccessToKillJob() throws Exception {
+    Job job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1"
+        + NAME_SEPARATOR + "p11", conf);
+    UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login());
+    JobConf jobConf = miniMRCluster.createJobConf();
+    Cluster cluster = null;
+    JobID jobID = job.getStatus().getJobID();
+    //Ensure that the jobinprogress is initied before we issue a kill 
+    //signal to the job.
+    JobTracker tracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = tracker.getJob(org.apache.hadoop.mapred.JobID
+        .downgrade(jobID));
+    tracker.initJob(jip);
+    try {
+      tracker.killJob(jobID);
+      fail("current user is neither u1 nor in the administer group list");
+    } catch (Exception e) {
+      Configuration userConf = new Configuration(miniMRCluster.createJobConf());
+      userConf.set("hadoop.job.ugi", "u1,g1");
+      cluster = new Cluster(userConf);
+      cluster.getJob(jobID).killJob();
+      // kill the running job
+      assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+          cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+    }
+    
+    job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1" + NAME_SEPARATOR
+        + "p12", conf);
+    jobID = job.getStatus().getJobID();
+    //Ensure that the jobinprogress is initied before we issue a kill 
+    //signal to the job.
+    jip =  tracker.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
+    tracker.initJob(jip);
+    tracker.killJob(job.getID());
+    // kill the job by the user who submitted the job
+    assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+        cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+    
+    Configuration userConf = new Configuration(miniMRCluster.createJobConf());
+    userConf.set("hadoop.job.ugi", "u1,g1");
+    cluster = new Cluster(userConf);
+    job = submitSleepJob(1, 1, 10, 10, false, "u1,g1", "p1" + NAME_SEPARATOR
+        + "p11", conf);
+    jobID = job.getStatus().getJobID();
+    //Ensure that the jobinprogress is initied before we issue a kill 
+    //signal to the job.
+    jip =  tracker.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
+    tracker.initJob(jip);
+    jobConf.set("hadoop.job.ugi", "u3,g3");
+    cluster = new Cluster(jobConf);
+    // try killing job with user not in administer list
+    try {
+      cluster.getJob(jobID).killJob();
+      fail("u3 not in administer list");
+    } catch (Exception e) {
+      jobConf.set("hadoop.job.ugi", "u1,g1");
+      cluster = new Cluster(jobConf);
+      assertFalse(cluster.getJob(jobID).isComplete());
+      cluster.getJob(jobID).killJob();
+      // kill the running job
+      assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+          cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+    }
+  }
+
+  /**
+   * Tests job submission after refresh
+   * @throws Exception
+   */
+  @Test
+  public void testSubmitJobsAfterRefresh() throws Exception {
+    // test for refresh
+    checkForConfigFile();
+    Document doc = createDocument();
+    refreshDocument(doc);
+    writeToFile(doc, CONFIG);
+    MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
+    admin.run(new String[] { "-refreshQueues" });
+    try {
+      submitSleepJob(10, 10, 100, 100, false, "u1,g1", "p1"
+          + NAME_SEPARATOR + "p11", conf);
+      fail("user u1 is not in the submit jobs' list");
+    } catch (Exception e) {
+    }
+    checkForConfigFile();
+    doc = createDocument();
+    createSimpleDocumentWithAcls(doc, "true");
+    writeToFile(doc, CONFIG);
+    admin.run(new String[] { "-refreshQueues" });
+  }
+
+  private void refreshDocument(Document doc) {
+    Element queues = createQueuesNode(doc, "true");
+
+    // Create parent level queue q1.
+    Element q1 = createQueue(doc, "q1");
+    Properties props = new Properties();
+    props.setProperty("capacity", "10");
+    props.setProperty("maxCapacity", "35");
+    q1.appendChild(createProperties(doc, props));
+    queues.appendChild(q1);
+
+    // Create another parent level p1
+    Element p1 = createQueue(doc, "p1");
+
+    // append child p11 to p1
+    Element p11 = createQueue(doc, "p11");
+    p11.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, " "));
+    p11.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+    p1.appendChild(p11);
+
+    Element p12 = createQueue(doc, "p12");
+
+    p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+    p12.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "*"));
+    p12.appendChild(createAcls(doc,
+        QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "*"));
+
+    // append p12 to p1.
+    p1.appendChild(p12);
+    // append child p13 to p1
+    Element p13 = createQueue(doc, "p13");
+    p13.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+    p1.appendChild(p13);
+    // append child p14 to p1
+    Element p14 = createQueue(doc, "p14");
+    p14.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+    p1.appendChild(p14);
+    queues.appendChild(p1);
+  }
+
+  /** 
+   * Tests job submission when acls are disabled
+   * @throws Exception
+   */
+  @Test
+  public void testAclsDisabled() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocumentWithAcls(doc, "false");
+    writeToFile(doc, CONFIG);
+    MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
+    admin.run(new String[] { "-refreshQueues" });
+
+    UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login());
+    // submit job to queue p1:p11 by any user not in acls-submit-job
+    Job job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1" + NAME_SEPARATOR
+        + "p11", conf);
+    assertTrue("Job submitted for u2 in queue p1:p11 is not successful.",
+        job.isSuccessful());
+    
+    // submit job to queue p1:p11 by user in acls-submit-job
+    job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+        + "p11", conf);
+    assertTrue("Job submitted for u2 in queue p1:p11 is not successful.",
+        job.isSuccessful());
+
+    job = submitSleepJob(1, 1, 0, 0, false, "u1,g1", "p1" + NAME_SEPARATOR
+        + "p11", conf);
+    // kill the job by any user    
+    JobConf jobConf = miniMRCluster.createJobConf();
+    jobConf.set("hadoop.job.ugi", "u3,g3");
+    Cluster cluster = new Cluster(jobConf);
+    JobID jobID = job.getStatus().getJobID();
+    //Ensure that the jobinprogress is initied before we issue a kill 
+    //signal to the job.
+    JobInProgress jip = miniMRCluster.getJobTrackerRunner().getJobTracker()
+        .getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
+    miniMRCluster.getJobTrackerRunner().getJobTracker().initJob(jip);
+    cluster.getJob(jobID).killJob();
+    assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+        cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+  }
+}



Mime
View raw message