hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r808308 [2/5] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Thu, 27 Aug 2009 07:49:01 GMT
Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Collections;
+
+/**
+ * Composite class for Queue hierarchy.
+ */
+public class ContainerQueue extends AbstractQueue {
+
+  //List of immediate children for this container queue.
+  //Duplicate children are not allowed.
+  private  List<AbstractQueue> children;
+  public ContainerQueue(AbstractQueue parent , QueueSchedulingContext qsc) {
+    super(parent,qsc);
+  }
+
+  /**
+   * Update current contexts and update children's contexts
+   * @param mapClusterCapacity
+   * @param reduceClusterCapacity
+   */
+  @Override
+  public void update(int mapClusterCapacity, int reduceClusterCapacity) {
+    super.update(mapClusterCapacity,reduceClusterCapacity);
+    updateChildrenContext();
+  }
+  
+  /**
+   * set normalized capacity values for children.
+   * and update children.
+   */
+  private void updateChildrenContext() {
+    for (AbstractQueue queue : children) {
+      int normalizedMapClusterCapacity = qsc.getMapTSC().getCapacity();
+      int normalizedReduceClusterCapacity = qsc.getReduceTSC().getCapacity();
+
+      //update children context,
+      // normalize mapClusterCapacity,reduceClusterCapacity to the current.
+      queue.update(
+        normalizedMapClusterCapacity, normalizedReduceClusterCapacity);
+
+      //update current TaskSchedulingContext information.
+      //At the parent level, this information is cumulative of all
+      //children's TSC values.
+      //Typically a JobQueue's TSC would change first, so as of now
+      //parent-level values would be stale unless we call update, which
+      //happens in case of a new heartbeat.
+      //This behaviour should be fine, as before assignTask we first update
+      //and then sort the whole hierarchy.
+      qsc.getMapTSC().update(queue.getQueueSchedulingContext().getMapTSC());
+      qsc.getReduceTSC().update(queue.getQueueSchedulingContext().getReduceTSC());
+    }
+  }
+
+  
+  /**
+   * @param queueComparator
+   */
+  @Override
+  public void sort(Comparator queueComparator) {
+    //sort immediate children
+    Collections.sort(children, queueComparator);
+
+    //recursive sort all children.    
+    for (AbstractQueue child : children) {
+      child.sort(queueComparator);
+    }
+  }
+
+  /**
+   * Returns the sorted order of the leaf level queues.
+   * @return
+   */
+  @Override
+  public List<AbstractQueue> getDescendentJobQueues() {
+    List<AbstractQueue> l = new ArrayList<AbstractQueue>();
+
+    for (AbstractQueue child : children) {
+      l.addAll(child.getDescendentJobQueues());
+    }
+    return l;
+  }
+
+
+  /**
+   * Used for test only.
+   * @return
+   */
+  @Override
+  List<AbstractQueue> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(AbstractQueue queue) {
+    if (children == null) {
+      children = new ArrayList<AbstractQueue>();
+    }
+    if(children.contains(queue)) {
+      LOG.warn(" The queue " + queue.getName() + " already " +
+        "exists hence ignoring  the current value ");
+      return;
+    }
+    this.children.add(queue);
+  }
+
+
+  /**
+   *
+   */
+  @Override
+  void distributeUnConfiguredCapacity() {
+    List<AbstractQueue> unConfiguredQueues = new ArrayList<AbstractQueue>();
+    float totalCapacity = 0;
+    for (AbstractQueue q : children) {
+      if (q.qsc.getCapacityPercent() == -1) {
+        //Add into unConfigured queue.
+        unConfiguredQueues.add(q);
+      } else {
+        //If capacity is set , then add that to totalCapacity.
+        LOG.info(" the capacity percent of the queue " + q.getName() + "  is " +
+          "" + q.qsc.getCapacityPercent());
+        totalCapacity += q.qsc.getCapacityPercent();
+
+        //As we already know current Capacity percent of this queue
+        //make children distribute unconfigured Capacity.
+        q.distributeUnConfiguredCapacity();
+      }
+    }
+
+    if (!unConfiguredQueues.isEmpty()) {
+      LOG.info("Total capacity to be distributed among the others are  " +
+        "" + (100 - totalCapacity));      
+
+      //We have list of queues at this level which are unconfigured.
+      //100 - totalCapacity is the capacity remaining.
+      //Divide it equally among all the unconfigured queues.
+      float capacityShare = (100 - totalCapacity) / unConfiguredQueues.size();
+
+      //We dont have to check for 100 - totalCapacity being -ve , as
+      //we already do it while loading.
+      for (AbstractQueue q : unConfiguredQueues) {
+        q.qsc.setCapacityPercent(capacityShare);
+        LOG.info("Capacity share for un configured queue " + q.getName() + "" +
+          " is " + capacityShare);
+        //we have q's capacity now.
+        //make children also distribute it among themselves.
+        q.distributeUnConfiguredCapacity();
+      }
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Thu Aug 27 07:49:00 2009
@@ -134,7 +134,7 @@
           if (job == null) {
             continue;
           }
-          LOG.info("Initializing job : " + job.getJobID() + " in Queue "
+          LOG.info("Initializing job : " + job.getJobID() + " in AbstractQueue "
               + job.getProfile().getQueueName() + " For user : "
               + job.getProfile().getUser());
           if (startIniting) {
@@ -246,9 +246,9 @@
    */
   private HashMap<String, JobInitializationThread> threadsToQueueMap;
 
-  public JobInitializationPoller(JobQueuesManager mgr,
-      CapacitySchedulerConf rmConf, Set<String> queue, 
-      TaskTrackerManager ttm) {
+  public JobInitializationPoller(
+    JobQueuesManager mgr,
+    TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
     jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
@@ -343,7 +343,7 @@
   private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
     for (JobInProgress job : jobsToInitialize) {
       LOG.info("Passing to Initializer Job Id :" + job.getJobID()
-          + " User: " + job.getProfile().getUser() + " Queue : "
+          + " User: " + job.getProfile().getUser() + " AbstractQueue : "
           + job.getProfile().getQueueName());
     }
   }
@@ -440,7 +440,7 @@
         * maxJobsPerUserAllowedToInitialize;
     int countOfJobsInitialized = 0;
     HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
-    Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+    Collection<JobInProgress> jobs = jobQueueManager.getJobQueue(queue).getWaitingJobs();
     /*
      * Walk through the collection of waiting jobs.
      *  We maintain a map of jobs that have already been initialized. If a 
@@ -536,7 +536,7 @@
           LOG.info("Removing scheduled jobs from waiting queue"
               + job.getJobID());
           jobsIterator.remove();
-          jobQueueManager.removeJobFromWaitingQueue(job);
+          jobQueueManager.getJobQueue(job).removeWaitingJob(new JobSchedulingInfo(job));
           continue;
         }
       }

Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueue.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ *
+ */
+public class JobQueue extends AbstractQueue {
+
+  static final Log LOG = LogFactory.getLog(JobQueue.class);
+
+  public JobQueue(AbstractQueue parent, QueueSchedulingContext qsc) {
+    super(parent, qsc);
+    if (qsc.supportsPriorities()) {
+      // use the default priority-aware comparator
+      comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
+    } else {
+      comparator = STARTTIME_JOB_COMPARATOR;
+    }
+    waitingJobs =
+      new
+        TreeMap<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>(
+        comparator);
+    runningJobs =               
+      new
+        TreeMap<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>(
+        comparator);
+  }
+
+
+  /*
+  * If a queue supports priorities, jobs must be
+  * sorted on priorities, and then on their start times (technically,
+  * their insertion time).
+  * If a queue doesn't support priorities, jobs are
+  * sorted based on their start time.
+  */
+  static final Comparator<JobQueueJobInProgressListener.JobSchedulingInfo>
+    STARTTIME_JOB_COMPARATOR;
+
+  static {
+    STARTTIME_JOB_COMPARATOR =
+      new Comparator<JobQueueJobInProgressListener.JobSchedulingInfo>() {
+        // comparator for jobs in queues that don't support priorities
+        public int compare(
+          JobQueueJobInProgressListener.JobSchedulingInfo o1,
+          JobQueueJobInProgressListener.JobSchedulingInfo o2) {
+          // the job that started earlier wins
+          if (o1.getStartTime() < o2.getStartTime()) {
+            return -1;
+          } else {
+            return (o1.getStartTime() == o2.getStartTime()
+              ? o1.getJobID().compareTo(o2.getJobID())
+              : 1);
+          }
+        }
+      };
+  }
+
+
+  /**
+   * This involves updating each qC structure.
+   *
+   * @param mapClusterCapacity
+   * @param reduceClusterCapacity
+   */
+  @Override
+  public void update(int mapClusterCapacity, int reduceClusterCapacity) {
+    super.update(mapClusterCapacity, reduceClusterCapacity);
+    for (JobInProgress j :
+      this.getRunningJobs()) {
+      updateStatsOnRunningJob(qsc, j);
+    }
+  }
+
+  private void updateStatsOnRunningJob(
+    QueueSchedulingContext qC, JobInProgress j) {
+    if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+      return;
+    }
+
+    TaskSchedulingContext mapTSI = qC.getMapTSC();
+    TaskSchedulingContext reduceTSI = qC.getReduceTSC();
+
+    int numMapsRunningForThisJob = j.runningMaps();
+    int numReducesRunningForThisJob = j.runningReduces();
+    TaskDataView mapScheduler = TaskDataView.getTaskDataView(TaskType.MAP);
+    TaskDataView reduceScheduler = 
+        TaskDataView.getTaskDataView(TaskType.REDUCE);
+    int numRunningMapSlots =
+      numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
+    int numRunningReduceSlots =
+      numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
+    int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
+    int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
+    int numReservedMapSlotsForThisJob =
+      (mapScheduler.getNumReservedTaskTrackers(j) *
+        mapScheduler.getSlotsPerTask(j));
+    int numReservedReduceSlotsForThisJob =
+      (reduceScheduler.getNumReservedTaskTrackers(j) *
+        reduceScheduler.getSlotsPerTask(j));
+
+    j.setSchedulingInfo(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+        numMapsRunningForThisJob,
+        numRunningMapSlots,
+        numReservedMapSlotsForThisJob,
+        numReducesRunningForThisJob,
+        numRunningReduceSlots,
+        numReservedReduceSlotsForThisJob));
+
+
+    mapTSI.setNumRunningTasks(
+      mapTSI.getNumRunningTasks() + numMapsRunningForThisJob);
+    reduceTSI.setNumRunningTasks(
+      reduceTSI.getNumRunningTasks() + numReducesRunningForThisJob);
+    mapTSI.setNumSlotsOccupied(
+      mapTSI.getNumSlotsOccupied() + numMapSlotsForThisJob);
+    reduceTSI.setNumSlotsOccupied(
+      reduceTSI.getNumSlotsOccupied() + numReduceSlotsForThisJob);
+    Integer i =
+      mapTSI.getNumSlotsOccupiedByUser().get(
+        j.getProfile().getUser());
+    mapTSI.getNumSlotsOccupiedByUser().put(
+      j.getProfile().getUser(),
+      i.intValue() + numMapSlotsForThisJob);
+    i = reduceTSI.getNumSlotsOccupiedByUser().get(
+      j.getProfile().getUser());
+    reduceTSI.getNumSlotsOccupiedByUser().put(
+      j.getProfile().getUser(),
+      i.intValue() + numReduceSlotsForThisJob);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        String.format(
+          "updateQSI: job %s: run(m)=%d, "
+            + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
+            + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
+            + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
+            .getJobID().toString(), numMapsRunningForThisJob,
+          numMapSlotsForThisJob, numReducesRunningForThisJob,
+          numReduceSlotsForThisJob, j
+            .finishedMaps(), j.finishedReduces(), j.failedMapTasks,
+          j.failedReduceTasks, j.speculativeMapTasks, j.speculativeReduceTasks,
+          j.numMapTasks, j.numReduceTasks));
+    }
+
+    /*
+    * it's fine walking down the entire list of running jobs - there
+  * probably will not be many, plus, we may need to go through the
+  * list to compute numSlotsOccupiedByUser. If this is expensive, we
+  * can keep a list of running jobs per user. Then we only need to
+  * consider the first few jobs per user.
+  */
+  }
+
+
+  Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>
+    waitingJobs; // for waiting jobs
+  Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>
+    runningJobs; // for running jobs
+
+  public Comparator<JobQueueJobInProgressListener.JobSchedulingInfo>
+    comparator;
+
+  Collection<JobInProgress> getWaitingJobs() {
+    synchronized (waitingJobs) {
+      return Collections.unmodifiableCollection(
+        new LinkedList<JobInProgress>(waitingJobs.values()));
+    }
+  }
+
+  Collection<JobInProgress> getRunningJobs() {
+    synchronized (runningJobs) {
+      return Collections.unmodifiableCollection(
+        new LinkedList<JobInProgress>(runningJobs.values()));
+    }
+  }
+
+  private void addRunningJob(JobInProgress job) {
+    synchronized (runningJobs) {
+      runningJobs.put(
+        new JobQueueJobInProgressListener.JobSchedulingInfo(
+          job), job);
+    }
+  }
+
+  private JobInProgress removeRunningJob(
+    JobQueueJobInProgressListener.JobSchedulingInfo jobInfo) {
+    synchronized (runningJobs) {
+      return runningJobs.remove(jobInfo);
+    }
+  }
+
+  JobInProgress removeWaitingJob(
+    JobQueueJobInProgressListener.JobSchedulingInfo schedInfo) {
+    synchronized (waitingJobs) {
+      JobInProgress jip = waitingJobs.remove(schedInfo);
+      this.qsc.setNumOfWaitingJobs(waitingJobs.size());
+      return jip;
+    }
+  }
+
+  private void addWaitingJob(JobInProgress job) {
+    synchronized (waitingJobs) {
+      waitingJobs.put(
+        new JobQueueJobInProgressListener.JobSchedulingInfo(
+          job), job);
+      this.qsc.setNumOfWaitingJobs(waitingJobs.size());
+    }
+  }
+
+  int getWaitingJobCount() {
+    synchronized (waitingJobs) {
+      return waitingJobs.size();
+    }
+  }
+
+  // called when a job is added
+  synchronized void jobAdded(JobInProgress job) throws IOException {
+    // add job to waiting queue. It will end up in the right place,
+    // based on priority. 
+    addWaitingJob(job);
+    // update user-specific info
+    Integer i = qsc.getNumJobsByUser().get(job.getProfile().getUser());
+    if (null == i) {
+      i = 1;
+      // set the count for running tasks to 0
+      qsc.getMapTSC().getNumSlotsOccupiedByUser().put(
+        job.getProfile().getUser(),
+        0);
+      qsc.getReduceTSC().getNumSlotsOccupiedByUser().
+        put(
+          job.getProfile().getUser(),
+          0);
+    } else {
+      i++;
+    }
+    qsc.getNumJobsByUser().put(job.getProfile().getUser(), i);
+
+    // setup scheduler specific job information
+    preInitializeJob(job);
+
+    LOG.debug(
+      "Job " + job.getJobID().toString() + " is added under user "
+        + job.getProfile().getUser() + ", user now has " + i + " jobs");
+  }
+
+
+  /**
+   * Setup {@link CapacityTaskScheduler} specific information prior to
+   * job initialization.
+   * <p/>
+   * TO DO: Currently this method uses , CapacityTaskScheduler based variables
+   * need to shift those.
+   */
+  void preInitializeJob(JobInProgress job) {
+    JobConf jobConf = job.getJobConf();
+
+    // Compute number of slots required to run a single map/reduce task
+    int slotsPerMap = 1;
+    int slotsPerReduce = 1;
+    if (MemoryMatcher.isSchedulingBasedOnMemEnabled()) {
+      slotsPerMap = jobConf.computeNumSlotsPerMap(
+        MemoryMatcher.getMemSizeForMapSlot());
+      slotsPerReduce =
+        jobConf.computeNumSlotsPerReduce(
+          MemoryMatcher.getMemSizeForReduceSlot());
+    }
+    job.setNumSlotsPerMap(slotsPerMap);
+    job.setNumSlotsPerReduce(slotsPerReduce);
+  }
+
+  // called when a job completes
+  synchronized void jobCompleted(JobInProgress job) {
+
+    LOG.debug("Job to be removed for user " + job.getProfile().getUser());
+    Integer i = qsc.getNumJobsByUser().get(job.getProfile().getUser());
+    i--;
+    if (0 == i.intValue()) {
+      qsc.getNumJobsByUser().remove(job.getProfile().getUser());
+      // remove job footprint from our TSIs
+      qsc.getMapTSC().getNumSlotsOccupiedByUser().remove(
+        job.getProfile().getUser());
+      qsc.getReduceTSC().getNumSlotsOccupiedByUser().remove(
+        job.getProfile().getUser());
+      LOG.debug(
+        "No more jobs for user, number of users = " + qsc
+          .getNumJobsByUser().size());
+    } else {
+      qsc.getNumJobsByUser().put(job.getProfile().getUser(), i);
+      LOG.debug(
+        "User still has " + i + " jobs, number of users = "
+          + qsc.getNumJobsByUser().size());
+    }
+  }
+
+  // This is used to reposition a job in the queue. A job can get repositioned
+  // because of the change in the job priority or job start-time.
+  private void reorderJobs(
+    JobInProgress job, JobQueueJobInProgressListener.JobSchedulingInfo oldInfo
+  ) {
+
+    if (removeWaitingJob(oldInfo) != null) {
+      addWaitingJob(job);
+    }
+    if (removeRunningJob(oldInfo) != null) {
+      addRunningJob(job);
+    }
+  }
+
+  /**
+   * @return
+   */
+  @Override
+  public List<AbstractQueue> getDescendentJobQueues() {
+    List<AbstractQueue> l = new ArrayList<AbstractQueue>();
+    l.add(this);
+    return l;
+  }
+
+  public void jobUpdated(JobChangeEvent event) {
+    JobInProgress job = event.getJobInProgress();
+
+    // Check if this is the status change
+    if (event instanceof JobStatusChangeEvent) {
+      jobStateChanged((JobStatusChangeEvent) event);
+    }
+  }
+
+  /**
+   * @return
+   */
+  @Override
+  List<AbstractQueue> getChildren() {
+    return null;
+  }
+
+  /**
+   * Dont do anything in sort , this is leaf level queue.
+   *
+   * @param queueComparator
+   */
+  @Override
+  public void sort(Comparator queueComparator) {
+    return;
+  }
+
+  // Update the scheduler as job's state has changed
+  private void jobStateChanged(JobStatusChangeEvent event) {
+    JobInProgress job = event.getJobInProgress();
+    JobQueueJobInProgressListener.JobSchedulingInfo oldJobStateInfo =
+      new JobQueueJobInProgressListener.JobSchedulingInfo(event.getOldStatus());
+    // Check if the ordering of the job has changed
+    // For now priority and start-time can change the job ordering
+    if (event.getEventType() == JobStatusChangeEvent.EventType.PRIORITY_CHANGED
+      || event.getEventType() ==
+      JobStatusChangeEvent.EventType.START_TIME_CHANGED) {
+      // Make a priority change
+      reorderJobs(job, oldJobStateInfo);
+    } else if (event.getEventType() ==
+      JobStatusChangeEvent.EventType.RUN_STATE_CHANGED) {
+      // Check if the job is complete
+      int runState = job.getStatus().getRunState();
+      if (runState == JobStatus.SUCCEEDED
+        || runState == JobStatus.FAILED
+        || runState == JobStatus.KILLED) {
+        jobCompleted(job, oldJobStateInfo);
+      } else if (runState == JobStatus.RUNNING) {
+        // Removing of the job from job list is responsibility of the
+        //initialization poller.
+        // Add the job to the running queue
+        addRunningJob(job);
+      }
+    }
+  }
+
+  /*
+  * Method removes the jobs from both running and waiting job queue in
+  * job queue manager.
+  */
+  private void jobCompleted(
+    JobInProgress job, JobQueueJobInProgressListener.JobSchedulingInfo oldInfo
+  ) {
+    LOG.info(
+      "Job " + job.getJobID().toString() + " submitted to queue "
+        + job.getProfile().getQueueName() + " has completed");
+    //remove jobs from both queue's a job can be in
+    //running and waiting queue at the same time.
+    removeRunningJob(oldInfo);
+    removeWaitingJob(oldInfo);
+    // let scheduler know
+    jobCompleted(job);
+  }
+
+  @Override
+  public void addChild(AbstractQueue queue) {
+    throw new UnsupportedOperationException(
+      "addChildren is not allowed for " +
+        "" + getName());
+  }
+
+  @Override
+  void distributeUnConfiguredCapacity() {
+    return;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Aug 27 07:49:00 2009
@@ -18,270 +18,81 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
-import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 /**
  * A {@link JobInProgressListener} that maintains the jobs being managed in
- * one or more queues. 
+ * one or more queues.
  */
 class JobQueuesManager extends JobInProgressListener {
 
-  /* 
-   * If a queue supports priorities, jobs must be 
-   * sorted on priorities, and then on their start times (technically, 
-   * their insertion time.  
-   * If a queue doesn't support priorities, jobs are
-   * sorted based on their start time.  
-   */
-  
-  // comparator for jobs in queues that don't support priorities
-  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
-    = new Comparator<JobSchedulingInfo>() {
-    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
-      // the job that started earlier wins
-      if (o1.getStartTime() < o2.getStartTime()) {
-        return -1;
-      } else {
-        return (o1.getStartTime() == o2.getStartTime() 
-                ? o1.getJobID().compareTo(o2.getJobID()) 
-                : 1);
-      }
-    }
-  };
-  
-  // class to store queue info
-  private static class QueueInfo {
-
-    // whether the queue supports priorities
-    boolean supportsPriorities;
-    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
-    Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
-    
-    public Comparator<JobSchedulingInfo> comparator;
-    
-    QueueInfo(boolean prio) {
-      this.supportsPriorities = prio;
-      if (supportsPriorities) {
-        // use the default priority-aware comparator
-        comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
-      }
-      else {
-        comparator = STARTTIME_JOB_COMPARATOR;
-      }
-      waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-      runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-    }
-    
-    Collection<JobInProgress> getWaitingJobs() {
-      synchronized (waitingJobs) {
-        return Collections.unmodifiableCollection(
-            new LinkedList<JobInProgress>(waitingJobs.values()));
-      }
-    }
-    
-    Collection<JobInProgress> getRunningJobs() {
-      synchronized (runningJobs) {
-       return Collections.unmodifiableCollection(
-           new LinkedList<JobInProgress>(runningJobs.values())); 
-      }
-    }
-    
-    void addRunningJob(JobInProgress job) {
-      synchronized (runningJobs) {
-       runningJobs.put(new JobSchedulingInfo(job),job); 
-      }
-    }
-    
-    JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
-      synchronized (runningJobs) {
-        return runningJobs.remove(jobInfo); 
-      }
-    }
-    
-    JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
-      synchronized (waitingJobs) {
-        return waitingJobs.remove(schedInfo);
-      }
-    }
-    
-    void addWaitingJob(JobInProgress job) {
-      synchronized (waitingJobs) {
-        waitingJobs.put(new JobSchedulingInfo(job), job);
-      }
-    }
-    
-    int getWaitingJobCount() {
-      synchronized (waitingJobs) {
-       return waitingJobs.size(); 
-      }
-    }
-    
-  }
-  
   // we maintain a hashmap of queue-names to queue info
-  private Map<String, QueueInfo> jobQueues = 
-    new HashMap<String, QueueInfo>();
+  private Map<String, JobQueue> jobQueues =
+    new HashMap<String, JobQueue>();
   private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
-  private CapacityTaskScheduler scheduler;
 
-  
-  JobQueuesManager(CapacityTaskScheduler s) {
-    this.scheduler = s;
+
+  JobQueuesManager() {
   }
-  
+
   /**
    * create an empty queue with the default comparator
-   * @param queueName The name of the queue
-   * @param supportsPriotities whether the queue supports priorities
-   */
-  public void createQueue(String queueName, boolean supportsPriotities) {
-    jobQueues.put(queueName, new QueueInfo(supportsPriotities));
-  }
-  
-  /**
-   * Returns the queue of running jobs associated with the name
+   *
+   * @param queue The jobqueue
    */
-  public Collection<JobInProgress> getRunningJobQueue(String queueName) {
-    return jobQueues.get(queueName).getRunningJobs();
+  public void addQueue(JobQueue queue) {
+    jobQueues.put(queue.getName(),queue);
   }
-  
-  /**
-   * Returns the queue of waiting jobs associated with queue name.
-   * 
-   */
-  Collection<JobInProgress> getWaitingJobs(String queueName) {
-    return jobQueues.get(queueName).getWaitingJobs();
-  }
-  
+
   @Override
   public void jobAdded(JobInProgress job) throws IOException {
     LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
     // add job to the right queue
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    JobQueue qi = getJobQueue(job.getProfile().getQueueName());
     if (null == qi) {
       // job was submitted to a queue we're not aware of
-      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
-          " specified for job" + job.getProfile().getJobID() + 
+      LOG.warn(
+        "Invalid queue " + job.getProfile().getQueueName() +
+          " specified for job" + job.getProfile().getJobID() +
           ". Ignoring job.");
       return;
     }
-    // add job to waiting queue. It will end up in the right place, 
-    // based on priority. 
-    qi.addWaitingJob(job);
     // let scheduler know. 
-    scheduler.jobAdded(job);
+    qi.jobAdded(job);
   }
 
-  /*
-   * Method removes the jobs from both running and waiting job queue in 
-   * job queue manager.
-   */
-  private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
-                            QueueInfo qi) {
-    LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
-        + job.getProfile().getQueueName() + " has completed");
-    //remove jobs from both queue's a job can be in
-    //running and waiting queue at the same time.
-    qi.removeRunningJob(oldInfo);
-    qi.removeWaitingJob(oldInfo);
-    // let scheduler know
-    scheduler.jobCompleted(job);
-  }
-  
   // Note that job is removed when the job completes i.e in jobUpated()
   @Override
-  public void jobRemoved(JobInProgress job) {}
-  
-  // This is used to reposition a job in the queue. A job can get repositioned 
-  // because of the change in the job priority or job start-time.
-  private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
-                           QueueInfo qi) {
-    
-    if(qi.removeWaitingJob(oldInfo) != null) {
-      qi.addWaitingJob(job);
-    }
-    if(qi.removeRunningJob(oldInfo) != null) {
-      qi.addRunningJob(job);
-    }
-  }
-  
-  // This is used to move a job from the waiting queue to the running queue.
-  private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
-                              QueueInfo qi) {
-    // Removing of the job from job list is responsibility of the
-    //initialization poller.
-    // Add the job to the running queue
-    qi.addRunningJob(job);
-  }
-  
-  // Update the scheduler as job's state has changed
-  private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
-    JobInProgress job = event.getJobInProgress();
-    JobSchedulingInfo oldJobStateInfo = 
-      new JobSchedulingInfo(event.getOldStatus());
-    // Check if the ordering of the job has changed
-    // For now priority and start-time can change the job ordering
-    if (event.getEventType() == EventType.PRIORITY_CHANGED 
-        || event.getEventType() == EventType.START_TIME_CHANGED) {
-      // Make a priority change
-      reorderJobs(job, oldJobStateInfo, qi);
-    } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
-      // Check if the job is complete
-      int runState = job.getStatus().getRunState();
-      if (runState == JobStatus.SUCCEEDED
-          || runState == JobStatus.FAILED
-          || runState == JobStatus.KILLED) {
-        jobCompleted(job, oldJobStateInfo, qi);
-      } else if (runState == JobStatus.RUNNING) {
-        makeJobRunning(job, oldJobStateInfo, qi);
-      }
-    }
+  public void jobRemoved(JobInProgress job) {
   }
-  
+
+
   @Override
   public void jobUpdated(JobChangeEvent event) {
     JobInProgress job = event.getJobInProgress();
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
-      // can't find queue for job. Shouldn't happen. 
-      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
-          " when updating job " + job.getProfile().getJobID());
-      return;
-    }
-    
-    // Check if this is the status change
-    if (event instanceof JobStatusChangeEvent) {
-      jobStateChanged((JobStatusChangeEvent)event, qi);
-    }
-  }
-  
-  void removeJobFromWaitingQueue(JobInProgress job) {
-    String queue = job.getProfile().getQueueName();
-    QueueInfo qi = jobQueues.get(queue);
-    qi.removeWaitingJob(new JobSchedulingInfo(job));
+    JobQueue qi = getJobQueue(job.getProfile().getQueueName());
+    qi.jobUpdated(event);
+
   }
-  
+
   Comparator<JobSchedulingInfo> getComparator(String queue) {
-    return jobQueues.get(queue).comparator;
+    return getJobQueue(queue).comparator;
   }
-  
-  int getWaitingJobCount(String queue) {
-    QueueInfo qi = jobQueues.get(queue);
-    return qi.getWaitingJobCount();
+
+
+  public JobQueue getJobQueue(JobInProgress jip){
+    return getJobQueue(jip.getProfile().getQueueName());   
+  }
+
+  JobQueue getJobQueue(String name) {
+    return jobQueues.get(name);
   }
 
-  boolean doesQueueSupportPriorities(String queueName) {
-    return jobQueues.get(queueName).supportsPriorities;
+  public Set<String> getJobQueueNames() {
+    return jobQueues.keySet();
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Thu Aug 27 07:49:00 2009
@@ -21,28 +21,18 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.conf.Configuration;
 
 class MemoryMatcher {
 
   private static final Log LOG = LogFactory.getLog(MemoryMatcher.class);
-  private CapacityTaskScheduler scheduler;
+  static long memSizeForMapSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT;
+  static long memSizeForReduceSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT;
+  static long limitMaxMemForMapTasks = JobConf.DISABLED_MEMORY_LIMIT;
+  static long limitMaxMemForReduceTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
-  public MemoryMatcher(CapacityTaskScheduler capacityTaskScheduler) {
-    this.scheduler = capacityTaskScheduler;
-  }
 
-  boolean isSchedulingBasedOnMemEnabled() {
-    if (scheduler.getLimitMaxMemForMapSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.getLimitMaxMemForReduceSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.getMemSizeForMapSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.getMemSizeForReduceSlot()
-                                  == JobConf.DISABLED_MEMORY_LIMIT) {
-      return false;
-    }
-    return true;
+  public MemoryMatcher() {
   }
 
   /**
@@ -68,12 +58,12 @@
         // Get the memory "allotted" for this task based on number of slots
         long myVmem = 0;
         if (task.getIsMap() && taskType == TaskType.MAP) {
-          long memSizePerMapSlot = scheduler.getMemSizeForMapSlot(); 
+          long memSizePerMapSlot = getMemSizeForMapSlot();
           myVmem = 
             memSizePerMapSlot * task.getNumSlots();
         } else if (!task.getIsMap()
             && taskType == TaskType.REDUCE) {
-          long memSizePerReduceSlot = scheduler.getMemSizeForReduceSlot(); 
+          long memSizePerReduceSlot = getMemSizeForReduceSlot();
           myVmem = memSizePerReduceSlot * task.getNumSlots();
         }
         vmem += myVmem;
@@ -108,11 +98,11 @@
     if (taskType == TaskType.MAP) {
       memForThisTask = job.getJobConf().getMemoryForMapTask();
       totalMemUsableOnTT =
-          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
+          getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
     } else if (taskType == TaskType.REDUCE) {
       memForThisTask = job.getJobConf().getMemoryForReduceTask();
       totalMemUsableOnTT =
-          scheduler.getMemSizeForReduceSlot()
+          getMemSizeForReduceSlot()
               * taskTracker.getMaxReduceSlots();
     }
 
@@ -135,4 +125,108 @@
     }
     return true;
   }
+
+  static boolean isSchedulingBasedOnMemEnabled() {
+    if (getLimitMaxMemForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || getLimitMaxMemForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || getMemSizeForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || getMemSizeForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  public static void initializeMemoryRelatedConf(Configuration conf) {
+    //handling @deprecated
+    if (conf.get(
+      CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) !=
+      null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY));
+    }
+
+    //handling @deprecated
+    if (conf.get(CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY) !=
+      null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY));
+    }
+
+    if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
+    }
+
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    //handling @deprecated values
+    if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
+          " instead use " +JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
+          " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
+      );
+
+      limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+      if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
+        limitMaxMemForMapTasks >= 0) {
+        limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
+          limitMaxMemForMapTasks /
+            (1024 * 1024); //Converting old values in bytes to MB
+      }
+    } else {
+      limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+      limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    }
+    LOG.info(String.format("Scheduler configured with "
+        + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
+        + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
+        + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
+        .valueOf(memSizeForReduceSlotOnJT), Long
+        .valueOf(limitMaxMemForMapTasks), Long
+        .valueOf(limitMaxMemForReduceTasks)));
+  }
+
+  static long  getMemSizeForMapSlot() {
+    return memSizeForMapSlotOnJT;
+  }
+
+  static long getMemSizeForReduceSlot() {
+    return memSizeForReduceSlotOnJT;
+  }
+
+  static long getLimitMaxMemForMapSlot() {
+    return limitMaxMemForMapTasks;
+  }
+
+  static long getLimitMaxMemForReduceSlot() {
+    return limitMaxMemForReduceTasks;
+  }
 }

Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Set;
+
+/**
+ * Hierarchy builder for the CapacityScheduler.
+ * 
+ */
+public class QueueHierarchyBuilder {
+
+  static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  private final String NAME_SEPERATOR = ".";
+  private CapacitySchedulerConf schedConf;
+  
+  QueueHierarchyBuilder(CapacitySchedulerConf schedConf) {
+    this.schedConf = schedConf;
+  }
+  
+  /**
+   * Recursively builds the queue hierarchy; the first call expects the parent to have children.
+   * @param parent       the parent queue
+   * @param children     names of the parent's child queues
+   */
+  void createHierarchy(
+    AbstractQueue parent, Set<String> children) {
+    //check if children have further children.
+    if (children != null && !children.isEmpty()) {
+      float totalCapacity = 0.0f;
+      for (String qName : children) {
+        if(qName.contains(NAME_SEPERATOR)) {
+          throw new IllegalArgumentException( NAME_SEPERATOR  +  "" +
+            " not allowed in queue name \'" + qName + "\'.");
+        }
+        //generate fully qualified name.
+        if (!parent.getName().equals("")) {
+          qName = parent.getName() + NAME_SEPERATOR + qName;
+        }
+        //Check if this child has any more children.
+        Set<String> childQueues = schedConf.getSubQueues(qName);
+
+        if (childQueues != null && childQueues.size() > 0) {
+          //generate a new ContainerQueue and recursively
+          //create hierarchy.
+          AbstractQueue cq = new ContainerQueue(
+            parent,
+            loadContext(
+              qName));
+          //update totalCapacity
+          totalCapacity += cq.qsc.getCapacityPercent();
+          LOG.info("Created a ContainerQueue " + qName);
+          //create child hierarchy
+          createHierarchy(cq, childQueues);
+        } else {
+          //if not this is a JobQueue.
+
+          //create a JobQueue.
+          AbstractQueue jq = new JobQueue(
+            parent,
+            loadContext(
+              qName));
+          totalCapacity += jq.qsc.getCapacityPercent();
+          LOG.info("Created a jobQueue " + qName);
+        }
+      }
+
+      //check totalCapacity at each level: the sum of the children's
+      //capacities shouldn't exceed 100%.
+
+      if (totalCapacity > 100.0) {
+        throw new IllegalArgumentException(
+          "For queue " + parent.getName() +
+            " Sum of child queue capacities over 100% at "
+            + totalCapacity);
+      }
+    }
+  }
+
+
+  private QueueSchedulingContext loadContext(
+    String queueName) {
+    float capacity = schedConf.getCapacity(queueName);
+    float stretchCapacity = schedConf.getMaxCapacity(queueName);
+    if (capacity == -1.0) {
+      LOG.info("No capacity specified for queue " + queueName);
+    }
+    int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
+    // create our QSC and add to our hashmap
+    QueueSchedulingContext qsi = new QueueSchedulingContext(
+      queueName, capacity, stretchCapacity, ulMin,
+      schedConf.getMaxMapCap(
+        queueName), schedConf.getMaxReduceCap(queueName));
+    qsi.setSupportsPriorities(
+      schedConf.isPrioritySupported(
+        queueName));
+    return qsi;
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * ********************************************************************
+ * Keeping track of scheduling information for queues
+ * <p/>
+ * We need to maintain scheduling information relevant to a queue (its
+ * name, capacity, etc), along with information specific to
+ * each kind of task, Map or Reduce (num of running tasks, pending
+ * tasks etc).
+ * <p/>
+ * This scheduling information is used to decide how to allocate
+ * tasks, redistribute capacity, etc.
+ * <p/>
+ * A QueueSchedulingContext (QSC) object represents scheduling information
+ * for a queue.
+ * ********************************************************************
+ */
+public class QueueSchedulingContext {
+
+    //Name of this queue
+    private String queueName;
+
+    //Get the maximum capacity of this queue for running map tasks
+    // in the cluster.
+    private int mapCapacity;
+
+    //Get the maximum capacity of this queue for running reduce tasks 
+    // in the cluster.
+    private int reduceCapacity;
+
+    /**
+     * capacity(%) is set in the config as
+     * mapred.capacity-scheduler.queue.<queue-name>.capacity"
+     * Percentage of the number of slots in the cluster that are
+     * to be available for jobs in this queue.
+     */
+    private float capacityPercent = 0;
+
+  /**
+   * maxCapacityStretch(%) is set in config as
+   * mapred.capacity-scheduler.queue.<queue-name>.maximum-capacity
+   * maximum-capacity-stretch defines a limit beyond which a sub-queue
+   * cannot use the capacity of its parent queue.
+   */
+    private float maxCapacityPercent = -1;
+
+    /**
+     * to handle user limits, we need to know how many users have jobs in
+     * the queue.
+     */
+    private Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+
+    /**
+     * min value of user limit (same for all users)
+     */
+    private int ulMin;
+  
+    // whether the queue supports priorities
+    //default is false
+    private boolean supportsPriorities = false;
+
+    //No of waiting jobs.
+    private int numOfWaitingJobs = 0;
+
+    //State of mapCapacity
+    private int prevMapCapacity = 0;
+
+    //State of reduceCapacity
+    private int prevReduceCapacity = 0;
+
+
+    /**
+     * We keep a TaskSchedulingInfo object for each kind of task we support
+     */
+    private TaskSchedulingContext mapTSC;
+    private TaskSchedulingContext reduceTSC;
+
+  QueueSchedulingContext(
+    String queueName, float capacityPercent, float maxCapacityPercent,
+    int ulMin,
+    int mapCap, int reduceCap) {
+      this.setQueueName(new String(queueName));
+      this.setCapacityPercent(capacityPercent);
+      this.setMaxCapacityPercent(maxCapacityPercent);
+      this.setUlMin(ulMin);
+      this.setMapTSC(new TaskSchedulingContext(TaskType.MAP));
+      this.setReduceTSC(new TaskSchedulingContext(TaskType.REDUCE));
+      this.getMapTSC().setMaxTaskLimit(mapCap);
+      this.getReduceTSC().setMaxTaskLimit(reduceCap);
+    }
+
+  /**
+     * return information about the queue
+     *
+     * @return a String representing the information about the queue.
+     */
+    @Override
+    public String toString() {
+      // We print out the queue information first, followed by info
+      // on map and reduce tasks and job info
+      StringBuffer sb = new StringBuffer();
+      sb.append("Queue configuration\n");
+      sb.append("Capacity Percentage: ");
+      sb.append(getCapacityPercent());
+      sb.append("%\n");
+      sb.append(String.format("User Limit: %d%s\n", getUlMin(), "%"));
+      sb.append(
+        String.format(
+          "Priority Supported: %s\n",
+          (supportsPriorities()) ?
+            "YES" : "NO"));
+      sb.append("-------------\n");
+
+      sb.append("Map tasks\n");
+      sb.append(getMapTSC().toString());
+      sb.append("-------------\n");
+      sb.append("Reduce tasks\n");
+      sb.append(getReduceTSC().toString());
+      sb.append("-------------\n");
+
+      sb.append("Job info\n");
+      sb.append(
+        String.format(
+          "Number of Waiting Jobs: %d\n",
+          this.getNumOfWaitingJobs()));
+      sb.append(
+        String.format(
+          "Number of users who have submitted jobs: %d\n",
+          getNumJobsByUser().size()));
+      return sb.toString();
+    }
+
+  String getQueueName() {
+    return queueName;
+  }
+
+  void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  int getMapCapacity() {
+    return mapCapacity;
+  }
+
+  void setMapCapacity(int mapCapacity) {
+    this.mapCapacity = mapCapacity;
+  }
+
+  int getReduceCapacity() {
+    return reduceCapacity;
+  }
+
+  void setReduceCapacity(int reduceCapacity) {
+    this.reduceCapacity = reduceCapacity;
+  }
+
+  float getCapacityPercent() {
+    return capacityPercent;
+  }
+
+  void setCapacityPercent(float capacityPercent) {
+    this.capacityPercent = capacityPercent;
+  }
+
+  Map<String, Integer> getNumJobsByUser() {
+    return numJobsByUser;
+  }
+
+  void setNumJobsByUser(Map<String, Integer> numJobsByUser) {
+    this.numJobsByUser = numJobsByUser;
+  }
+
+  int getUlMin() {
+    return ulMin;
+  }
+
+  void setUlMin(int ulMin) {
+    this.ulMin = ulMin;
+  }
+
+  TaskSchedulingContext getMapTSC() {
+    return mapTSC;
+  }
+
+  void setMapTSC(TaskSchedulingContext mapTSC) {
+    this.mapTSC = mapTSC;
+  }
+
+  TaskSchedulingContext getReduceTSC() {
+    return reduceTSC;
+  }
+
+  void setReduceTSC(TaskSchedulingContext reduceTSC) {
+    this.reduceTSC = reduceTSC;
+  }
+
+  boolean supportsPriorities() {
+    return supportsPriorities;
+  }
+
+  void setSupportsPriorities(boolean supportsPriorities) {
+    this.supportsPriorities = supportsPriorities;
+  }
+
+  int getNumOfWaitingJobs() {
+    return numOfWaitingJobs;
+  }
+
+  void setNumOfWaitingJobs(int numOfWaitingJobs) {
+    this.numOfWaitingJobs = numOfWaitingJobs;
+  }
+
+  float getMaxCapacityPercent() {
+    return maxCapacityPercent;
+  }
+
+  void setMaxCapacityPercent(float maxCapacityPercent) {
+    this.maxCapacityPercent = maxCapacityPercent;
+  }
+
+
+  /**
+   * Accumulates scheduling data across queues: adds the waiting-job
+   * count and the map/reduce task scheduling contexts of the given
+   * context to this QueueSchedulingContext.
+   * @param qcontext the context whose data is folded into this one
+   */
+  void add(QueueSchedulingContext qcontext) {
+    this.setNumOfWaitingJobs(numOfWaitingJobs+qcontext.getNumOfWaitingJobs());
+    this.getMapTSC().update(qcontext.getMapTSC());
+    this.getReduceTSC().update(qcontext.getReduceTSC());
+  }
+
+  void updateContext(int mapClusterCapacity , int reduceClusterCapacity) {
+    setMapCapacity(mapClusterCapacity);
+    setReduceCapacity(reduceClusterCapacity);
+    // if # of slots have changed since last time, update.
+    // First, compute whether the total number of TT slots have changed
+    // compute new capacities, if TT slots have changed
+    if (getMapCapacity() != prevMapCapacity) {
+      getMapTSC().setCapacity(
+        (int)
+          (getCapacityPercent() * getMapCapacity() / 100));
+
+      //Check if max capacity percent is set for this queue.
+      //if yes then set the maxcapacity for this queue.
+      if (getMaxCapacityPercent() > 0) {
+        getMapTSC().setMaxCapacity(
+          (int) (getMaxCapacityPercent() * getMapCapacity() /
+            100)
+        );
+      }
+    }
+
+    //REDUCES
+    if (getReduceCapacity() != prevReduceCapacity) {
+      getReduceTSC().setCapacity(
+        (int)
+          (getCapacityPercent() * getReduceCapacity() / 100));
+
+      //set stretch capacity for reduce
+      //check if max capacity percent is set for this QueueSchedulingContext.
+      //if yes then set the maxCapacity for this JobQueue.
+      if (getMaxCapacityPercent() > 0) {
+        getReduceTSC().setMaxCapacity(
+          (int) (getMaxCapacityPercent() * getReduceCapacity() /
+            100));
+      }
+    }
+
+    // reset running/pending tasks, tasks per user
+    getMapTSC().resetTaskVars();
+    getReduceTSC().resetTaskVars();
+    // update stats on running jobs
+    prevMapCapacity = getMapCapacity();
+    prevReduceCapacity = getReduceCapacity();
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskDataView.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskDataView.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskDataView.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskDataView.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+* <p/>
+* http://www.apache.org/licenses/LICENSE-2.0
+* <p/>
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Task view class for a job.
+ * Returns the running, pending and other task information for a
+ * JobInProgress.
+ *
+ * Has a factory method which provides the map or reduce
+ * data view based on the task type.
+ */
+abstract class TaskDataView {
+  TaskType type;
+
+  abstract int getRunningTasks(JobInProgress job);
+
+  abstract int getPendingTasks(JobInProgress job);
+
+  abstract int getSlotsPerTask(JobInProgress job);
+
+  abstract TaskSchedulingContext getTSI(QueueSchedulingContext qsi);
+
+  abstract int getNumReservedTaskTrackers(JobInProgress job);
+
+  int getSlotsOccupied(JobInProgress job) {
+    return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) *
+      getSlotsPerTask(job);
+  }
+
+  /**
+   * Check if the given job has sufficient reserved tasktrackers for all its
+   * pending tasks.
+   *
+   * @param job job to check for sufficient reserved tasktrackers
+   * @return <code>true</code> if the job has reserved tasktrackers,
+   *         else <code>false</code>
+   */
+  boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
+    return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
+  }
+
+  private static TaskDataView mapTaskDataView;
+  private static TaskDataView reduceTaskDataView;
+
+  static TaskDataView getTaskDataView(TaskType type) {
+     if(type == TaskType.MAP) {
+       if(mapTaskDataView == null) {
+         mapTaskDataView = new MapTaskDataView();
+       }
+       return mapTaskDataView;
+     }else if(type == TaskType.REDUCE) {
+       if(reduceTaskDataView == null) {
+         reduceTaskDataView = new ReduceTaskDataView();
+       }
+       return reduceTaskDataView;
+     }
+    return null;
+  }
+
+  /**
+   * The data view for map tasks
+   */
+  static class MapTaskDataView extends TaskDataView {
+    TaskType type;
+
+    MapTaskDataView() {
+      type = TaskType.MAP;
+    }
+
+    @Override
+    int getRunningTasks(JobInProgress job) {
+      return job.runningMaps();
+    }
+
+    @Override
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingMaps();
+    }
+
+    @Override
+    int getSlotsPerTask(JobInProgress job) {
+      return
+        job.getJobConf().computeNumSlotsPerMap(
+          MemoryMatcher.getMemSizeForMapSlot());
+    }
+
+    @Override
+    TaskSchedulingContext getTSI(QueueSchedulingContext qsi) {
+      return qsi.getMapTSC();
+    }
+
+    int getNumReservedTaskTrackers(JobInProgress job) {
+      return job.getNumReservedTaskTrackersForMaps();
+    }
+
+  }
+
+  /**
+   *  The data view for reduce tasks
+   */
+  static class ReduceTaskDataView extends TaskDataView {
+    TaskType type;
+
+    ReduceTaskDataView() {
+      type = TaskType.REDUCE;
+    }
+
+    @Override
+    int getRunningTasks(JobInProgress job) {
+      return job.runningReduces();
+    }
+
+    @Override
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingReduces();
+    }
+
+    @Override
+    int getSlotsPerTask(JobInProgress job) {
+      return
+        job.getJobConf().
+          computeNumSlotsPerReduce(MemoryMatcher.getMemSizeForReduceSlot());
+    }
+
+    @Override
+    TaskSchedulingContext getTSI(QueueSchedulingContext qsi) {
+      return qsi.getReduceTSC();
+    }
+
+    int getNumReservedTaskTrackers(JobInProgress job) {
+      return job.getNumReservedTaskTrackersForReduces();
+    }
+
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java?rev=808308&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java Thu Aug 27 07:49:00 2009
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+
+/**
+ * ********************************************************************
+ * Keeping track of scheduling information for queues
+ * <p/>
+ * Maintain information specific to
+ * each kind of task, Map or Reduce (num of running tasks, pending
+ * tasks etc).
+ * <p/>
+ * This scheduling information is used to decide how to allocate
+ * tasks, redistribute capacity, etc.
+ * <p/>
+ * A TaskSchedulingContext (TSI) object represents scheduling
+ * information for a particular kind of task (Map or Reduce).
+ * <p/>
+ * ********************************************************************
+ */
+public class TaskSchedulingContext {
+
+  private TaskType type;
+  private static final String LIMIT_NORMALIZED_CAPACITY_STRING
+    = "(Capacity is restricted to max limit of %d slots.\n" +
+    "Remaining %d slots will be used by other queues.)\n";
+  /**
+   * the actual capacity, which depends on how many slots are available
+   * in the cluster at any given time.
+   */
+  private int capacity = 0;
+  // number of running tasks
+  private int numRunningTasks = 0;
+  // number of slots occupied by running tasks
+  private int numSlotsOccupied = 0;
+
+  //the actual capacity stretch which depends on how many slots are available
+  //in cluster at any given time.
+  private int maxCapacity = -1;
+
+  /**
+   * max task limit
+   * This value is the maximum slots that can be used in a
+   * queue at any point of time. So for example assuming above config value
+   * is 100 , not more than 100 tasks would be in the queue at any point of
+   * time, assuming each task takes one slot.
+   */
+  private int maxTaskLimit = -1;
+
+  /**
+   * for each user, we need to keep track of number of slots occupied by
+   * running tasks
+   */
+  private Map<String, Integer> numSlotsOccupiedByUser =
+    new HashMap<String, Integer>();
+  final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+    "%s running map tasks using %d map slots. %d additional slots reserved." +
+      " %s running reduce tasks using %d reduce slots." +
+      " %d additional slots reserved.";
+
+  public TaskSchedulingContext(TaskType type) {
+    this.type = type;
+  }
+
+  /**
+   * reset the variables associated with tasks
+   */
+  void resetTaskVars() {
+    setNumRunningTasks(0);
+    setNumSlotsOccupied(0);
+    for (String s : getNumSlotsOccupiedByUser().keySet()) {
+      getNumSlotsOccupiedByUser().put(s, Integer.valueOf(0));
+    }
+  }
+
+
+  int getMaxTaskLimit() {
+    return maxTaskLimit;
+  }
+
+  void setMaxTaskLimit(int maxTaskCap) {
+    this.maxTaskLimit = maxTaskCap;
+  }
+
+  /**
+   * This method checks for maxfinalLimit and
+   * sends minimum of maxTaskLimit and capacity.
+   *
+   * @return
+   */
+  int getCapacity() {
+    if ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) {
+      return maxTaskLimit;
+    }
+    return capacity;
+  }
+
+  /**
+   * checks if current capacity reached its maximum capacity %
+   *
+   * @return
+   */
+  boolean checkIfReachedMax() {
+    if (maxCapacity < 0) {
+      return false;
+    }
+    return (maxCapacity <= numSlotsOccupied);
+  }
+
+  /**
+   * Mutator method for capacity
+   *
+   * @param capacity
+   */
+  void setCapacity(int capacity) {
+    this.capacity = capacity;
+  }
+
+
+  /**
+   * return information about the tasks
+   */
+  @Override
+  public String toString() {
+    float occupiedSlotsAsPercent =
+      getCapacity() != 0 ?
+        ((float) getNumSlotsOccupied() * 100 / getCapacity()) : 0;
+    StringBuffer sb = new StringBuffer();
+
+    sb.append("Capacity: " + getCapacity() + " slots\n");
+    //If maxTaskLimit is less than the capacity
+    if (getMaxTaskLimit() >= 0 && getMaxTaskLimit() < getCapacity()) {
+      sb.append(
+        String.format(
+          LIMIT_NORMALIZED_CAPACITY_STRING,
+          getMaxTaskLimit(), (getCapacity() - getMaxTaskLimit())));
+    }
+    if (getMaxTaskLimit() >= 0) {
+      sb.append(String.format("Maximum Slots Limit: %d\n", getMaxTaskLimit()));
+    }
+    sb.append(
+      String.format(
+        "Used capacity: %d (%.1f%% of Capacity)\n",
+        Integer.valueOf(getNumSlotsOccupied()), Float
+          .valueOf(occupiedSlotsAsPercent)));
+    sb.append(
+      String.format(
+        "Running tasks: %d\n", Integer
+          .valueOf(getNumRunningTasks())));
+    // include info on active users
+    if (getNumSlotsOccupied() != 0) {
+      sb.append("Active users:\n");
+      for (Map.Entry<String, Integer> entry : getNumSlotsOccupiedByUser()
+        .entrySet()) {
+        if ((entry.getValue() == null) ||
+          (entry.getValue().intValue() <= 0)) {
+          // user has no tasks running
+          continue;
+        }
+        sb.append("User '" + entry.getKey() + "': ");
+        int numSlotsOccupiedByThisUser = entry.getValue().intValue();
+        float p =
+          (float) numSlotsOccupiedByThisUser * 100 / getNumSlotsOccupied();
+        sb.append(
+          String.format(
+            "%d (%.1f%% of used capacity)\n", Long
+              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
+      }
+    }
+    return sb.toString();
+  }
+
+  int getNumRunningTasks() {
+    return numRunningTasks;
+  }
+
+  void setNumRunningTasks(int numRunningTasks) {
+    this.numRunningTasks = numRunningTasks;
+  }
+
+  int getNumSlotsOccupied() {
+    return numSlotsOccupied;
+  }
+
+  void setNumSlotsOccupied(int numSlotsOccupied) {
+    this.numSlotsOccupied = numSlotsOccupied;
+  }
+
+  Map<String, Integer> getNumSlotsOccupiedByUser() {
+    return numSlotsOccupiedByUser;
+  }
+
+  void setNumSlotsOccupiedByUser(
+    Map<String, Integer> numSlotsOccupiedByUser) {
+    this.numSlotsOccupiedByUser = numSlotsOccupiedByUser;
+  }
+
+  int getMaxCapacity() {
+    return maxCapacity;
+  }
+
+  void setMaxCapacity(int maxCapacity) {
+    this.maxCapacity = maxCapacity;
+  }
+
+  void update(TaskSchedulingContext tc) {
+    this.numSlotsOccupied += tc.numSlotsOccupied;
+    this.numRunningTasks += tc.numRunningTasks;
+    //this.maxTaskLimit += tc.maxTaskLimit;
+    updateNoOfSlotsOccupiedByUser(tc.numSlotsOccupiedByUser);
+  }
+
+  private void updateNoOfSlotsOccupiedByUser(Map<String, Integer> nou) {
+    Set<String> keys = nou.keySet();
+    for (String key : keys) {
+      if (this.numSlotsOccupiedByUser.containsKey(key)) {
+        int currentVal = this.numSlotsOccupiedByUser.get(key);
+        this.numSlotsOccupiedByUser.put(key, currentVal + nou.get(key));
+      }
+    }
+  }
+}
\ No newline at end of file



Mime
View raw message