hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject svn commit: r1565515 [2/3] - in /hadoop/common/branches/HDFS-5698/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop...
Date Fri, 07 Feb 2014 01:57:30 GMT
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java Fri Feb  7 01:57:21 2014
@@ -40,7 +40,7 @@ public class TestApplicationHistoryServe
     Configuration config = new YarnConfiguration();
     historyServer.init(config);
     assertEquals(STATE.INITED, historyServer.getServiceState());
-    assertEquals(2, historyServer.getServices().size());
+    assertEquals(3, historyServer.getServices().size());
     ApplicationHistoryClientService historyService =
         historyServer.getClientService();
     assertNotNull(historyServer.getClientService());

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java Fri Feb  7 01:57:21 2014
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Text;
@@ -312,7 +313,7 @@ public class BuilderUtils {
       String url, long startTime, long finishTime,
       FinalApplicationStatus finalStatus,
       ApplicationResourceUsageReport appResources, String origTrackingUrl,
-      float progress, String appType, Token amRmToken) {
+      float progress, String appType, Token amRmToken, Set<String> tags) {
     ApplicationReport report = recordFactory
         .newRecordInstance(ApplicationReport.class);
     report.setApplicationId(applicationId);
@@ -334,6 +335,7 @@ public class BuilderUtils {
     report.setProgress(progress);
     report.setApplicationType(appType);
     report.setAMRMToken(amRmToken);
+    report.setApplicationTags(tags);
     return report;
   }
   

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Fri Feb  7 01:57:21 2014
@@ -45,8 +45,11 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
 public class AdminService extends CompositeService implements
@@ -89,6 +93,8 @@ public class AdminService extends Compos
   private InetSocketAddress masterServiceAddress;
   private AccessControlList adminAcl;
   
+  private ConfigurationProvider configurationProvider = null;
+
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
@@ -109,6 +115,10 @@ public class AdminService extends Compos
       }
     }
 
+    this.configurationProvider =
+        ConfigurationProviderFactory.getConfigurationProvider(conf);
+    configurationProvider.init(conf);
+
     masterServiceAddress = conf.getSocketAddr(
         YarnConfiguration.RM_ADMIN_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -129,6 +139,9 @@ public class AdminService extends Compos
   @Override
   protected synchronized void serviceStop() throws Exception {
     stopServer();
+    if (this.configurationProvider != null) {
+      configurationProvider.close();
+    }
     super.serviceStop();
   }
 
@@ -295,23 +308,28 @@ public class AdminService extends Compos
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws YarnException, StandbyException {
-    UserGroupInformation user = checkAcls("refreshQueues");
+    String argName = "refreshQueues";
+    UserGroupInformation user = checkAcls(argName);
 
     if (!isRMActive()) {
-      RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh queues.");
       throwStandbyException();
     }
 
+    RefreshQueuesResponse response =
+        recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
-      rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
-      RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues", 
+      Configuration conf =
+          getConfiguration(YarnConfiguration.CS_CONFIGURATION_FILE);
+      rmContext.getScheduler().reinitialize(conf, this.rmContext);
+      RMAuditLogger.logSuccess(user.getShortUserName(), argName,
           "AdminService");
-      return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
+      return response;
     } catch (IOException ioe) {
       LOG.info("Exception refreshing queues ", ioe);
-      RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "Exception refreshing queues");
       throw RPCUtil.getRemoteException(ioe);
@@ -346,21 +364,22 @@ public class AdminService extends Compos
   @Override
   public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
       RefreshSuperUserGroupsConfigurationRequest request)
-      throws YarnException, StandbyException {
-    UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
+      throws YarnException, IOException {
+    String argName = "refreshSuperUserGroupsConfiguration";
+    UserGroupInformation user = checkAcls(argName);
 
-    // TODO (YARN-1459): Revisit handling super-user-groups on Standby RM
     if (!isRMActive()) {
-      RMAuditLogger.logFailure(user.getShortUserName(),
-          "refreshSuperUserGroupsConfiguration",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh super-user-groups.");
       throwStandbyException();
     }
 
-    ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration());
+    Configuration conf =
+        getConfiguration(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
+    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
     RMAuditLogger.logSuccess(user.getShortUserName(),
-        "refreshSuperUserGroupsConfiguration", "AdminService");
+        argName, "AdminService");
     
     return recordFactory.newRecordInstance(
         RefreshSuperUserGroupsConfigurationResponse.class);
@@ -391,14 +410,22 @@ public class AdminService extends Compos
 
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
-      RefreshAdminAclsRequest request) throws YarnException {
-    UserGroupInformation user = checkAcls("refreshAdminAcls");
+      RefreshAdminAclsRequest request) throws YarnException, IOException {
+    String argName = "refreshAdminAcls";
+    UserGroupInformation user = checkAcls(argName);
     
-    Configuration conf = new Configuration();
+    if (!isRMActive()) {
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "ResourceManager is not active. Can not refresh user-groups.");
+      throwStandbyException();
+    }
+    Configuration conf =
+        getConfiguration(YarnConfiguration.YARN_SITE_XML_FILE);
     adminAcl = new AccessControlList(conf.get(
         YarnConfiguration.YARN_ADMIN_ACL,
         YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
-    RMAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls", 
+    RMAuditLogger.logSuccess(user.getShortUserName(), argName,
         "AdminService");
 
     return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class);
@@ -406,9 +433,8 @@ public class AdminService extends Compos
 
   @Override
   public RefreshServiceAclsResponse refreshServiceAcls(
-      RefreshServiceAclsRequest request) throws YarnException {
-    Configuration conf = new Configuration();
-    if (!conf.getBoolean(
+      RefreshServiceAclsRequest request) throws YarnException, IOException {
+    if (!getConfig().getBoolean(
              CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
              false)) {
       throw RPCUtil.getRemoteException(
@@ -416,27 +442,38 @@ public class AdminService extends Compos
               CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + 
               ") not enabled."));
     }
-    
+
+    String argName = "refreshServiceAcls";
+    if (!isRMActive()) {
+      RMAuditLogger.logFailure(UserGroupInformation.getCurrentUser()
+          .getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "ResourceManager is not active. Can not refresh Service ACLs.");
+      throwStandbyException();
+    }
+
     PolicyProvider policyProvider = new RMPolicyProvider(); 
-    
+    Configuration conf =
+        getConfiguration(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
+
     refreshServiceAcls(conf, policyProvider);
-    if (isRMActive()) {
-      rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
-      rmContext.getApplicationMasterService().refreshServiceAcls(
-          conf, policyProvider);
-      rmContext.getResourceTrackerService().refreshServiceAcls(
-          conf, policyProvider);
-    } else {
-      LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
-          "Clients, ApplicationMasters and NodeManagers");
-    }
+    rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
+    rmContext.getApplicationMasterService().refreshServiceAcls(
+        conf, policyProvider);
+    rmContext.getResourceTrackerService().refreshServiceAcls(
+        conf, policyProvider);
     
     return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
   }
 
-  void refreshServiceAcls(Configuration configuration, 
+  synchronized void refreshServiceAcls(Configuration configuration,
       PolicyProvider policyProvider) {
-    this.server.refreshServiceAcl(configuration, policyProvider);
+    if (this.configurationProvider instanceof LocalConfigurationProvider) {
+      this.server.refreshServiceAcl(configuration, policyProvider);
+    } else {
+      this.server.refreshServiceAclWithConfigration(configuration,
+          policyProvider);
+    }
   }
 
   @Override
@@ -483,5 +520,19 @@ public class AdminService extends Compos
           UpdateNodeResourceResponse.class);
       return response;
   }
-  
+
+  private synchronized Configuration getConfiguration(String confFileName)
+      throws YarnException, IOException {
+    return this.configurationProvider.getConfiguration(confFileName);
+  }
+
+  @VisibleForTesting
+  public AccessControlList getAccessControlList() {
+    return this.adminAcl;
+  }
+
+  @VisibleForTesting
+  public Server getServer() {
+    return this.server;
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Fri Feb  7 01:57:21 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -86,6 +87,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @SuppressWarnings("unchecked")
 @Private
 public class ApplicationMasterService extends AbstractService implements
@@ -102,6 +105,7 @@ public class ApplicationMasterService ex
   private final AllocateResponse resync =
       recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
+  private boolean useLocalConfigurationProvider;
 
   public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
     super(ApplicationMasterService.class.getName());
@@ -112,6 +116,15 @@ public class ApplicationMasterService ex
   }
 
   @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.useLocalConfigurationProvider =
+        (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+            YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+            LocalConfigurationProvider.class)));
+    super.serviceInit(conf);
+  }
+
+  @Override
   protected void serviceStart() throws Exception {
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
@@ -578,7 +591,12 @@ public class ApplicationMasterService ex
 
   public void refreshServiceAcls(Configuration configuration, 
       PolicyProvider policyProvider) {
-    this.server.refreshServiceAcl(configuration, policyProvider);
+    if (this.useLocalConfigurationProvider) {
+      this.server.refreshServiceAcl(configuration, policyProvider);
+    } else {
+      this.server.refreshServiceAclWithConfigration(configuration,
+          policyProvider);
+    }
   }
   
   @Override
@@ -604,4 +622,9 @@ public class ApplicationMasterService ex
       this.response = response;
     }
   }
+
+  @VisibleForTesting
+  public Server getServer() {
+    return this.server;
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Fri Feb  7 01:57:21 2014
@@ -43,7 +43,9 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -94,6 +96,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -104,6 +108,10 @@ import org.apache.hadoop.yarn.server.sec
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
 
 /**
  * The client interface to the Resource Manager. This module handles all the rpc
@@ -128,6 +136,7 @@ public class ClientRMService extends Abs
 
   private final ApplicationACLsManager applicationsACLsManager;
   private final QueueACLsManager queueACLsManager;
+  private boolean useLocalConfigurationProvider;
 
   public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
       RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@@ -145,6 +154,10 @@ public class ClientRMService extends Abs
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     clientBindAddress = getBindAddress(conf);
+    this.useLocalConfigurationProvider =
+        (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+            YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+            LocalConfigurationProvider.class)));
     super.serviceInit(conf);
   }
 
@@ -433,9 +446,11 @@ public class ClientRMService extends Abs
         request.getApplicationStates();
     Set<String> users = request.getUsers();
     Set<String> queues = request.getQueues();
+    Set<String> tags = request.getApplicationTags();
     long limit = request.getLimit();
     LongRange start = request.getStartRange();
     LongRange finish = request.getFinishRange();
+    ApplicationsRequestScope scope = request.getScope();
 
     final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
     Iterator<RMApp> appsIter;
@@ -482,6 +497,17 @@ public class ClientRMService extends Abs
     List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
     while (appsIter.hasNext() && reports.size() < limit) {
       RMApp application = appsIter.next();
+
+      // Check if current application falls under the specified scope
+      boolean allowAccess = checkAccess(callerUGI, application.getUser(),
+          ApplicationAccessType.VIEW_APP, application);
+      if (scope == ApplicationsRequestScope.OWN &&
+          !callerUGI.getUserName().equals(application.getUser())) {
+        continue;
+      } else if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) {
+        continue;
+      }
+
       if (applicationTypes != null && !applicationTypes.isEmpty()) {
         String appTypeToMatch = caseSensitive
             ? application.getApplicationType()
@@ -511,8 +537,23 @@ public class ClientRMService extends Abs
         continue;
       }
 
-      boolean allowAccess = checkAccess(callerUGI, application.getUser(),
-          ApplicationAccessType.VIEW_APP, application);
+      if (tags != null && !tags.isEmpty()) {
+        Set<String> appTags = application.getApplicationTags();
+        if (appTags == null || appTags.isEmpty()) {
+          continue;
+        }
+        boolean match = false;
+        for (String tag : tags) {
+          if (appTags.contains(tag)) {
+            match = true;
+            break;
+          }
+        }
+        if (!match) {
+          continue;
+        }
+      }
+
       reports.add(application.createAndGetApplicationReport(
           callerUGI.getUserName(), allowAccess));
     }
@@ -686,10 +727,74 @@ public class ClientRMService extends Abs
     }
   }
   
+  @SuppressWarnings("unchecked")
   @Override
   public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
       MoveApplicationAcrossQueuesRequest request) throws YarnException {
-    throw new UnsupportedOperationException("Move not yet supported");
+    ApplicationId applicationId = request.getApplicationId();
+
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie) {
+      LOG.info("Error getting UGI ", ie);
+      RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
+          "UNKNOWN", "ClientRMService" , "Error getting UGI",
+          applicationId);
+      throw RPCUtil.getRemoteException(ie);
+    }
+
+    RMApp application = this.rmContext.getRMApps().get(applicationId);
+    if (application == null) {
+      RMAuditLogger.logFailure(callerUGI.getUserName(),
+          AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
+          "Trying to move an absent application", applicationId);
+      throw new ApplicationNotFoundException("Trying to move an absent"
+          + " application " + applicationId);
+    }
+
+    if (!checkAccess(callerUGI, application.getUser(),
+        ApplicationAccessType.MODIFY_APP, application)) {
+      RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+          AuditConstants.MOVE_APP_REQUEST,
+          "User doesn't have permissions to "
+              + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
+          AuditConstants.UNAUTHORIZED_USER, applicationId);
+      throw RPCUtil.getRemoteException(new AccessControlException("User "
+          + callerUGI.getShortUserName() + " cannot perform operation "
+          + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
+    }
+    
+    // Moves only allowed when app is in a state that means it is tracked by
+    // the scheduler
+    if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED,
+        RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED,
+        RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED)
+        .contains(application.getState())) {
+      String msg = "App in " + application.getState() + " state cannot be moved.";
+      RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+          AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg);
+      throw new YarnException(msg);
+    }
+
+    SettableFuture<Object> future = SettableFuture.create();
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppMoveEvent(applicationId, request.getTargetQueue(), future));
+    
+    try {
+      Futures.get(future, YarnException.class);
+    } catch (YarnException ex) {
+      RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+          AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
+          ex.getMessage());
+      throw ex;
+    }
+
+    RMAuditLogger.logSuccess(callerUGI.getShortUserName(), 
+        AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
+    MoveApplicationAcrossQueuesResponse response = recordFactory
+        .newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
+    return response;
   }
 
   private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
@@ -704,7 +809,12 @@ public class ClientRMService extends Abs
 
   void refreshServiceAcls(Configuration configuration, 
       PolicyProvider policyProvider) {
-    this.server.refreshServiceAcl(configuration, policyProvider);
+    if (this.useLocalConfigurationProvider) {
+      this.server.refreshServiceAcl(configuration, policyProvider);
+    } else {
+      this.server.refreshServiceAclWithConfigration(configuration,
+          policyProvider);
+    }
   }
 
   private boolean isAllowedDelegationTokenOp() throws IOException {
@@ -718,4 +828,9 @@ public class ClientRMService extends Abs
       return true;
     }
   }
+
+  @VisibleForTesting
+  public Server getServer() {
+    return this.server;
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Fri Feb  7 01:57:21 2014
@@ -320,7 +320,8 @@ public class RMAppManager implements Eve
             submissionContext.getApplicationName(), user,
             submissionContext.getQueue(),
             submissionContext, this.scheduler, this.masterService,
-            submitTime, submissionContext.getApplicationType());
+            submitTime, submissionContext.getApplicationType(),
+            submissionContext.getApplicationTags());
 
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java Fri Feb  7 01:57:21 2014
@@ -45,6 +45,7 @@ public class RMAuditLogger {
 
     public static final String KILL_APP_REQUEST = "Kill Application Request";
     public static final String SUBMIT_APP_REQUEST = "Submit Application Request";
+    public static final String MOVE_APP_REQUEST = "Move Application Request";
     public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
     public static final String FINISH_FAILED_APP = "Application Finished - Failed";
     public static final String FINISH_KILLED_APP = "Application Finished - Killed";

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Fri Feb  7 01:57:21 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -66,6 +67,8 @@ import org.apache.hadoop.yarn.server.uti
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ResourceTrackerService extends AbstractService implements
     ResourceTracker {
 
@@ -92,6 +95,7 @@ public class ResourceTrackerService exte
   
   private int minAllocMb;
   private int minAllocVcores;
+  private boolean useLocalConfigurationProvider;
 
   static {
     resync.setNodeAction(NodeAction.RESYNC);
@@ -141,6 +145,10 @@ public class ResourceTrackerService exte
         YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
         YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
 
+    this.useLocalConfigurationProvider =
+        (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+            YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+            LocalConfigurationProvider.class)));
     super.serviceInit(conf);
   }
 
@@ -415,6 +423,16 @@ public class ResourceTrackerService exte
 
   void refreshServiceAcls(Configuration configuration, 
       PolicyProvider policyProvider) {
-    this.server.refreshServiceAcl(configuration, policyProvider);
+    if (this.useLocalConfigurationProvider) {
+      this.server.refreshServiceAcl(configuration, policyProvider);
+    } else {
+      this.server.refreshServiceAclWithConfigration(configuration,
+          policyProvider);
+    }
+  }
+
+  @VisibleForTesting
+  public Server getServer() {
+    return this.server;
   }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Feb  7 01:57:21 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
 import java.util.Collection;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -194,7 +195,13 @@ public interface RMApp extends EventHand
    * Returns the application type
    * @return the application type.
    */
-  String getApplicationType(); 
+  String getApplicationType();
+
+  /**
+   * Get tags for the application
+   * @return tags corresponding to the application
+   */
+  Set<String> getApplicationTags();
 
   /**
    * Check whether this application is safe to terminate.

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Fri Feb  7 01:57:21 2014
@@ -23,6 +23,7 @@ public enum RMAppEventType {
   START,
   RECOVER,
   KILL,
+  MOVE, // Move app to a new queue
 
   // Source: Scheduler and RMAppManager
   APP_REJECTED,

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Feb  7 01:57:21 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@@ -103,6 +104,7 @@ public class RMAppImpl implements RMApp,
   private final long submitTime;
   private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
   private final String applicationType;
+  private final Set<String> applicationTags;
 
   // Mutable fields
   private long startTime;
@@ -166,6 +168,8 @@ public class RMAppImpl implements RMApp,
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+        RMAppEventType.MOVE, new RMAppMoveTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.APP_REJECTED,
         new FinalSavingTransition(
@@ -181,6 +185,8 @@ public class RMAppImpl implements RMApp,
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+        RMAppEventType.MOVE, new RMAppMoveTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
@@ -190,10 +196,8 @@ public class RMAppImpl implements RMApp,
         // waiting for the previous AM to exit.
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.ACCEPTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+        RMAppEventType.KILL, new KillAttemptTransition())
     // ACCECPTED state can once again receive APP_ACCEPTED event, because on
     // recovery the app returns ACCEPTED state and the app once again go
     // through the scheduler and triggers one more APP_ACCEPTED event at
@@ -204,6 +208,8 @@ public class RMAppImpl implements RMApp,
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+        RMAppEventType.MOVE, new RMAppMoveTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_UNREGISTERED,
         new FinalSavingTransition(
@@ -295,9 +301,9 @@ public class RMAppImpl implements RMApp,
   
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
       Configuration config, String name, String user, String queue,
-      ApplicationSubmissionContext submissionContext,
-      YarnScheduler scheduler,
-      ApplicationMasterService masterService, long submitTime, String applicationType) {
+      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
+      ApplicationMasterService masterService, long submitTime,
+      String applicationType, Set<String> applicationTags) {
 
     this.applicationId = applicationId;
     this.name = name;
@@ -313,6 +319,7 @@ public class RMAppImpl implements RMApp,
     this.submitTime = submitTime;
     this.startTime = System.currentTimeMillis();
     this.applicationType = applicationType;
+    this.applicationTags = applicationTags;
 
     int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -546,7 +553,7 @@ public class RMAppImpl implements RMApp,
           createApplicationState(), diags,
           trackingUrl, this.startTime, this.finishTime, finishState,
           appUsageReport, origTrackingUrl, progress, this.applicationType, 
-          amrmToken);
+          amrmToken, applicationTags);
     } finally {
       this.readLock.unlock();
     }
@@ -692,6 +699,31 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  /**
+   * Move an app to a new queue.
+   * This transition must set the result on the Future in the RMAppMoveEvent,
+   * either as an exception for failure or null for success, or the client will
+   * be left waiting forever.
+   */
+  @SuppressWarnings("unchecked")
+  private static final class RMAppMoveTransition extends RMAppTransition {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
+      try {
+        app.queue = app.scheduler.moveApplication(app.applicationId,
+            moveEvent.getTargetQueue());
+      } catch (YarnException ex) {
+        moveEvent.getResult().setException(ex);
+        return;
+      }
+      
+      // TODO: Write out change to state store (YARN-1558)
+      
+      moveEvent.getResult().set(null);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
@@ -1054,6 +1086,11 @@ public class RMAppImpl implements RMApp,
   }
 
   @Override
+  public Set<String> getApplicationTags() {
+    return this.applicationTags;
+  }
+
+  @Override
   public boolean isAppSafeToTerminate() {
     RMAppState state = getState();
     return state.equals(RMAppState.FINISHING)

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Feb  7 01:57:21 2014
@@ -27,11 +27,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
-public class AbstractYarnScheduler {
+public abstract class AbstractYarnScheduler implements ResourceScheduler {
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication> applications;
@@ -61,4 +62,11 @@ public class AbstractYarnScheduler {
   public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
     return applications;
   }
+  
+  @Override
+  public String moveApplication(ApplicationId appId, String newQueue)
+      throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support moving apps between queues");
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Feb  7 01:57:21 2014
@@ -64,7 +64,7 @@ public class AppSchedulingInfo {
   private Set<String> blacklist = new HashSet<String>();
 
   //private final ApplicationStore store;
-  private final ActiveUsersManager activeUsersManager;
+  private ActiveUsersManager activeUsersManager;
   
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
@@ -171,11 +171,10 @@ public class AppSchedulingInfo {
             .getNumContainers() : 0;
         Resource lastRequestCapability = lastRequest != null ? lastRequest
             .getCapability() : Resources.none();
-        metrics.incrPendingResources(user, request.getNumContainers()
-            - lastRequestContainers, Resources.subtractFrom( // save a clone
-            Resources.multiply(request.getCapability(), request
-                .getNumContainers()), Resources.multiply(lastRequestCapability,
-                lastRequestContainers)));
+        metrics.incrPendingResources(user, request.getNumContainers(),
+            request.getCapability());
+        metrics.decrPendingResources(user, lastRequestContainers,
+            lastRequestCapability);
       }
     }
   }
@@ -262,9 +261,15 @@ public class AppSchedulingInfo {
       pending = false;
       metrics.runAppAttempt(applicationId, user);
     }
-    LOG.debug("allocate: user: " + user + ", memory: "
-        + request.getCapability());
-    metrics.allocateResources(user, 1, request.getCapability());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("allocate: applicationId=" + applicationId
+          + " container=" + container.getId()
+          + " host=" + container.getNodeId().toString()
+          + " user=" + user
+          + " resource=" + request.getCapability());
+    }
+    metrics.allocateResources(user, 1, request.getCapability(), true);
   }
 
   /**
@@ -277,9 +282,6 @@ public class AppSchedulingInfo {
   synchronized private void allocateNodeLocal( 
       SchedulerNode node, Priority priority, 
       ResourceRequest nodeLocalRequest, Container container) {
-    // Update consumption and track allocations
-    allocate(container);
-
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -306,10 +308,6 @@ public class AppSchedulingInfo {
   synchronized private void allocateRackLocal(
       SchedulerNode node, Priority priority,
       ResourceRequest rackLocalRequest, Container container) {
-
-    // Update consumption and track allocations
-    allocate(container);
-
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
@@ -329,10 +327,6 @@ public class AppSchedulingInfo {
   synchronized private void allocateOffSwitch(
       SchedulerNode node, Priority priority,
       ResourceRequest offSwitchRequest, Container container) {
-
-    // Update consumption and track allocations
-    allocate(container);
-
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
   }
@@ -365,18 +359,24 @@ public class AppSchedulingInfo {
     }
   }
   
-  synchronized private void allocate(Container container) {
-    // Update consumption and track allocations
-    //TODO: fixme sharad
-    /* try {
-        store.storeContainer(container);
-      } catch (IOException ie) {
-        // TODO fix this. we shouldnt ignore
-      }*/
-    
-    LOG.debug("allocate: applicationId=" + applicationId + " container="
-        + container.getId() + " host="
-        + container.getNodeId().toString());
+  synchronized public void move(Queue newQueue) {
+    QueueMetrics oldMetrics = queue.getMetrics();
+    QueueMetrics newMetrics = newQueue.getMetrics();
+    for (Map<String, ResourceRequest> asks : requests.values()) {
+      ResourceRequest request = asks.get(ResourceRequest.ANY);
+      if (request != null) {
+        oldMetrics.decrPendingResources(user, request.getNumContainers(),
+            request.getCapability());
+        newMetrics.incrPendingResources(user, request.getNumContainers(),
+            request.getCapability());
+      }
+    }
+    oldMetrics.moveAppFrom(this);
+    newMetrics.moveAppTo(this);
+    activeUsersManager.deactivateApplication(user, applicationId);
+    activeUsersManager = newQueue.getActiveUsersManager();
+    activeUsersManager.activateApplication(user, applicationId);
+    this.queue = newQueue;
   }
 
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
@@ -386,8 +386,7 @@ public class AppSchedulingInfo {
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),
-            Resources.multiply(request.getCapability(), request
-                .getNumContainers()));
+            request.getCapability());
       }
     }
     metrics.finishAppAttempt(applicationId, pending, user);

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Fri Feb  7 01:57:21 2014
@@ -58,4 +58,6 @@ public interface Queue {
   List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
 
   boolean hasAccess(QueueACL acl, UserGroupInformation user);
+  
+  public ActiveUsersManager getActiveUsersManager();
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Feb  7 01:57:21 2014
@@ -280,6 +280,36 @@ public class QueueMetrics implements Met
       parent.finishApp(user, rmAppFinalState);
     }
   }
+  
+  public void moveAppFrom(AppSchedulingInfo app) {
+    if (app.isPending()) {
+      appsPending.decr();
+    } else {
+      appsRunning.decr();
+    }
+    QueueMetrics userMetrics = getUserMetrics(app.getUser());
+    if (userMetrics != null) {
+      userMetrics.moveAppFrom(app);
+    }
+    if (parent != null) {
+      parent.moveAppFrom(app);
+    }
+  }
+  
+  public void moveAppTo(AppSchedulingInfo app) {
+    if (app.isPending()) {
+      appsPending.incr();
+    } else {
+      appsRunning.incr();
+    }
+    QueueMetrics userMetrics = getUserMetrics(app.getUser());
+    if (userMetrics != null) {
+      userMetrics.moveAppTo(app);
+    }
+    if (parent != null) {
+      parent.moveAppTo(app);
+    }
+  }
 
   /**
    * Set available resources. To be called by scheduler periodically as
@@ -324,8 +354,8 @@ public class QueueMetrics implements Met
 
   private void _incrPendingResources(int containers, Resource res) {
     pendingContainers.incr(containers);
-    pendingMB.incr(res.getMemory());
-    pendingVCores.incr(res.getVirtualCores());
+    pendingMB.incr(res.getMemory() * containers);
+    pendingVCores.incr(res.getVirtualCores() * containers);
   }
 
   public void decrPendingResources(String user, int containers, Resource res) {
@@ -341,22 +371,25 @@ public class QueueMetrics implements Met
 
   private void _decrPendingResources(int containers, Resource res) {
     pendingContainers.decr(containers);
-    pendingMB.decr(res.getMemory());
-    pendingVCores.decr(res.getVirtualCores());
+    pendingMB.decr(res.getMemory() * containers);
+    pendingVCores.decr(res.getVirtualCores() * containers);
   }
 
-  public void allocateResources(String user, int containers, Resource res) {
+  public void allocateResources(String user, int containers, Resource res,
+      boolean decrPending) {
     allocatedContainers.incr(containers);
     aggregateContainersAllocated.incr(containers);
     allocatedMB.incr(res.getMemory() * containers);
     allocatedVCores.incr(res.getVirtualCores() * containers);
-    _decrPendingResources(containers, Resources.multiply(res, containers));
+    if (decrPending) {
+      _decrPendingResources(containers, res);
+    }
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.allocateResources(user, containers, res);
+      userMetrics.allocateResources(user, containers, res, decrPending);
     }
     if (parent != null) {
-      parent.allocateResources(user, containers, res);
+      parent.allocateResources(user, containers, res, decrPending);
     }
   }
 

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Feb  7 01:57:21 2014
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.res
 @Unstable
 public class SchedulerApplication {
 
-  private final Queue queue;
+  private Queue queue;
   private final String user;
   private SchedulerApplicationAttempt currentAttempt;
 
@@ -37,6 +37,10 @@ public class SchedulerApplication {
   public Queue getQueue() {
     return queue;
   }
+  
+  public void setQueue(Queue queue) {
+    this.queue = queue;
+  }
 
   public String getUser() {
     return user;

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Feb  7 01:57:21 2014
@@ -57,7 +57,7 @@ import com.google.common.collect.Multise
  */
 @Private
 @Unstable
-public abstract class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt {
   
   private static final Log LOG = LogFactory
     .getLog(SchedulerApplicationAttempt.class);
@@ -91,7 +91,7 @@ public abstract class SchedulerApplicati
   protected Map<Priority, Long> lastScheduledContainer =
       new HashMap<Priority, Long>();
 
-  protected final Queue queue;
+  protected Queue queue;
   protected boolean isStopped = false;
   
   protected final RMContext rmContext;
@@ -431,4 +431,25 @@ public abstract class SchedulerApplicati
     this.appSchedulingInfo
       .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
   }
+  
+  public synchronized void move(Queue newQueue) {
+    QueueMetrics oldMetrics = queue.getMetrics();
+    QueueMetrics newMetrics = newQueue.getMetrics();
+    String user = getUser();
+    for (RMContainer liveContainer : liveContainers.values()) {
+      Resource resource = liveContainer.getContainer().getResource();
+      oldMetrics.releaseResources(user, 1, resource);
+      newMetrics.allocateResources(user, 1, resource, false);
+    }
+    for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
+      for (RMContainer reservedContainer : map.values()) {
+        Resource resource = reservedContainer.getReservedResource();
+        oldMetrics.unreserveResource(user, resource);
+        newMetrics.reserveResource(user, resource);
+      }
+    }
+
+    appSchedulingInfo.move(newQueue);
+    this.queue = newQueue;
+  }  
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Feb  7 01:57:21 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
@@ -180,4 +182,16 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Unstable
   public RMContainer getRMContainer(ContainerId containerId);
+  
+  /**
+   * Moves the given application to the given queue
+   * @param appId
+   * @param newQueue
+   * @return the name of the queue the application was placed into
+   * @throws YarnException if the move cannot be carried out
+   */
+  @LimitedPrivate("yarn")
+  @Evolving
+  public String moveApplication(ApplicationId appId, String newQueue)
+      throws YarnException;
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Feb  7 01:57:21 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -195,6 +196,7 @@ public class CapacityScheduler extends A
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
+  private boolean useLocalConfigurationProvider;
 
   public CapacityScheduler() {}
 
@@ -261,7 +263,13 @@ public class CapacityScheduler extends A
   public synchronized void
       reinitialize(Configuration conf, RMContext rmContext) throws IOException {
     if (!initialized) {
-      this.conf = new CapacitySchedulerConfiguration(conf);
+      this.useLocalConfigurationProvider =
+          (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+              YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+              LocalConfigurationProvider.class)));
+      this.conf =
+          new CapacitySchedulerConfiguration(conf,
+              this.useLocalConfigurationProvider);
       validateConf(this.conf);
       this.minimumAllocation = this.conf.getMinimumAllocation();
       this.maximumAllocation = this.conf.getMaximumAllocation();
@@ -279,9 +287,10 @@ public class CapacityScheduler extends A
           "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
           "maximumAllocation=<" + getMaximumResourceCapability() + ">");
     } else {
-
       CapacitySchedulerConfiguration oldConf = this.conf; 
-      this.conf = new CapacitySchedulerConfiguration(conf);
+      this.conf =
+          new CapacitySchedulerConfiguration(conf,
+              this.useLocalConfigurationProvider);
       validateConf(this.conf);
       try {
         LOG.info("Re-initializing queues...");

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Fri Feb  7 01:57:21 2014
@@ -140,10 +140,17 @@ public class CapacitySchedulerConfigurat
   }
   
   public CapacitySchedulerConfiguration(Configuration configuration) {
+    this(configuration, true);
+  }
+
+  public CapacitySchedulerConfiguration(Configuration configuration,
+      boolean useLocalConfigurationProvider) {
     super(configuration);
-    addResource(CS_CONFIGURATION_FILE);
+    if (useLocalConfigurationProvider) {
+      addResource(CS_CONFIGURATION_FILE);
+    }
   }
-  
+
   private String getQueuePrefix(String queue) {
     String queueName = PREFIX + queue + DOT;
     return queueName;

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Fri Feb  7 01:57:21 2014
@@ -39,7 +39,8 @@ public class AllocationConfiguration {
   // Minimum resource allocation for each queue
   private final Map<String, Resource> minQueueResources;
   // Maximum amount of resources per queue
-  private final Map<String, Resource> maxQueueResources;
+  @VisibleForTesting
+  final Map<String, Resource> maxQueueResources;
   // Sharing weights for each queue
   private final Map<String, ResourceWeights> queueWeights;
   

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Feb  7 01:57:21 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue
   private long lastTimeAtMinShare;
   private long lastTimeAtHalfFairShare;
   
+  private final ActiveUsersManager activeUsersManager;
+  
   public FSLeafQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
     super(name, scheduler, parent);
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
     this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
+    activeUsersManager = new ActiveUsersManager(getMetrics());
   }
   
   public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -245,4 +249,9 @@ public class FSLeafQueue extends FSQueue
   public int getNumRunnableApps() {
     return runnableAppScheds.size();
   }
+  
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    return activeUsersManager;
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Feb  7 01:57:21 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 
 @Private
 @Unstable
@@ -194,4 +194,10 @@ public class FSParentQueue extends FSQue
       childQueue.collectSchedulerApplications(apps);
     }
   }
+  
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    // Should never be called since all applications are submitted to LeafQueues
+    return null;
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Feb  7 01:57:21 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@@ -121,8 +121,7 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler implements
-    ResourceScheduler {
+public class FairScheduler extends AbstractYarnScheduler {
   private boolean initialized;
   private FairSchedulerConfiguration conf;
   private Resource minimumAllocation;
@@ -767,7 +766,9 @@ public class FairScheduler extends Abstr
     boolean wasRunnable = queue.removeApp(attempt);
 
     if (wasRunnable) {
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+      maxRunningEnforcer.untrackRunnableApp(attempt);
+      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
+          attempt.getQueue());
     } else {
       maxRunningEnforcer.untrackNonRunnableApp(attempt);
     }
@@ -1356,4 +1357,119 @@ public class FairScheduler extends Abstr
     queue.collectSchedulerApplications(apps);
     return apps;
   }
+
+  @Override
+  public synchronized String moveApplication(ApplicationId appId,
+      String queueName) throws YarnException {
+    SchedulerApplication app = applications.get(appId);
+    if (app == null) {
+      throw new YarnException("App to be moved " + appId + " not found.");
+    }
+    FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
+    
+    FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
+    FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false);
+    if (targetQueue == null) {
+      throw new YarnException("Target queue " + queueName
+          + " not found or is not a leaf queue.");
+    }
+    if (targetQueue == oldQueue) {
+      return oldQueue.getQueueName();
+    }
+    
+    if (oldQueue.getRunnableAppSchedulables().contains(
+        attempt.getAppSchedulable())) {
+      verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+    }
+    
+    executeMove(app, attempt, oldQueue, targetQueue);
+    return targetQueue.getQueueName();
+  }
+  
+  private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
+      FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
+    String queueName = targetQueue.getQueueName();
+    ApplicationAttemptId appAttId = app.getApplicationAttemptId();
+    // When checking maxResources and maxRunningApps, only need to consider
+    // queues before the lowest common ancestor of the two queues because the
+    // total running apps in queues above will not be changed.
+    FSQueue lowestCommonAncestor = findLowestCommonAncestorQueue(oldQueue,
+        targetQueue);
+    Resource consumption = app.getCurrentConsumption();
+    
+    // Check whether the move would go over maxRunningApps or maxShare
+    FSQueue cur = targetQueue;
+    while (cur != lowestCommonAncestor) {
+      // maxRunningApps
+      if (cur.getNumRunnableApps() == allocConf.getQueueMaxApps(cur.getQueueName())) {
+        throw new YarnException("Moving app attempt " + appAttId + " to queue "
+            + queueName + " would violate queue maxRunningApps constraints on"
+            + " queue " + cur.getQueueName());
+      }
+      
+      // maxShare
+      if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
+          cur.getMaxShare())) {
+        throw new YarnException("Moving app attempt " + appAttId + " to queue "
+            + queueName + " would violate queue maxShare constraints on"
+            + " queue " + cur.getQueueName());
+      }
+      
+      cur = cur.getParent();
+    }
+  }
+  
+  /**
+   * Helper for moveApplication, which is synchronized, so all operations will
+   * be atomic.
+   */
+  private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
+      FSLeafQueue oldQueue, FSLeafQueue newQueue) {
+    boolean wasRunnable = oldQueue.removeApp(attempt);
+    // if app was not runnable before, it may be runnable now
+    boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
+        attempt.getUser());
+    if (wasRunnable && !nowRunnable) {
+      throw new IllegalStateException("Should have already verified that app "
+          + attempt.getApplicationId() + " would be runnable in new queue");
+    }
+    
+    if (wasRunnable) {
+      maxRunningEnforcer.untrackRunnableApp(attempt);
+    } else if (nowRunnable) {
+      // App has changed from non-runnable to runnable
+      maxRunningEnforcer.untrackNonRunnableApp(attempt);
+    }
+    
+    attempt.move(newQueue); // This updates all the metrics
+    app.setQueue(newQueue);
+    newQueue.addApp(attempt, nowRunnable);
+    
+    if (nowRunnable) {
+      maxRunningEnforcer.trackRunnableApp(attempt);
+    }
+    if (wasRunnable) {
+      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue);
+    }
+  }
+  
+  private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
+    // Because queue names include ancestors, separated by periods, we can find
+    // the lowest common ancestors by going from the start of the names until
+    // there's a character that doesn't match.
+    String name1 = queue1.getName();
+    String name2 = queue2.getName();
+    // We keep track of the last period we encounter to avoid returning root.apple
+    // when the queues are root.applepie and root.appletart
+    int lastPeriodIndex = -1;
+    for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) {
+      if (name1.length() <= i || name2.length() <= i ||
+          name1.charAt(i) != name2.charAt(i)) {
+        return queueMgr.getQueue(name1.substring(lastPeriodIndex));
+      } else if (name1.charAt(i) == '.') {
+        lastPeriodIndex = i;
+      }
+    }
+    return queue1; // names are identical
+  }
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Fri Feb  7 01:57:21 2014
@@ -105,26 +105,15 @@ public class MaxRunningAppsEnforcer {
   }
 
   /**
-   * Updates the relevant tracking variables after a runnable app with the given
-   * queue and user has been removed. Checks to see whether any other applications
-   * are now runnable and makes them so.
+   * Checks to see whether any other applications runnable now that the given
+   * application has been removed from the given queue.  And makes them so.
    * 
    * Runs in O(n log(n)) where n is the number of queues that are under the
    * highest queue that went from having no slack to having slack.
    */
-  public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
+  public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) {
     AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     
-    // Update usersRunnableApps
-    String user = app.getUser();
-    int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
-    if (newUserNumRunning == 0) {
-      usersNumRunnableApps.remove(user);
-    } else {
-      usersNumRunnableApps.put(user, newUserNumRunning);
-    }
-
-    // Update runnable app bookkeeping for queues:
     // childqueueX might have no pending apps itself, but if a queue higher up
     // in the hierarchy parentqueueY has a maxRunningApps set, an app completion
     // in childqueueX could allow an app in some other distant child of
@@ -133,16 +122,14 @@ public class MaxRunningAppsEnforcer {
     // the queue was already at its max before the removal.
     // Thus we find the ancestor queue highest in the tree for which the app
     // that was at its maxRunningApps before the removal.
-    FSLeafQueue queue = app.getQueue();
     FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
         allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
     FSParentQueue parent = queue.getParent();
     while (parent != null) {
       if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent
-          .getName())) {
+          .getName()) - 1) {
         highestQueueWithAppsNowRunnable = parent;
       }
-      parent.decrementRunnableApps();
       parent = parent.getParent();
     }
 
@@ -157,7 +144,12 @@ public class MaxRunningAppsEnforcer {
       gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
           appsNowMaybeRunnable);
     }
-    if (newUserNumRunning == allocConf.getUserMaxApps(user) - 1) {
+    String user = app.getUser();
+    Integer userNumRunning = usersNumRunnableApps.get(user);
+    if (userNumRunning == null) {
+      userNumRunning = 0;
+    }
+    if (userNumRunning == allocConf.getUserMaxApps(user) - 1) {
       List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
       if (userWaitingApps != null) {
         appsNowMaybeRunnable.add(userWaitingApps);
@@ -209,6 +201,29 @@ public class MaxRunningAppsEnforcer {
   }
   
   /**
+   * Updates the relevant tracking variables after a runnable app with the given
+   * queue and user has been removed.
+   */
+  public void untrackRunnableApp(FSSchedulerApp app) {
+    // Update usersRunnableApps
+    String user = app.getUser();
+    int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
+    if (newUserNumRunning == 0) {
+      usersNumRunnableApps.remove(user);
+    } else {
+      usersNumRunnableApps.put(user, newUserNumRunning);
+    }
+    
+    // Update runnable app bookkeeping for queues
+    FSLeafQueue queue = app.getQueue();
+    FSParentQueue parent = queue.getParent();
+    while (parent != null) {
+      parent.decrementRunnableApps();
+      parent = parent.getParent();
+    }
+  }
+  
+  /**
    * Stops tracking the given non-runnable app
    */
   public void untrackNonRunnableApp(FSSchedulerApp app) {

Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Feb  7 01:57:21 2014
@@ -77,7 +77,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -106,7 +105,7 @@ import com.google.common.annotations.Vis
 @Evolving
 @SuppressWarnings("unchecked")
 public class FifoScheduler extends AbstractYarnScheduler implements
-    ResourceScheduler, Configurable {
+    Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
 
@@ -184,6 +183,11 @@ public class FifoScheduler extends Abstr
     public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
       return getQueueAcls().get(acl).isUserAllowed(user);
     }
+    
+    @Override
+    public ActiveUsersManager getActiveUsersManager() {
+      return activeUsersManager;
+    }
   };
 
   @Override



Mime
View raw message