hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1095212 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/u...
Date Tue, 19 Apr 2011 21:00:46 GMT
Author: acmurthy
Date: Tue Apr 19 21:00:46 2011
New Revision: 1095212

URL: http://svn.apache.org/viewvc?rev=1095212&view=rev
Log:
Added functionality to stop/start queues. 

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue Apr 19 21:00:46 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
     
+    Added functionality to stop/start queues. (acmurthy)
+    
     Added functionality to refresh queues at runtime via the 'bin/yarn
     rmadmin' command. (acmurthy) 
     

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java Tue Apr 19 21:00:46 2011
@@ -20,4 +20,7 @@ public interface QueueInfo {
   
   List<Application> getApplications();
   void setApplications(List<Application> applications);
+  
+  QueueState getQueueState();
+  void setQueueState(QueueState queueState);
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java?rev=1095212&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java Tue Apr 19 21:00:46 2011
@@ -0,0 +1,9 @@
+package org.apache.hadoop.yarn.api.records;
+
+/**
+ * State of a Queue
+ */
+public enum QueueState {
+  STOPPED, 
+  RUNNING
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java Tue Apr 19 21:00:46 2011
@@ -7,9 +7,13 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.records.Application;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 
 public class QueueInfoPBImpl extends ProtoBase<QueueInfoProto> implements
     QueueInfo {
@@ -67,6 +71,15 @@ public class QueueInfoPBImpl extends Pro
   }
 
   @Override
+  public QueueState getQueueState() {
+    QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasState()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getState());
+  }
+
+  @Override
   public void setApplications(List<Application> applications) {
     if (applications == null) {
       builder.clearApplications();
@@ -110,6 +123,16 @@ public class QueueInfoPBImpl extends Pro
   }
 
   @Override
+  public void setQueueState(QueueState queueState) {
+    maybeInitBuilder();
+    if (queueState == null) {
+      builder.clearState();
+      return;
+    }
+    builder.setState(convertToProtoFormat(queueState));
+  }
+
+  @Override
   public QueueInfoProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -252,4 +275,12 @@ public class QueueInfoPBImpl extends Pro
     return ((QueueInfoPBImpl)q).getProto();
   }
 
+  private QueueState convertFromProtoFormat(QueueStateProto q) {
+    return ProtoUtils.convertFromProtoFormat(q);
+  }
+  
+  private QueueStateProto convertToProtoFormat(QueueState queueState) {
+    return ProtoUtils.convertToProtoFormat(queueState);
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java Tue Apr 19 21:00:46 2011
@@ -6,11 +6,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.YarnContainerTags;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.YarnContainerTagsProto;
 
 import com.google.protobuf.ByteString;
@@ -89,4 +91,15 @@ public class ProtoUtils {
     return bs;
   }
   
+  /*
+   * QueueState
+   */
+  private static String QUEUE_STATE_PREFIX = "Q_";
+  public static QueueStateProto convertToProtoFormat(QueueState e) {
+    return QueueStateProto.valueOf(QUEUE_STATE_PREFIX + e.name());
+  }
+  public static QueueState convertFromProtoFormat(QueueStateProto e) {
+    return QueueState.valueOf(e.name().replace(QUEUE_STATE_PREFIX, ""));
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto Tue Apr 19 21:00:46 2011
@@ -162,13 +162,19 @@ message YarnClusterMetricsProto {
   optional int32 num_node_managers = 1;
 }
 
+enum QueueStateProto {
+  Q_STOPPED = 1;
+  Q_RUNNING = 2;
+}
+
 message QueueInfoProto {
   optional string queueName = 1;
   optional float capacity = 2;
   optional float maximumCapacity = 3;
   optional float currentCapacity = 4;
-  repeated QueueInfoProto childQueues = 5;
-  repeated ApplicationProto applications = 6;
+  optional QueueStateProto state = 5;
+  repeated QueueInfoProto childQueues = 6;
+  repeated ApplicationProto applications = 7;
 }
 
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Tue Apr 19 21:00:46 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 
 public class CapacitySchedulerConfiguration extends Configuration {
@@ -56,7 +57,10 @@ public class CapacitySchedulerConfigurat
   
   @Private
   public static final String USER_LIMIT_FACTOR = "user-limit-factor";
-  
+
+  @Private
+  public static final String STATE = "state";
+
   private static final int MINIMUM_MEMORY = 1024;
 
   @Private
@@ -135,6 +139,11 @@ public class CapacitySchedulerConfigurat
     setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor); 
   }
   
+  public QueueState getState(String queue) {
+    String state = get(getQueuePrefix(queue) + STATE);
+    return (state != null) ? QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
+  }
+
   public void setCapacity(String queue, int capacity) {
     setInt(getQueuePrefix(queue) + CAPACITY, capacity);
     LOG.info("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) + 
@@ -160,4 +169,5 @@ public class CapacitySchedulerConfigurat
     return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
              createResource(minimumMemory);
   }
+  
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Tue Apr 19 21:00:46 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -85,6 +86,8 @@ public class LeafQueue implements Queue 
   private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application> 
   applicationInfos;
   
+  private QueueState state;
+  
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
@@ -123,10 +126,13 @@ public class LeafQueue implements Queue 
       new HashMap<ApplicationId, 
       org.apache.hadoop.yarn.api.records.Application>();
 
+    QueueState state = cs.getConfiguration().getState(getQueuePath());
+    
     setupQueueConfigs(capacity, absoluteCapacity, 
         maximumCapacity, absoluteMaxCapacity, 
         userLimit, userLimitFactor, 
-        maxApplications, maxApplicationsPerUser);
+        maxApplications, maxApplicationsPerUser,
+        state);
 
     LOG.info("DEBUG --- LeafQueue:" +
     		" name=" + queueName + 
@@ -139,7 +145,8 @@ public class LeafQueue implements Queue 
           float capacity, float absoluteCapacity, 
           float maxCapacity, float absoluteMaxCapacity,
           int userLimit, float userLimitFactor,
-          int maxApplications, int maxApplicationsPerUser)
+          int maxApplications, int maxApplicationsPerUser,
+          QueueState state)
   {
     this.capacity = capacity; 
     this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
@@ -152,9 +159,12 @@ public class LeafQueue implements Queue 
 
     this.maxApplications = maxApplications;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
+
+    this.state = state;
     
     this.queueInfo.setCapacity(capacity);
     this.queueInfo.setMaximumCapacity(maximumCapacity);
+    this.queueInfo.setQueueState(state);
     
     LOG.info(queueName +
         ", capacity=" + capacity + 
@@ -163,7 +173,8 @@ public class LeafQueue implements Queue 
         ", asboluteMaxCapacity=" + absoluteMaxCapacity +
         ", userLimit=" + userLimit + ", userLimitFactor=" + userLimitFactor + 
         ", maxApplications=" + maxApplications + 
-        ", maxApplicationsPerUser=" + maxApplicationsPerUser);
+        ", maxApplicationsPerUser=" + maxApplicationsPerUser + 
+        ", state=" + state);
   }
       
 
@@ -244,6 +255,11 @@ public class LeafQueue implements Queue 
   }
 
   @Override
+  public QueueState getState() {
+    return state;
+  }
+
+  @Override
   public synchronized QueueInfo getQueueInfo(boolean includeApplications, 
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);
@@ -289,7 +305,8 @@ public class LeafQueue implements Queue 
     setupQueueConfigs(leafQueue.capacity, leafQueue.absoluteCapacity, 
         leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, 
         leafQueue.userLimit, leafQueue.userLimitFactor, 
-        leafQueue.maxApplications, leafQueue.maxApplicationsPerUser);
+        leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
+        leafQueue.state);
     
     update(clusterResource);
   }
@@ -299,8 +316,16 @@ public class LeafQueue implements Queue 
       String queue, Priority priority) 
   throws AccessControlException {
     // Careful! Locking order is important!
+    User user = null;
+    
     synchronized (this) {
       
+      if (state != QueueState.RUNNING) {
+        throw new AccessControlException("Queue " + getQueuePath() +
+            " is STOPPED. Cannot accept submission of application: " +
+            application.getApplicationId());
+      }
+      
       // Check submission limits for queues
       if (getNumApplications() >= maxApplications) {
         throw new AccessControlException("Queue " + getQueuePath() + 
@@ -310,7 +335,7 @@ public class LeafQueue implements Queue 
       }
 
       // Check submission limits for the user on this queue
-      User user = getUser(userName);
+      user = getUser(userName);
       if (user.getApplications() >= maxApplicationsPerUser) {
         throw new AccessControlException("Queue " + getQueuePath() + 
             " already has " + user.getApplications() + 
@@ -318,51 +343,67 @@ public class LeafQueue implements Queue 
             " cannot accept submission of application: " + 
             application.getApplicationId());
       }
-      
-      // Accept 
-      user.submitApplication();
-      applications.add(application);
-      applicationInfos.put(application.getApplicationId(), 
-          application.getApplicationInfo());
-
-      LOG.info("Application submission -" +
-          " appId: " + application.getApplicationId() +
-          " user: " + user + "," + " leaf-queue: " + getQueueName() +
-          " #user-applications: " + user.getApplications() + 
-          " #queue-applications: " + getNumApplications());
+
+      // Add the application to our data-structures
+      addApplication(application, user);
     }
 
     // Inform the parent queue
-    parent.submitApplication(application, userName, queue, priority);
+    try {
+      parent.submitApplication(application, userName, queue, priority);
+    } catch (AccessControlException ace) {
+      LOG.info("Failed to submit application to parent-queue: " + 
+          parent.getQueuePath(), ace);
+      removeApplication(application, user);
+      throw ace;
+    }
   }
 
+  private synchronized void addApplication(Application application, User user) {
+    // Accept 
+    user.submitApplication();
+    applications.add(application);
+    applicationInfos.put(application.getApplicationId(), 
+        application.getApplicationInfo());
+
+    LOG.info("Application added -" +
+        " appId: " + application.getApplicationId() +
+        " user: " + user + "," + " leaf-queue: " + getQueueName() +
+        " #user-applications: " + user.getApplications() + 
+        " #queue-applications: " + getNumApplications());
+
+  }
+  
   @Override
   public void finishApplication(Application application, String queue) 
   throws AccessControlException {
     // Careful! Locking order is important!
     synchronized (this) {
-      applications.remove(application);
-      
-      User user = getUser(application.getUser());
-      user.finishApplication();
-      if (user.getApplications() == 0) {
-        users.remove(application.getUser());
-      }
-      
-      applicationInfos.remove(application.getApplicationId());
-
-      LOG.info("Application completion -" +
-          " appId: " + application.getApplicationId() + 
-          " user: " + application.getUser() + 
-          " queue: " + getQueueName() +
-          " #user-applications: " + user.getApplications() + 
-          " #queue-applications: " + getNumApplications());
+      removeApplication(application, getUser(application.getUser()));
     }
     
     // Inform the parent queue
     parent.finishApplication(application, queue);
   }
   
+  public synchronized void removeApplication(Application application, User user) {
+    applications.remove(application);
+    
+    user.finishApplication();
+    if (user.getApplications() == 0) {
+      users.remove(application.getUser());
+    }
+    
+    applicationInfos.remove(application.getApplicationId());
+
+    LOG.info("Application removed -" +
+        " appId: " + application.getApplicationId() + 
+        " user: " + application.getUser() + 
+        " queue: " + getQueueName() +
+        " #user-applications: " + user.getApplications() + 
+        " #queue-applications: " + getNumApplications());
+  }
+  
   @Override
   public synchronized Resource 
   assignContainers(Resource clusterResource, NodeManager node) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Tue Apr 19 21:00:46 2011
@@ -38,12 +38,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 
 @Private
 @Evolving
@@ -75,6 +77,8 @@ public class ParentQueue implements Queu
   private volatile int numApplications;
   private volatile int numContainers;
 
+  private QueueState state;
+  
   private QueueInfo queueInfo; 
   private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application> 
   applicationInfos;
@@ -103,12 +107,14 @@ public class ParentQueue implements Queu
       (maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ? 
           Float.MAX_VALUE :  (parentAbsoluteCapacity * maximumCapacity) / 100;
     
+    QueueState state = cs.getConfiguration().getState(getQueuePath());
+    
     this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
     this.queueInfo.setQueueName(queueName);
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
 
     setupQueueConfigs(capacity, absoluteCapacity, 
-        maximumCapacity, absoluteMaxCapacity);
+        maximumCapacity, absoluteMaxCapacity, state);
     
     this.queueComparator = comparator;
     this.childQueues = new TreeSet<Queue>(comparator);
@@ -125,21 +131,26 @@ public class ParentQueue implements Queu
 
   private synchronized void setupQueueConfigs(
           float capacity, float absoluteCapacity, 
-          float maximumCapacity, float absoluteMaxCapacity
+          float maximumCapacity, float absoluteMaxCapacity,
+          QueueState state
   ) {
     this.capacity = capacity;
     this.absoluteCapacity = absoluteCapacity;
     this.maximumCapacity = maximumCapacity;
     this.absoluteMaxCapacity = absoluteMaxCapacity;
 
+    this.state = state;
+    
     this.queueInfo.setCapacity(capacity);
     this.queueInfo.setMaximumCapacity(maximumCapacity);
+    this.queueInfo.setQueueState(state);
 
     LOG.info(queueName +
         ", capacity=" + capacity +
         ", asboluteCapacity=" + absoluteCapacity +
         ", maxCapacity=" + maximumCapacity +
-        ", asboluteMaxCapacity=" + absoluteMaxCapacity);
+        ", asboluteMaxCapacity=" + absoluteMaxCapacity + 
+        ", state=" + state);
   }
 
   private static float PRECISION = 0.005f; // 0.05% precision
@@ -234,6 +245,11 @@ public class ParentQueue implements Queu
   }
 
   @Override
+  public QueueState getState() {
+    return state;
+  }
+
+  @Override
   public synchronized QueueInfo getQueueInfo(boolean includeApplications, 
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);
@@ -303,7 +319,8 @@ public class ParentQueue implements Queu
 
     // Set new configs
     setupQueueConfigs(parentQueue.capacity, parentQueue.absoluteCapacity,
-        parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity);
+        parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
+        parentQueue.state);
 
     // Update
     update(clusterResource);
@@ -321,53 +338,84 @@ public class ParentQueue implements Queu
   public void submitApplication(Application application, String user,
       String queue, Priority priority) 
   throws AccessControlException {
-    // Sanity check
-    if (queue.equals(queueName)) {
-      throw new AccessControlException("Cannot submit application " +
-          "to non-leaf queue: " + queueName);
+    
+    synchronized (this) {
+      // Sanity check
+      if (queue.equals(queueName)) {
+        throw new AccessControlException("Cannot submit application " +
+            "to non-leaf queue: " + queueName);
+      }
+      
+      if (state != QueueState.RUNNING) {
+        throw new AccessControlException("Queue " + getQueuePath() +
+            " is STOPPED. Cannot accept submission of application: " +
+            application.getApplicationId());
+      }
+
+      addApplication(application, user);
     }
     
+    // Inform the parent queue
+    if (parent != null) {
+      try {
+        parent.submitApplication(application, user, queue, priority);
+      } catch (AccessControlException ace) {
+        LOG.info("Failed to submit application to parent-queue: " + 
+            parent.getQueuePath(), ace);
+        removeApplication(application, user);
+        throw ace;
+      }
+    }
+  }
+
+  private synchronized void addApplication(Application application, 
+      String user) {
+  
     ++numApplications;
-   
+
     applicationInfos.put(application.getApplicationId(), 
         application.getApplicationInfo());
 
-    LOG.info("Application submission -" +
-    		" appId: " + application.getApplicationId() + 
+    LOG.info("Application added -" +
+        " appId: " + application.getApplicationId() + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());
+  }
+  
+  @Override
+  public void finishApplication(Application application, String queue) 
+  throws AccessControlException {
+    
+    synchronized (this) {
+      // Sanity check
+      if (queue.equals(queueName)) {
+        throw new AccessControlException("Cannot finish application " +
+            "from non-leaf queue: " + queueName);
+      }
 
+      removeApplication(application, application.getUser());
+    }
+    
     // Inform the parent queue
     if (parent != null) {
-      parent.submitApplication(application, user, queue, priority);
+      parent.finishApplication(application, queue);
     }
   }
 
-  @Override
-  public void finishApplication(Application application, String queue) 
-  throws AccessControlException {
-    // Sanity check
-    if (queue.equals(queueName)) {
-      throw new AccessControlException("Cannot finish application " +
-          "from non-leaf queue: " + queueName);
-    }
+  public synchronized void removeApplication(Application application, 
+      String user) {
     
     --numApplications;
     applicationInfos.remove(application.getApplicationId());
 
-    LOG.info("Application completion -" +
+    LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
-        " user: " + application.getUser() + 
+        " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());
-
-    // Inform the parent queue
-    if (parent != null) {
-      parent.finishApplication(application, queue);
-    }
   }
-
+  
   synchronized void setUsedCapacity(float usedCapacity) {
     this.usedCapacity = usedCapacity;
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Tue Apr 19 21:00:46 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -109,6 +110,12 @@ extends org.apache.hadoop.yarn.server.re
   public float getUtilization();
   
   /**
+   * Get the current run-state of the queue
+   * @return current run-state
+   */
+  public QueueState getState();
+  
+  /**
    * Get child queues
    * @return child queues
    */

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml?rev=1095212&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml Tue Apr 19 21:00:46 2011
@@ -0,0 +1,38 @@
+<configuration>
+
+  <property>
+    <name>yarn.capacity-scheduler.maximum-applications</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <name>yarn.capacity-scheduler.root.queues</name>
+    <value>default</value>
+  </property>
+
+  <property>
+    <name>yarn.capacity-scheduler.root.capacity</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <name>yarn.capacity-scheduler.root.default.capacity</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <name>yarn.capacity-scheduler.root.default.user-limit-factor</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <name>yarn.capacity-scheduler.root.default.maximum-capacity</name>
+    <value>-1</value>
+  </property>
+
+  <property>
+    <name>yarn.capacity-scheduler.root.default.state</name>
+    <value>RUNNING</value>
+  </property>
+
+</configuration>



Mime
View raw message