hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r815605 [1/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/java/org/apache/hadoo...
Date Wed, 16 Sep 2009 04:49:20 GMT
Author: yhemanth
Date: Wed Sep 16 04:49:18 2009
New Revision: 815605

URL: http://svn.apache.org/viewvc?rev=815605&view=rev
Log:
MAPREDUCE-861. Add support for hierarchical queues in the Map/Reduce framework. Contributed by Rahul Kumar Singh.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerForHierarchialQueues.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/conf/mapred-queues.xml.template
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueInfo.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
    hadoop/mapreduce/trunk/src/test/commit-tests
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Sep 16 04:49:18 2009
@@ -96,6 +96,9 @@
     MAPREDUCE-830. Add support for splittable compression to TextInputFormats.
     (Abdul Qadeer via cdouglas)
 
+    MAPREDUCE-861. Add support for hierarchical queues in the Map/Reduce
+    framework. (Rahul Kumar Singh via yhemanth)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/conf/mapred-queues.xml.template
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/mapred-queues.xml.template?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/mapred-queues.xml.template (original)
+++ hadoop/mapreduce/trunk/conf/mapred-queues.xml.template Wed Sep 16 04:49:18 2009
@@ -1,68 +1,56 @@
 <?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- This is a template file for queue acls configuration properties -->
-
-<configuration>
-
-<property>
-  <name>mapred.queue.names</name>
-  <value>default</value>
-  <description> Comma separated list of queues configured for this jobtracker.
-    Jobs are added to queues and schedulers can configure different 
-    scheduling properties for the various queues. To configure a property 
-    for a queue, the name of the queue must match the name specified in this 
-    value. Queue properties that are common to all schedulers are configured 
-    here with the naming convention, mapred.queue.$QUEUE-NAME.$PROPERTY-NAME,
-    for e.g. mapred.queue.default.submit-job-acl.
-    The number of queues configured in this parameter could depend on the
-    type of scheduler being used, as specified in 
-    mapred.jobtracker.taskScheduler. For example, the JobQueueTaskScheduler
-    supports only a single queue, which is the default configured here.
-    Before adding more queues, ensure that the scheduler you've configured
-    supports multiple queues.
-  </description>
-</property>
-
-<property>
-  <name>mapred.acls.enabled</name>
-  <value>false</value>
-  <description> Specifies whether ACLs are enabled, and should be checked
-    for various operations.
-  </description>
-</property>
-
-<property>
-  <name>mapred.queue.default.acl-submit-job</name>
-  <value>*</value>
-  <description> Comma separated list of user and group names that are allowed
-    to submit jobs to the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to 
-    submit jobs. 
-  </description>
-</property>
-
-<property>
-  <name>mapred.queue.default.acl-administer-jobs</name>
-  <value>*</value>
-  <description> Comma separated list of user and group names that are allowed
-    to delete jobs or modify job's priority for jobs not owned by the current
-    user in the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to do 
-    this operation.
-  </description>
-</property>
-
-<property>
-  <name>mapred.queue.default.state</name>
-  <value>running</value>
-  <description>
-   This values defines the state , default queue is in.
-   the values can be either "stopped" or "running"
-   This value can be changed at runtime.
-  </description>
-</property>
-
-</configuration>
+<!-- This is the template for queue configuration. The format supports nesting of
+     queues within queues - a feature called hierarchical queues. All queues are
+     defined within the 'queues' tag which is the top level element for this
+     XML document.
+     The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
+     on queue operations such as submitting jobs, killing jobs etc. -->
+<queues aclsEnabled="false">
+
+  <!-- Configuration for a queue is specified by defining a 'queue' element. -->
+  <queue>
+
+    <!-- Name of a queue. Queue name cannot contain a ':'  -->
+    <name>default</name>
+
+    <!-- properties for a queue, typically used by schedulers,
+    can be defined here -->
+    <properties>
+    </properties>
+
+    <!-- State of the queue. If running, the queue will accept new jobs.
+         If stopped, the queue will not accept new jobs. -->
+    <state>running</state>
+
+    <!-- Specifies the ACLs to check for submitting jobs to this queue.
+         If set to '*', it allows all users to submit jobs to the queue.
+         For specifying a list of users and groups the format to use is
+         user1,user2 group1,group2 -->
+    <acl-submit-job>*</acl-submit-job>
+
+    <!-- Specifies the ACLs to check for modifying jobs in this queue.
+         Modifications include killing jobs, tasks of jobs or changing
+         priorities.
+         If set to '*', it allows all users to administer jobs in the queue.
+         For specifying a list of users and groups the format to use is
+         user1,user2 group1,group2 -->
+    <acl-administer-jobs>*</acl-administer-jobs>
+  </queue>
+
+  <!-- Here is a sample of a hierarchical queue configuration
+       where q2 is a child of q1. In this example, q2 is a leaf level
+       queue as it has no queues configured within it. Currently, ACLs
+       and state are only supported for the leaf level queues.
+       Note also the usage of properties for the queue q2.
+  <queue>
+    <name>q1</name>
+    <queue>
+      <name>q2</name>
+      <properties>
+        <property key="capacity" value="20"/>
+        <property key="user-limit" value="30"/>
+      </properties>
+    </queue>
+  </queue>
+ -->
+</queues>

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Wed Sep 16 04:49:18 2009
@@ -17,13 +17,14 @@
 
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
 
 /**
  * Class providing access to Capacity scheduler configuration.
@@ -51,6 +52,9 @@
   
   private static final String SUBQUEUE_SUFFIX="subQueues";
 
+  private Map<String, Properties> queueProperties
+    = new HashMap<String,Properties>();
+
   /**
    * If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
    * {@link JobConf#DISABLED_MEMORY_LIMIT}, this configuration will be used to
@@ -156,6 +160,10 @@
         "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
         2);
   }
+
+  void setProperties(String queueName , Properties properties) {
+    this.queueProperties.put(queueName,properties);
+  }
   
   /**
    * Get the percentage of the cluster for the specified queue.
@@ -178,21 +186,31 @@
     //In case of both capacity and default capacity not configured.
     //Last check is if the configuration is specified and is marked as
     //negative we throw exception
-    String raw = getCSConf().getRaw(toFullPropertyName(queue,
-        "capacity"));
-    if(raw == null) {
-      return -1;
-    }
-    float result = getCSConf().getFloat(toFullPropertyName(queue,
-                                   "capacity"), 
-                                   -1);
-    if (result < 0.0 || result > 100.0) {
+    String raw = getProperty(queue,"capacity");
+
+
+    float result = this.getFloat(raw,-1);
+    
+    if (result > 100.0) {
       throw new IllegalArgumentException("Illegal capacity for queue " + queue +
                                          " of " + result);
     }
     return result;
   }
 
+  String getProperty(String queue,String property) {
+    if(!queueProperties.containsKey(queue))
+     throw new IllegalArgumentException("Invalid queuename " + queue);
+
+    //This check is still required as sometimes we create a queue with null
+    //properties. This typically happens in tests.
+    if(queueProperties.get(queue) != null) {
+      return queueProperties.get(queue).getProperty(property);
+    }
+
+    return null;
+  }
+
   /**
    * Get maximum percentage stretch for a queue.
    * This percentage defines a limit beyond which a
@@ -207,13 +225,8 @@
    * @return
    */
   public float getMaxCapacity(String queue) {
-    String raw = getCSConf().getRaw(toFullPropertyName(queue, MAX_CAPACITY));
-    if(raw == null) {
-      return -1;
-    }
-    float result = getCSConf().getFloat(toFullPropertyName(queue,
-                                                           MAX_CAPACITY),
-                                   -1);
+    String raw = getProperty(queue,MAX_CAPACITY);
+    float result = getFloat(raw,-1);
     result = (result <= 0) ? -1 : result; 
     if (result > 100.0) {
       throw new IllegalArgumentException("Illegal maximum-capacity-stretch " +
@@ -228,48 +241,6 @@
   }
 
   /**
-   *
-   * @param queue . Complete qualified name of the queue
-   * All the subQueues at same level should have unique names.
-   * 
-   * @return list of subQueues at a level.
-   * @return null if no subQueues specified.
-   */
-  public Set<String> getSubQueues(String queue){
-    String[] result = getCSConf().getStrings(
-      toFullPropertyName(
-        queue,SUBQUEUE_SUFFIX));
-    if(result != null && result.length  > 0){
-      HashSet<String> qs = new HashSet<String>(result.length);
-      for (String q: result){
-        qs.add(q);
-      }
-      return qs;
-    }
-    return null;
-  }
-  
-  /**
-   * Sets the capacity of the given queue.
-   * 
-   * @param queue name of the queue
-   * @param capacity percent of the cluster for the queue.
-   */
-  public void setMaxCapacity(String queue,float capacity) {
-    getCSConf().setFloat(toFullPropertyName(queue, MAX_CAPACITY),capacity);
-  }
-
-  /**
-   * Sets the capacity of the given queue.
-   *
-   * @param queue name of the queue
-   * @param capacity percent of the cluster for the queue.
-   */
-  public void setCapacity(String queue,float capacity) {
-    getCSConf().setFloat(toFullPropertyName(queue, "capacity"),capacity);
-  }
-  
-  /**
    * Get whether priority is supported for this queue.
    * 
    * If this value is false, then job priorities will be ignored in 
@@ -279,23 +250,10 @@
    * @return Whether this queue supports priority or not.
    */
   public boolean isPrioritySupported(String queue) {
-    checkIfJobQueue(queue);
-    return getCSConf().getBoolean(toFullPropertyName(queue, "supports-priority"),
-        defaultSupportPriority);  
-  }
-  
-  /**
-   * Set whether priority is supported for this queue.
-   * 
-   * 
-   * @param queue name of the queue
-   * @param value true, if the queue must support priorities, false otherwise.
-   */
-  public void setPrioritySupported(String queue, boolean value) {
-    checkIfJobQueue(queue);
-    getCSConf().setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+    String raw = getProperty(queue,"supports-priority");
+    return Boolean.parseBoolean(raw);
   }
-  
+
   /**
    * Get the minimum limit of resources for any user submitting jobs in 
    * this queue, in percentage.
@@ -312,38 +270,15 @@
    * 
    */
   public int getMinimumUserLimitPercent(String queue) {
-    checkIfJobQueue(queue);
-    int userLimit = getCSConf().getInt(toFullPropertyName(queue,
-        "minimum-user-limit-percent"), defaultUlimitMinimum);
+    String raw = getProperty(queue,
+        "minimum-user-limit-percent");
+    int userLimit = getInt(raw,defaultUlimitMinimum);
     if(userLimit <= 0 || userLimit > 100) {
       throw new IllegalArgumentException("Invalid user limit : "
           + userLimit + " for queue : " + queue);
     }
     return userLimit;
   }
-
-  private void checkIfJobQueue(String queue) {
-    Set<String> queues = getSubQueues(queue);
-    if(queues == null || queues.isEmpty()) {
-      return;
-    } else {
-      LOG.warn("This property is " +
-        "only allowed for child queue not for container Queue " + queue);
-    }
-  }
-
-  /**
-   * Set the minimum limit of resources for any user submitting jobs in
-   * this queue, in percentage.
-   * 
-   * @param queue name of the queue
-   * @param value minimum limit of resources for any user submitting jobs
-   * in this queue
-   */
-  public void setMinimumUserLimitPercent(String queue, int value) {
-    getCSConf().setInt(toFullPropertyName(queue, "minimum-user-limit-percent"),
-                    value);
-  }
   
   /**
    * Reload configuration by clearing the information read from the 
@@ -369,28 +304,15 @@
    * or zero.
    */
   public int getMaxJobsPerUserToInitialize(String queue) {
-    checkIfJobQueue(queue);
-    int maxJobsPerUser = getCSConf().getInt(toFullPropertyName(queue,
-        "maximum-initialized-jobs-per-user"), 
-        defaultMaxJobsPerUsersToInitialize);
+    String raw = getProperty(queue,"maximum-initialized-jobs-per-user");
+    int maxJobsPerUser = getInt(raw,defaultMaxJobsPerUsersToInitialize);
     if(maxJobsPerUser <= 0) {
       throw new IllegalArgumentException(
           "Invalid maximum jobs per user configuration " + maxJobsPerUser);
     }
     return maxJobsPerUser;
   }
-  
-  /**
-   * Sets the maximum number of jobs which are allowed to be initialized 
-   * for a user in the queue.
-   * 
-   * @param queue queue name.
-   * @param value maximum number of jobs allowed to be initialized per user.
-   */
-  public void setMaxJobsPerUserToInitialize(String queue, int value) {
-    getCSConf().setInt(toFullPropertyName(queue,
-        "maximum-initialized-jobs-per-user"), value);
-  }
+
 
   /**
    * Amount of time in milliseconds which poller thread and initialization
@@ -442,28 +364,6 @@
     }
     return maxWorkerThreads;
   }
-  /**
-   * Set the sleep interval which initialization poller would sleep before 
-   * it looks at the jobs in the job queue.
-   * 
-   * @param interval sleep interval
-   */
-  public void setSleepInterval(long interval) {
-    getCSConf().setLong(
-        "mapred.capacity-scheduler.init-poll-interval", interval);
-  }
-  
-  /**
-   * Sets number of threads which can be spawned to initialize jobs in
-   * parallel.
-   * 
-   * @param poolSize number of threads to be spawned to initialize jobs
-   * in parallel.
-   */
-  public void setMaxWorkerThreads(int poolSize) {
-    getCSConf().setInt(
-        "mapred.capacity-scheduler.init-worker-threads", poolSize);
-  }
 
   /**
    * get the max map slots cap
@@ -471,19 +371,10 @@
    * @return
    */
   public int getMaxMapCap(String queue) {
-    checkIfJobQueue(queue);
-    return getCSConf().getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
+    String raw = getProperty(queue,MAX_MAP_CAP_PROPERTY);
+    return getInt(raw,-1);
   }
 
-  /**
-   * Used for testing
-   * @param queue
-   * @param val
-   */
-  public void setMaxMapCap(String queue,int val) {
-    checkIfJobQueue(queue);
-    getCSConf().setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
-  }
 
   /**
    * get the max reduce slots cap
@@ -491,21 +382,32 @@
    * @return
    */
   public int getMaxReduceCap(String queue) {
-    checkIfJobQueue(queue);
-    return getCSConf().getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);
+    String raw = getProperty(queue,MAX_REDUCE_CAP_PROPERTY);
+    return getInt(raw,-1);
   }
 
-  /**
-   * Used for testing
-   * @param queue
-   * @param val
-   */
-  public void setMaxReduceCap(String queue,int val) {
-    checkIfJobQueue(queue);
-    getCSConf().setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
-  }
 
   public Configuration getCSConf() {
     return rmConf;
   }
+
+  float getFloat(String valueString,float defaultValue) {
+    if (valueString == null)
+      return defaultValue;
+    try {
+      return Float.parseFloat(valueString);
+    } catch (NumberFormatException e) {
+      return defaultValue;
+    }
+  }
+
+  int getInt(String valueString,int defaultValue) {
+    if (valueString == null)
+      return defaultValue;
+    try {
+      return Integer.parseInt(valueString);
+    } catch (NumberFormatException e) {
+      return defaultValue;
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Sep 16 04:49:18 2009
@@ -25,6 +25,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -700,8 +702,8 @@
     QueueManager queueManager = taskTrackerManager.getQueueManager();
 
     //get parent level queues. these are defined in mapred-*.xml
-    Set<String> queues = queueManager.getQueues();
-
+    List<JobQueueInfo> queues = Arrays.asList(queueManager.getRootQueues());
+    LOG.info("Root queues defined " + queues);
     // Sanity check: there should be at least one queue.
     if (0 == queues.size()) {
       throw new IllegalStateException("System has no queue configured");
@@ -735,7 +737,8 @@
    * construction of the hierarchy.
    */
   private void updateQueueMaps() {
-    List<AbstractQueue> allQueues = getRoot().getDescendentJobQueues();
+    Set<AbstractQueue> allQueues = getContainerQueues(getRoot());
+    allQueues.addAll(getRoot().getDescendentJobQueues());
     for (AbstractQueue queue : allQueues) {
       if (queue instanceof JobQueue) {
         jobQueuesManager.addQueue((JobQueue)queue);
@@ -745,6 +748,26 @@
     }
   }
 
+  /**
+   * returns list of container queues.
+   * @param q
+   * @return the containerQueue for queue q
+   */
+  private Set<AbstractQueue> getContainerQueues(AbstractQueue q) {
+    Set<AbstractQueue> l = new HashSet<AbstractQueue>();
+
+    if(!(q instanceof ContainerQueue)) {
+      return l;
+    }
+    //Add q's children.
+    for (AbstractQueue child : q.getChildren()) {
+      if(child.getChildren() != null && child.getChildren().size() > 0) {
+        l.add(child);
+        l.addAll(getContainerQueues(child));
+      }
+    }
+    return l;
+  }
   /** mostly for testing purposes */
   void setInitializationPoller(JobInitializationPoller p) {
     this.initializationPoller = p;
@@ -957,4 +980,4 @@
     this.root = rt;
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java Wed Sep 16 04:49:18 2009
@@ -19,11 +19,18 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
 
-import java.util.*;
 import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.TaskType;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 /**
  *
@@ -42,11 +49,11 @@
     }
     waitingJobs =
       new
-        TreeMap<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>(
+        TreeMap<JobSchedulingInfo, JobInProgress>(
         comparator);
     runningJobs =               
       new
-        TreeMap<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>(
+        TreeMap<JobSchedulingInfo, JobInProgress>(
         comparator);
   }
 
@@ -58,16 +65,16 @@
   * If a queue doesn't support priorities, jobs are
   * sorted based on their start time.
   */
-  static final Comparator<JobQueueJobInProgressListener.JobSchedulingInfo>
+  static final Comparator<JobSchedulingInfo>
     STARTTIME_JOB_COMPARATOR;
 
   static {
     STARTTIME_JOB_COMPARATOR =
-      new Comparator<JobQueueJobInProgressListener.JobSchedulingInfo>() {
+      new Comparator<JobSchedulingInfo>() {
         // comparator for jobs in queues that don't support priorities
         public int compare(
-          JobQueueJobInProgressListener.JobSchedulingInfo o1,
-          JobQueueJobInProgressListener.JobSchedulingInfo o2) {
+          JobSchedulingInfo o1,
+          JobSchedulingInfo o2) {
           // the job that started earlier wins
           if (o1.getStartTime() < o2.getStartTime()) {
             return -1;
@@ -178,12 +185,12 @@
   }
 
 
-  Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>
+  Map<JobSchedulingInfo, JobInProgress>
     waitingJobs; // for waiting jobs
-  Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>
+  Map<JobSchedulingInfo, JobInProgress>
     runningJobs; // for running jobs
 
-  public Comparator<JobQueueJobInProgressListener.JobSchedulingInfo>
+  public Comparator<JobSchedulingInfo>
     comparator;
 
   Collection<JobInProgress> getWaitingJobs() {
@@ -203,20 +210,20 @@
   private void addRunningJob(JobInProgress job) {
     synchronized (runningJobs) {
       runningJobs.put(
-        new JobQueueJobInProgressListener.JobSchedulingInfo(
+        new JobSchedulingInfo(
           job), job);
     }
   }
 
   private JobInProgress removeRunningJob(
-    JobQueueJobInProgressListener.JobSchedulingInfo jobInfo) {
+    JobSchedulingInfo jobInfo) {
     synchronized (runningJobs) {
       return runningJobs.remove(jobInfo);
     }
   }
 
   JobInProgress removeWaitingJob(
-    JobQueueJobInProgressListener.JobSchedulingInfo schedInfo) {
+    JobSchedulingInfo schedInfo) {
     synchronized (waitingJobs) {
       JobInProgress jip = waitingJobs.remove(schedInfo);
       this.qsc.setNumOfWaitingJobs(waitingJobs.size());
@@ -227,7 +234,7 @@
   private void addWaitingJob(JobInProgress job) {
     synchronized (waitingJobs) {
       waitingJobs.put(
-        new JobQueueJobInProgressListener.JobSchedulingInfo(
+        new JobSchedulingInfo(
           job), job);
       this.qsc.setNumOfWaitingJobs(waitingJobs.size());
     }
@@ -321,7 +328,7 @@
   // This is used to reposition a job in the queue. A job can get repositioned
   // because of the change in the job priority or job start-time.
   private void reorderJobs(
-    JobInProgress job, JobQueueJobInProgressListener.JobSchedulingInfo oldInfo
+    JobInProgress job, JobSchedulingInfo oldInfo
   ) {
 
     if (removeWaitingJob(oldInfo) != null) {
@@ -372,8 +379,8 @@
   // Update the scheduler as job's state has changed
   private void jobStateChanged(JobStatusChangeEvent event) {
     JobInProgress job = event.getJobInProgress();
-    JobQueueJobInProgressListener.JobSchedulingInfo oldJobStateInfo =
-      new JobQueueJobInProgressListener.JobSchedulingInfo(event.getOldStatus());
+    JobSchedulingInfo oldJobStateInfo =
+      new JobSchedulingInfo(event.getOldStatus());
     // Check if the ordering of the job has changed
     // For now priority and start-time can change the job ordering
     if (event.getEventType() == JobStatusChangeEvent.EventType.PRIORITY_CHANGED
@@ -403,7 +410,7 @@
   * job queue manager.
   */
   private void jobCompleted(
-    JobInProgress job, JobQueueJobInProgressListener.JobSchedulingInfo oldInfo
+    JobInProgress job, JobSchedulingInfo oldInfo
   ) {
     LOG.info(
       "Job " + job.getJobID().toString() + " submitted to queue "

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Wed Sep 16 04:49:18 2009
@@ -18,7 +18,10 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java Wed Sep 16 04:49:18 2009
@@ -21,7 +21,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.Set;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * Hierarchy builder for the CapacityScheduler.
@@ -30,45 +31,37 @@
 public class QueueHierarchyBuilder {
 
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
-  private final String NAME_SEPERATOR = ".";
   private CapacitySchedulerConf schedConf;
   
   QueueHierarchyBuilder(CapacitySchedulerConf schedConf) {
     this.schedConf = schedConf;
   }
   
+
   /**
    * The first call would expect that parent has children.
    * @param parent       parent Queue
    * @param children     children
    */
   void createHierarchy(
-    AbstractQueue parent, Set<String> children) {
+    AbstractQueue parent, List<JobQueueInfo> children) {
     //check if the children have further children.
     if (children != null && !children.isEmpty()) {
       float totalCapacity = 0.0f;
-      for (String qName : children) {
-        if(qName.contains(NAME_SEPERATOR)) {
-          throw new IllegalArgumentException( NAME_SEPERATOR  +  "" +
-            " not allowed in queue name \'" + qName + "\'.");
-        }
-        //generate fully qualified name.
-        if (!parent.getName().equals("")) {
-          qName = parent.getName() + NAME_SEPERATOR + qName;
-        }
+      for (JobQueueInfo qs : children) {
+
         //Check if this child has any more children.
-        Set<String> childQueues = schedConf.getSubQueues(qName);
+        List<JobQueueInfo> childQueues = qs.getChildren();
 
         if (childQueues != null && childQueues.size() > 0) {
           //generate a new ContainerQueue and recursively
           //create hierarchy.
           AbstractQueue cq = new ContainerQueue(
             parent,
-            loadContext(
-              qName));
+            loadContext(qs.getProperties() , qs.getQueueName()));
           //update totalCapacity
           totalCapacity += cq.qsc.getCapacityPercent();
-          LOG.info("Created a ContainerQueue " + qName);
+          LOG.info("Created a ContainerQueue " + qs.getQueueName());
           //create child hierarchy
           createHierarchy(cq, childQueues);
         } else {
@@ -77,10 +70,9 @@
           //create a JobQueue.
           AbstractQueue jq = new JobQueue(
             parent,
-            loadContext(
-              qName));
+            loadContext(qs.getProperties(),qs.getQueueName()));
           totalCapacity += jq.qsc.getCapacityPercent();
-          LOG.info("Created a jobQueue " + qName);
+          LOG.info("Created a jobQueue " + qs.getQueueName());
         }
       }
 
@@ -98,7 +90,9 @@
 
 
   private QueueSchedulingContext loadContext(
+    Properties props,
     String queueName) {
+    schedConf.setProperties(queueName,props);
     float capacity = schedConf.getCapacity(queueName);
     float stretchCapacity = schedConf.getMaxCapacity(queueName);
     if (capacity == -1.0) {

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Wed Sep 16 04:49:18 2009
@@ -26,9 +26,13 @@
 import java.io.IOException;
 
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 public class CapacityTestUtils {
+  static final Log LOG =
+    LogFactory.getLog(org.apache.hadoop.mapred.CapacityTestUtils.class);
 
 
   /**
@@ -450,8 +454,8 @@
     private static final AccessControlList allEnabledAcl =
       new AccessControlList("*");
 
-    FakeQueueManager() {
-      super(new Configuration());
+    FakeQueueManager(Configuration conf) {
+      super(conf);
     }
 
     void setQueues(Set<String> queueNames) {
@@ -474,7 +478,7 @@
       super.setQueues(queues);
     }
 
-    public synchronized Set<String> getQueues() {
+    public synchronized Set<String> getLeafQueueNames() {
       return queueNames;
     }
   }
@@ -487,7 +491,8 @@
     long ttExpiryInterval = 10 * 60 * 1000L; // default interval
     List<JobInProgressListener> listeners =
       new ArrayList<JobInProgressListener>();
-    FakeQueueManager qm = new FakeQueueManager();
+
+    FakeQueueManager qm = null;
 
     private Map<String, TaskTracker> trackers =
       new HashMap<String, TaskTracker>();
@@ -503,6 +508,9 @@
     public FakeTaskTrackerManager(
       int numTaskTrackers,
       int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
+    Configuration cfn = new Configuration();
+    cfn.set("mapred.queue.names","default");
+    qm = new FakeQueueManager(cfn);
       this.maxMapTasksPerTracker = maxMapTasksPerTracker;
       this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
       for (int i = 1; i < numTaskTrackers + 1; i++) {
@@ -666,7 +674,7 @@
     }
 
     public void finishTask(
-      String taskTrackerName, String tipId,
+      String tipId,
       FakeJobInProgress j) {
       TaskStatus status = taskStatuses.get(tipId);
       if (status.getIsMap()) {
@@ -732,6 +740,7 @@
     void addQueues(String[] arr) {
       Set<String> queues = new HashSet<String>();
       for (String s : arr) {
+
         queues.add(s);
       }
       qm.setQueues(queues);
@@ -770,8 +779,13 @@
     String firstQueue;
 
 
-    void setFakeQueues(List<FakeQueueInfo> queues) {
+    void setFakeQueues(List<FakeQueueInfo> queues, QueueManager qManager) {
+      Properties p = new Properties();
       for (FakeQueueInfo q : queues) {
+        p.setProperty("capacity",q.capacity+"");
+        p.setProperty("supports-priority",q.supportsPrio+"");
+        p.setProperty("minimum-user-limit-percent",q.ulMin+"");
+        qManager.getQueue(q.queueName).setProperties(p);
         queueMap.put(q.queueName, q);
       }
       firstQueue = new String(queues.get(0).queueName);
@@ -786,18 +800,47 @@
     }*/
 
     public float getCapacity(String queue) {
-      if (queueMap.get(queue) == null) {
-      }
       if (queueMap.get(queue).capacity == -1) {
-        return super.getCapacity(queue);
+        return -1;
       }
       return queueMap.get(queue).capacity;
     }
 
+    public float getMaxCapacity(String queue) {
+      //There is no support for the old testcase for
+      //maxCapacity.
+
+      //MaxCapacity testcases are part of TestContainerQueue
+      return -1;
+    }
+
+    public int getMaxMapCap(String queue) {
+      return -1;
+    }
+
+    public int getMaxReduceCap(String queue) {
+      return -1;
+    }
+
+
     public int getMinimumUserLimitPercent(String queue) {
       return queueMap.get(queue).ulMin;
     }
 
+    /**
+     * Gets the maximum number of jobs which are allowed to initialize in the
+     * job queue.
+     *
+     * @param queue queue name.
+     * @return maximum number of jobs allowed to be initialized per user.
+     * @throws IllegalArgumentException if maximum number of users is negative
+     *                                  or zero.
+     */
+    @Override
+    public int getMaxJobsPerUserToInitialize(String queue) {
+      return 2;
+    }
+
     public boolean isPrioritySupported(String queue) {
       return queueMap.get(queue).supportsPrio;
     }

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Sep 16 04:49:18 2009
@@ -64,6 +64,7 @@
 
     conf = new JobConf();
     // Don't let the JobInitializationPoller come in our way.
+    conf.set("mapred.queue.names","default");
     resConf = new FakeResourceManagerConf();
     controlledInitializationPoller = new ControlledInitializationPoller(
       scheduler.jobQueuesManager,
@@ -135,12 +136,15 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", 2);
-    resConf.setMaxReduceCap("default", -1);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(-1);
+
+
     //submit the Job
     FakeJobInProgress fjob1 =
       submitJob(JobStatus.PREP, 3, 1, "default", "user");
@@ -160,7 +164,7 @@
     //Now complete the task 1.
     // complete the job
     taskTrackerManager.finishTask(
-      "tt1", task1.get(0).getTaskID().toString(),
+      task1.get(0).getTaskID().toString(),
       fjob1);
     //We have completed the tt1 task which was a map task so we expect one map
     //task to be picked up
@@ -179,11 +183,13 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", -1);
-    resConf.setMaxReduceCap("default", 2);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(-1);
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
+
 
     //submit the Job
     FakeJobInProgress fjob1 =
@@ -201,7 +207,7 @@
     //Now complete the task 1 i.e map task.
     // complete the job
     taskTrackerManager.finishTask(
-      "tt1", task1.get(0).getTaskID().toString(),
+      task1.get(0).getTaskID().toString(),
       fjob1);
 
     //This should still fail as only map task is done
@@ -210,7 +216,7 @@
 
     //Complete the reduce task
     taskTrackerManager.finishTask(
-      "tt2", task2.get(0).getTaskID().toString(), fjob1);
+      task2.get(0).getTaskID().toString(), fjob1);
 
     //One reduce is done hence assign the new reduce.
     checkAssignment(
@@ -224,7 +230,8 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -265,7 +272,7 @@
 
     // complete the job
     taskTrackerManager.finishTask(
-      "tt1", tasks.get(0).getTaskID().toString(),
+      tasks.get(0).getTaskID().toString(),
       fjob1);
 
     // mark the job as complete
@@ -361,7 +368,8 @@
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -382,8 +390,8 @@
       "attempt_test_0001_m_000002_0 on tt1");
 
     // complete tasks
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
 
     // II. Check multiple assignments with running tasks across jobs
     // ask for a task from first job
@@ -397,8 +405,8 @@
       "attempt_test_0002_m_000001_0 on tt1");
 
     // complete tasks
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
 
     // III. Check multiple assignments with completed tasks across jobs
     // ask for a task from the second job
@@ -407,7 +415,7 @@
       "attempt_test_0002_m_000002_0 on tt1");
 
     // complete task
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j2);
 
     // IV. Check assignment with completed job
     // finish first job
@@ -420,7 +428,7 @@
       "attempt_test_0002_m_000003_0 on tt1");
 
     // complete task
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000003_0", j2);
   }
 
   // basic tests, should be able to submit to queues
@@ -431,7 +439,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -458,7 +467,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
@@ -494,7 +504,8 @@
     queues.add(new FakeQueueInfo("qAZ2", -1.0f, true, 25));
     queues.add(new FakeQueueInfo("qAZ3", -1.0f, true, 25));
     queues.add(new FakeQueueInfo("qAZ4", -1.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     JobQueuesManager jqm = scheduler.jobQueuesManager;
@@ -515,7 +526,8 @@
     // the cluster capacity increase slowly.
     queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -567,7 +579,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -609,8 +622,8 @@
     taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("defaultXYZM", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("defaultXYZM", 2);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
@@ -626,6 +639,9 @@
       JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+      .getMapTSC().setMaxTaskLimit(2);
+
 
     // The situation :  Submit 2 jobs with high memory map task
     //Set the max limit for queue to 2 ,
@@ -693,11 +709,13 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", 2);
-    resConf.setMaxReduceCap("default", 2);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
+
 
     // submit a job
     FakeJobInProgress fjob1 =
@@ -731,7 +749,7 @@
       "attempt_test_0002_r_000001_0 on tt4");
 
     taskTrackerManager.finishTask(
-      "tt1", t1.getTaskID().toString(),
+      t1.getTaskID().toString(),
       fjob1);
 
     //tt1 completed the task so we have 1 map slot for u1
@@ -741,7 +759,7 @@
       "attempt_test_0001_m_000002_0 on tt1");
 
     taskTrackerManager.finishTask(
-      "tt4", t4.getTaskID().toString(),
+      t4.getTaskID().toString(),
       fjob2);
     //tt4 completed the task , so we have 1 reduce slot for u2
     //we are assigning the 2nd reduce from fjob2
@@ -760,7 +778,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -796,7 +815,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -832,7 +852,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -857,23 +878,23 @@
     // Submit another job, from a different user
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // one of the task finishes
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
     // Now if I ask for a map task, it should come from the second job 
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0002_m_000001_0 on tt1");
     // another task from job1 finishes, another new task to job2
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0002_m_000002_0 on tt1");
     // now we have equal number of tasks from each job. Whichever job's
     // task finishes, that job gets a new task
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
     checkAssignment(
       taskTrackerManager, scheduler, "tt2",
       "attempt_test_0001_m_000005_0 on tt2");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0002_m_000003_0 on tt1");
@@ -886,7 +907,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     // add some more TTs 
@@ -932,7 +954,7 @@
       taskTrackerManager, scheduler, "tt5",
       "attempt_test_0001_m_000006_0 on tt5");
     // u1 finishes a task
-    taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
     // u1 submits a few more jobs 
     // All the jobs are inited when submitted
     // because of addition of Eager Job Initializer all jobs in this
@@ -950,12 +972,12 @@
       taskTrackerManager, scheduler, "tt5",
       "attempt_test_0007_m_000001_0 on tt5");
     // some other task finishes and u3 gets it
-    taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
     checkAssignment(
       taskTrackerManager, scheduler, "tt5",
       "attempt_test_0007_m_000002_0 on tt5");
     // now, u2 finishes a task
-    taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
     // next slot will go to u1, since u3 has nothing to run and u1's job is 
     // first in the queue
     checkAssignment(
@@ -977,7 +999,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
@@ -1109,7 +1132,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1123,6 +1147,7 @@
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     String schedulingInfo2 =
       queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+
     String[] infoStrings = schedulingInfo.split("\n");
     assertEquals(infoStrings.length, 18);
     assertEquals(infoStrings[0], "Queue configuration");
@@ -1147,6 +1172,7 @@
     assertEquals(infoStrings[15], "Job info");
     assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
     assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
+
     assertEquals(schedulingInfo, schedulingInfo2);
 
     //Testing with actual job submission.
@@ -1228,8 +1254,8 @@
 
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
-    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
+    taskTrackerManager.finishTask(t1.getTaskID().toString(), u1j1);
+    taskTrackerManager.finishTask(t2.getTaskID().toString(), u1j1);
     taskTrackerManager.finalizeJob(u1j1);
 
     // make sure we update our stats
@@ -1349,7 +1375,8 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // memory-based scheduling disabled by default.
@@ -1393,7 +1420,8 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
@@ -1474,7 +1502,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     taskTrackerManager.addQueues(new String[]{"default"});
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
@@ -1624,7 +1653,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     taskTrackerManager.addQueues(new String[]{"default"});
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
@@ -1711,8 +1741,8 @@
     assertNull(scheduler.assignTasks(tracker("tt1")));
 
     // finish the tasks on the tracker.
-    taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+    taskTrackerManager.finishTask(t.getTaskID().toString(), job1);
+    taskTrackerManager.finishTask(t1.getTaskID().toString(), job1);
 
     // now a new task can be assigned.
     t = checkAssignment(
@@ -1749,7 +1779,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1831,16 +1862,16 @@
       taskTrackerManager, scheduler, "tt2",
       "attempt_test_0002_r_000001_0 on tt2");
     taskTrackerManager.finishTask(
-      "tt1", t1.getTaskID().toString(), u1Jobs.get(
+      t1.getTaskID().toString(), u1Jobs.get(
         0));
     taskTrackerManager.finishTask(
-      "tt1", t2.getTaskID().toString(), u1Jobs.get(
+      t2.getTaskID().toString(), u1Jobs.get(
         0));
     taskTrackerManager.finishTask(
-      "tt2", t3.getTaskID().toString(), u1Jobs.get(
+      t3.getTaskID().toString(), u1Jobs.get(
         1));
     taskTrackerManager.finishTask(
-      "tt2", t4.getTaskID().toString(), u1Jobs.get(
+      t4.getTaskID().toString(), u1Jobs.get(
         1));
 
     // as some jobs have running tasks, the poller will now
@@ -1871,10 +1902,10 @@
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0003_r_000001_0 on tt1");
     taskTrackerManager.finishTask(
-      "tt1", t1.getTaskID().toString(), u1Jobs.get(
+      t1.getTaskID().toString(), u1Jobs.get(
         2));
     taskTrackerManager.finishTask(
-      "tt1", t2.getTaskID().toString(), u1Jobs.get(
+      t2.getTaskID().toString(), u1Jobs.get(
         2));
 
     // no new jobs should be picked up, because max user limit
@@ -1891,10 +1922,10 @@
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0004_r_000001_0 on tt1");
     taskTrackerManager.finishTask(
-      "tt1", t1.getTaskID().toString(), u1Jobs.get(
+      t1.getTaskID().toString(), u1Jobs.get(
         3));
     taskTrackerManager.finishTask(
-      "tt1", t2.getTaskID().toString(), u1Jobs.get(
+      t2.getTaskID().toString(), u1Jobs.get(
         3));
 
     // Now initialised jobs should contain user 4's job, as
@@ -1916,7 +1947,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1967,7 +1999,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1992,7 +2025,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("q1", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     //Start the scheduler.
     scheduler.start();
@@ -2014,7 +2048,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -2068,7 +2103,8 @@
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -2112,10 +2148,10 @@
       taskTrackerManager, scheduler, "tt2",
       "attempt_test_0001_r_000001_1 on tt2");
 
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", fjob1);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", fjob1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_1", fjob1);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", fjob1);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000001_1", fjob1);
     taskTrackerManager.finalizeJob(fjob1);
 
     checkAssignment(
@@ -2124,8 +2160,8 @@
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0002_r_000001_0 on tt1");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", fjob2);
+    taskTrackerManager.finishTask("attempt_test_0002_r_000001_0", fjob2);
     taskTrackerManager.finalizeJob(fjob2);
   }
 
@@ -2143,7 +2179,8 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
@@ -2280,7 +2317,8 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
     queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
@@ -2458,8 +2496,8 @@
       mgr.getJobQueue("default").getWaitingJobs().contains(job));
 
     // complete tasks and job
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", job);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", job);
     taskTrackerManager.finalizeJob(job);
 
     // make sure it is removed from the run queue
@@ -2695,7 +2733,8 @@
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
+    resConf.setFakeQueues(queues, taskTrackerManager.getQueueManager()
+    );
     JobConf conf = (JobConf) (scheduler.getConf());
     conf.set(
       JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, String.valueOf(

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Wed Sep 16 04:49:18 2009
@@ -22,10 +22,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Properties;
 
 import junit.framework.TestCase;
 
@@ -36,7 +33,7 @@
   private static String testDataDir = System.getProperty("test.build.data");
   private static String testConfFile;
   
-  private Map<String, String> defaultProperties;
+  //private Map<String, String> defaultProperties;
   private CapacitySchedulerConf testConf;
   private PrintWriter writer;
   
@@ -50,16 +47,6 @@
   }
   
   public TestCapacitySchedulerConf() {
-    defaultProperties = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent",
-                       "maximum-initialized-jobs-per-user"}, 
-        new String[] { "100", 
-                        "false", 
-                        "100",
-                        "2" }
-                      );
   }
 
   
@@ -73,263 +60,8 @@
       confFile.delete();  
     }
   }
-  
-  public void testDefaults() {
-    testConf = new CapacitySchedulerConf();
-    Map<String, Map<String, String>> queueDetails
-                            = new HashMap<String, Map<String,String>>();
-    queueDetails.put("default", defaultProperties);
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testQueues() {
-
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent",
-                       "maximum-initialized-jobs-per-user"}, 
-        new String[] { "10", 
-                        "true",
-                        "25",
-                        "4"}
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent",
-                       "maximum-initialized-jobs-per-user"}, 
-        new String[] { "100", 
-                        "false", 
-                        "50",
-                        "1"}
-                      );
-
-    startConfig();
-    writeQueueDetails("default", q1Props);
-    writeQueueDetails("research", q2Props);
-    endConfig();
-
-    testConf = new CapacitySchedulerConf(new Path(testConfFile));
-
-    Map<String, Map<String, String>> queueDetails
-              = new HashMap<String, Map<String,String>>();
-    queueDetails.put("default", q1Props);
-    queueDetails.put("research", q2Props);
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testQueueWithDefaultProperties() {
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "minimum-user-limit-percent" }, 
-        new String[] { "20", 
-                        "75" }
-                      );
-    startConfig();
-    writeQueueDetails("default", q1Props);
-    endConfig();
-
-    testConf = new CapacitySchedulerConf(new Path(testConfFile));
-
-    Map<String, Map<String, String>> queueDetails
-              = new HashMap<String, Map<String,String>>();
-    Map<String, String> expProperties = new HashMap<String, String>();
-    for (String key : q1Props.keySet()) {
-      expProperties.put(key, q1Props.get(key));
-    }
-    expProperties.put("supports-priority", "false");
-    expProperties.put("maximum-initialized-jobs-per-user", "2");
-    queueDetails.put("default", expProperties);
-    checkQueueProperties(testConf, queueDetails);
-  }
-
-  public void testReload() throws IOException {
-    // use the setup in the test case testQueues as a base...
-    testQueues();
-    
-    // write new values to the file...
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "20.5", 
-                        "true", 
-                        "40" }
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "100", 
-                        "false",
-                        "50" }
-                      );
-
-    openFile();
-    startConfig();
-    writeDefaultConfiguration();
-    writeQueueDetails("default", q1Props);
-    writeQueueDetails("production", q2Props);
-    endConfig();
-    testConf.reloadConfiguration();
-    Map<String, Map<String, String>> queueDetails 
-                      = new HashMap<String, Map<String, String>>();
-    queueDetails.put("default", q1Props);
-    queueDetails.put("production", q2Props);
-    checkQueueProperties(testConf, queueDetails);
-  }
-
-  public void testQueueWithUserDefinedDefaultProperties() throws IOException {
-    openFile();
-    startConfig();
-    writeUserDefinedDefaultConfiguration();
-    endConfig();
-
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "capacity",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "-1", 
-                        "true", 
-                        "50" }
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "capacity",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "-1", 
-                        "true",
-                        "50" }
-                      );
-    
-    testConf = new CapacitySchedulerConf(new Path(testConfFile));
-
-    Map<String, Map<String, String>> queueDetails
-              = new HashMap<String, Map<String,String>>();
-    
-    queueDetails.put("default", q1Props);
-    queueDetails.put("production", q2Props);
-    
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testQueueWithDefaultPropertiesOverriden() throws IOException {
-    openFile();
-    startConfig();
-    writeUserDefinedDefaultConfiguration();
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "capacity",
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "-1", 
-                        "true", 
-                        "50" }
-                      );
-
-    Map<String, String> q2Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "40", 
-                        "true",
-                        "50" }
-                      );
-    Map<String, String> q3Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "40", 
-                        "true",
-                        "50" }
-                      );
-    writeQueueDetails("production", q2Props);
-    writeQueueDetails("test", q3Props);
-    endConfig();
-    testConf = new CapacitySchedulerConf(new Path(testConfFile));
-    Map<String, Map<String, String>> queueDetails
-              = new HashMap<String, Map<String,String>>();
-    queueDetails.put("default", q1Props);
-    queueDetails.put("production", q2Props);
-    queueDetails.put("test", q3Props);
-    checkQueueProperties(testConf, queueDetails);
-  }
-  
-  public void testInvalidUserLimit() throws IOException {
-    openFile();
-    startConfig();
-    Map<String, String> q1Props = setupQueueProperties(
-        new String[] { "capacity", 
-                       "supports-priority",
-                       "minimum-user-limit-percent" }, 
-        new String[] { "-1",
-                        "true", 
-                        "-50" }
-                      );
-    writeQueueDetails("default", q1Props);
-    endConfig();
-    try {
-      testConf = new CapacitySchedulerConf(new Path(testConfFile));
-      testConf.getMinimumUserLimitPercent("default");
-      fail("Expect Invalid user limit to raise Exception");
-    }catch(IllegalArgumentException e) {
-      assertTrue(true);
-    }
-  }
 
-  /**
-   * Create a hiearchy in the conf .and check if AbstractQueue hierarchy is
-   * created properly.
-   * 
-   * @throws Exception
-   */
-  public void testHierarchyQueueCreation() throws Exception {
-    testConf = new CapacitySchedulerConf();
-    openFile();
-    startConfig();
-    writeProperty("mapred.capacity-scheduler.queue.defaultt.subQueues","q1,q2");
-    writeProperty("mapred.capacity-scheduler.queue.defaultt.capacity","100");
-    writeProperty("mapred.capacity-scheduler.queue.defaultt.q1.capacity","50");
-    writeProperty("mapred.capacity-scheduler.queue.defaultt.q2.capacity","50");
-    writeProperty("mapred.capacity-scheduler.queue.defaultt.q2.maximum-capacity","80");
-    writeProperty("mapred.capacity-scheduler.queue.defaultt.q1.supports-priority","true");
-    endConfig();
-    testConf = new CapacitySchedulerConf(new Path(testConfFile));
-    CapacityTaskScheduler scheduler = new CapacityTaskScheduler();
-    scheduler.schedConf = testConf;
-    Set<String> qs = new HashSet<String>();
-    qs.add("defaultt");
-
-    QueueHierarchyBuilder builder 
-        = new QueueHierarchyBuilder(scheduler.schedConf);
-    builder.createHierarchy(scheduler.getRoot(),qs);
-    
-    AbstractQueue rt = scheduler.getRoot();
-
-    //check for 1st level children
-    assertEquals(rt.getChildren().size(),1);
-
-    //get the first level queue.
-    AbstractQueue q = rt.getChildren().get(0);
-    assertEquals(q.getName(),"defaultt");
-    assertEquals(q instanceof ContainerQueue,true);
-    assertTrue(q.getQueueSchedulingContext().getCapacityPercent() == 100.0f);
-    assertEquals(q.getQueueSchedulingContext().supportsPriorities(),false);
-    assertTrue(q.getQueueSchedulingContext().getUlMin() == 100);
-
-    //check for grandchildren
-    AbstractQueue q1 = q.getChildren().get(0);
-    assertEquals(q.getChildren().size(),2);
-    assertTrue(q1 instanceof JobQueue);
-    assertTrue(q1.getQueueSchedulingContext().getCapacityPercent() == 50);
-    assertTrue(q1.getQueueSchedulingContext().getMaxCapacityPercent() == 80);
 
-    
-  }
-  
   public void testInitializationPollerProperties() 
     throws Exception {
     /*
@@ -380,29 +112,6 @@
   }
   
 
-  private void checkQueueProperties(
-                        CapacitySchedulerConf testConf,
-                        Map<String, Map<String, String>> queueDetails) {
-    for (String queueName : queueDetails.keySet()) {
-      Map<String, String> map = queueDetails.get(queueName);
-      assertEquals(Float.parseFloat(map.get("capacity")),
-           testConf.getCapacity(queueName));
-      assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
-          testConf.getMinimumUserLimitPercent(queueName));
-      assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
-          testConf.isPrioritySupported(queueName));
-    }
-  }
-  
-  private Map<String, String> setupQueueProperties(String[] keys, 
-                                                String[] values) {
-    HashMap<String, String> map = new HashMap<String, String>();
-    for(int i=0; i<keys.length; i++) {
-      map.put(keys[i], values[i]);
-    }
-    return map;
-  }
-
   private void openFile() throws IOException {
     
     if (testDataDir != null) {
@@ -418,33 +127,6 @@
     writer.println("<?xml version=\"1.0\"?>");
     writer.println("<configuration>");
   }
-  
-  private void writeQueueDetails(String queue, Map<String, String> props) {
-    for (String key : props.keySet()) {
-      writer.println("<property>");
-      writer.println("<name>mapred.capacity-scheduler.queue." 
-                        + queue + "." + key +
-                    "</name>");
-      writer.println("<value>"+props.get(key)+"</value>");
-      writer.println("</property>");
-    }
-  }
-  
-  
-  private void writeDefaultConfiguration() {
-    writeProperty("mapred.capacity-scheduler.default-supports-priority"
-        , "false");
-    writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
-        , "100");
-  }
-
-
-  private void writeUserDefinedDefaultConfiguration() {
-    writeProperty("mapred.capacity-scheduler.default-supports-priority"
-        , "true");
-    writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
-        , "50");
-  }
 
 
   private void writeProperty(String name, String value) {
@@ -459,5 +141,38 @@
     writer.println("</configuration>");
     writer.close();
   }
+
+  public void testConfigurationValuesConversion() throws IOException {
+    Properties prp = new Properties();
+
+    prp.setProperty("capacity","10");
+    prp.setProperty("maximum-capacity","20.5");
+    prp.setProperty("supports-priority","false");
+    prp.setProperty("minimum-user-limit-percent","23");
+    prp.setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,"43");
+    prp.setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,"43");
+
+
+    CapacitySchedulerConf conf = new CapacitySchedulerConf();
+    conf.setProperties("default",prp);
+
+    assertTrue(conf.getCapacity("default") == 10f);
+    assertTrue(conf.getMaxCapacity("default") == 20.5f);
+    assertTrue(conf.isPrioritySupported("default") == false);
+    assertTrue(conf.getMinimumUserLimitPercent("default")==23);
+    assertTrue(conf.getMaxMapCap("default") == 43);
+    assertTrue(conf.getMaxReduceCap("default") == 43);
+
+
+
+    //check for improper stuff
+    prp.setProperty("capacity","h");
+    prp.setProperty("maximum-capacity","20");
+
+    //This is because h is an invalid value.
+    assertTrue(conf.getCapacity("default") == -1);
+    
+    assertFalse(conf.getMaxCapacity("default") != 20);
+  }
   
 }

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java Wed Sep 16 04:49:18 2009
@@ -24,26 +24,32 @@
 import org.apache.hadoop.mapreduce.SleepJob;
 
 public class TestCapacitySchedulerWithJobTracker extends
-    ClusterWithCapacityScheduler {
+                                                 ClusterWithCapacityScheduler {
 
   /**
-   * Test case which checks if the jobs which 
+   * Test case which checks if the jobs which
    * fail initialization are removed from the
    * {@link CapacityTaskScheduler} waiting queue.
-   * 
+   *
    * @throws Exception
    */
   public void testFailingJobInitalization() throws Exception {
     Properties schedulerProps = new Properties();
-    schedulerProps.put("mapred.capacity-scheduler.queue.default.capacity",
-        "100");
     Properties clusterProps = new Properties();
+    clusterProps.put("mapred.queue.names","default");
     clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(1));
-    clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
+    clusterProps.put(
+      "mapred.tasktracker.reduce.tasks.maximum", String
         .valueOf(1));
     clusterProps.put("mapred.jobtracker.maxtasks.per.job", String.valueOf(1));
     // cluster capacity 1 maps, 1 reduces
     startCluster(1, clusterProps, schedulerProps);
+    CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker()
+      .getTaskScheduler();
+
+     AbstractQueue root = scheduler.getRoot();
+     root.getChildren().get(0).getQueueSchedulingContext().setCapacityPercent(100);
+
     JobConf conf = getJobConf();
     conf.setSpeculativeExecution(false);
     conf.setNumTasksToExecutePerJvm(-1);
@@ -51,41 +57,54 @@
     sleepJob.setConf(conf);
     Job job = sleepJob.createJob(3, 3, 1, 1, 1, 1);
     job.waitForCompletion(false);
-    assertFalse("The submitted job successfully completed", 
-        job.isSuccessful());
-    CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker()
-        .getTaskScheduler();
+    assertFalse(
+      "The submitted job successfully completed",
+      job.isSuccessful());
+
     JobQueuesManager mgr = scheduler.jobQueuesManager;
-    assertEquals("Failed job present in Waiting queue", 0, mgr
+    assertEquals(
+      "Failed job present in Waiting queue", 0, mgr
         .getJobQueue("default").getWaitingJobCount());
   }
 
   /**
    * Test case which checks {@link JobTracker} and {@link CapacityTaskScheduler}
-   * 
+   * <p/>
    * Test case submits 2 jobs in two different capacity scheduler queues.
    * And checks if the jobs successfully complete.
-   * 
+   *
    * @throws Exception
    */
+
   public void testJobTrackerIntegration() throws Exception {
 
     Properties schedulerProps = new Properties();
-    String[] queues = new String[] { "Q1", "Q2" };
+    String[] queues = new String[]{"Q1", "Q2"};
     Job jobs[] = new Job[2];
-    for (String q : queues) {
-      schedulerProps.put(CapacitySchedulerConf
-          .toFullPropertyName(q, "capacity"), "50");
-      schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
-          "minimum-user-limit-percent"), "100");
-    }
 
     Properties clusterProps = new Properties();
     clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2));
-    clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
+    clusterProps.put(
+      "mapred.tasktracker.reduce.tasks.maximum", String
         .valueOf(2));
     clusterProps.put("mapred.queue.names", queues[0] + "," + queues[1]);
     startCluster(2, clusterProps, schedulerProps);
+    CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker()
+      .getTaskScheduler();
+
+
+
+    AbstractQueue root = scheduler.getRoot();
+
+    for(AbstractQueue q : root.getChildren()) {
+      q.getQueueSchedulingContext().setCapacityPercent(50);
+      q.getQueueSchedulingContext().setUlMin(100);
+    }
+
+
+    LOG.info("WE CREATED THE QUEUES TEST 2");
+   // scheduler.taskTrackerManager.getQueueManager().setQueues(qs);
+   // scheduler.start();
 
     JobConf conf = getJobConf();
     conf.setSpeculativeExecution(false);
@@ -105,9 +124,11 @@
     sleepJob2.setConf(conf2);
     jobs[1] = sleepJob2.createJob(3, 3, 5, 3, 5, 3);
     jobs[1].waitForCompletion(false);
-    assertTrue("Sleep job submitted to queue 1 is not successful", jobs[0]
+    assertTrue(
+      "Sleep job submitted to queue 1 is not successful", jobs[0]
         .isSuccessful());
-    assertTrue("Sleep job submitted to queue 2 is not successful", jobs[1]
+    assertTrue(
+      "Sleep job submitted to queue 2 is not successful", jobs[1]
         .isSuccessful());
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java Wed Sep 16 04:49:18 2009
@@ -18,15 +18,9 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.PrintWriter;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
@@ -77,7 +71,7 @@
     Allocations(Configuration conf, QueueManager queueManager) {
       this.conf = conf;
       this.queueManager = queueManager;
-      this.infoQueues = queueManager.getQueues();
+      this.infoQueues = queueManager.getLeafQueueNames();
       
       this.store = ReflectionUtils.newInstance(
           conf.getClass(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_STORE,
@@ -237,7 +231,7 @@
     long interval = conf.getLong(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_ALLOC_INTERVAL,20)*1000;
      
     timer.scheduleAtFixedRate(allocations, interval, interval);   
-    for (String queue: queueManager.getQueues()) {
+    for (String queue: queueManager.getLeafQueueNames()) {
       Object info = queueManager.getSchedulerInfo(queue);
       QueueInfo queueInfo = new QueueInfo(queue, info, allocations); 
       queueManager.setSchedulerInfo(queue, queueInfo);

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java Wed Sep 16 04:49:18 2009
@@ -138,7 +138,7 @@
     void setQueues(Set<String> queues) {
       this.queues = queues;
     }
-    public synchronized Set<String> getQueues() {
+    public synchronized Set<String> getLeafQueueNames() {
       return queues;
     }
   }

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=815605&r1=815604&r2=815605&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed Sep 16 04:49:18 2009
@@ -306,7 +306,7 @@
     void setQueues(Set<String> queues) {
       this.queues = queues;
     }
-    public synchronized Set<String> getQueues() {
+    public synchronized Set<String> getLeafQueueNames() {
       return queues;
     }
   }



Mime
View raw message