Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A4A89100D4 for ; Thu, 17 Oct 2013 05:34:57 +0000 (UTC) Received: (qmail 97850 invoked by uid 500); 17 Oct 2013 05:34:42 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 97829 invoked by uid 500); 17 Oct 2013 05:34:41 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 97628 invoked by uid 99); 17 Oct 2013 05:34:32 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 05:34:32 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_OPPORTUN1 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 05:34:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 54B762388C9E; Thu, 17 Oct 2013 05:33:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1532967 [5/9] - in /hadoop/common/branches/HDFS-4949/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java... Date: Thu, 17 Oct 2013 05:33:06 -0000 To: yarn-commits@hadoop.apache.org From: wang@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131017053314.54B762388C9E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Oct 17 05:32:42 2013 @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.http.HttpConfig.Policy; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.SecurityUtil; @@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; @@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.server.web import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -105,7 +109,14 @@ public class ResourceManager extends Com public static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final Log LOG = LogFactory.getLog(ResourceManager.class); - public static final long clusterTimeStamp = System.currentTimeMillis(); + private static long clusterTimeStamp = System.currentTimeMillis(); + + /** + * "Always On" services. Services that need to run always irrespective of + * the HA state of the RM. + */ + @VisibleForTesting + protected RMHAProtocolService haService; /** * "Active" services. Services that need to run only on the Active RM. @@ -137,6 +148,7 @@ public class ResourceManager extends Com private EventHandler schedulerDispatcher; protected RMAppManager rmAppManager; protected ApplicationACLsManager applicationACLsManager; + protected QueueACLsManager queueACLsManager; protected RMDelegationTokenSecretManager rmDTSecretManager; private DelegationTokenRenewer delegationTokenRenewer; private WebApp webApp; @@ -155,17 +167,31 @@ public class ResourceManager extends Com public RMContext getRMContext() { return this.rmContext; } - + + public static long getClusterTimeStamp() { + return clusterTimeStamp; + } + + @VisibleForTesting + protected static void setClusterTimeStamp(long timestamp) { + clusterTimeStamp = timestamp; + } + @Override protected void serviceInit(Configuration conf) throws Exception { validateConfigs(conf); this.conf = conf; - activeServices = new RMActiveServices(); - addService(activeServices); + haService = new RMHAProtocolService(this); + addService(haService); super.serviceInit(conf); } + protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, + Configuration conf) { + return new QueueACLsManager(scheduler, conf); + } + @VisibleForTesting protected void setRMStateStore(RMStateStore rmStore) { rmStore.setRMDispatcher(rmDispatcher); @@ -372,6 +398,8 @@ public class ResourceManager extends Com applicationACLsManager = new ApplicationACLsManager(conf); + queueACLsManager = createQueueACLsManager(scheduler, conf); + rmAppManager = createRMAppManager(); // Register event handler for RMAppManagerEvents rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); @@ -430,12 +458,8 @@ public class ResourceManager extends Com } if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { - String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS); - hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname; int port = webApp.port(); - String resolvedAddress = hostname + ":" + port; - conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress); + WebAppUtils.setRMWebAppPort(conf, port); } super.serviceStart(); @@ -470,6 +494,7 @@ public class ResourceManager extends Com LOG.error("Error closing store.", e); } } + super.serviceStop(); } } @@ -692,10 +717,9 @@ public class ResourceManager extends Com YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) .withHttpSpnegoKeytabKey( YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) - .at(this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)); - String proxyHostAndPort = YarnConfiguration.getProxyHostAndPort(conf); - if(YarnConfiguration.getRMWebAppHostAndPort(conf). + .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); + String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf); + if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). equals(proxyHostAndPort)) { AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService()); builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, @@ -708,6 +732,47 @@ public class ResourceManager extends Com webApp = builder.start(new RMWebApp(this)); } + void setConf(Configuration configuration) { + conf = configuration; + } + + /** + * Helper method to create and init {@link #activeServices}. This creates an + * instance of {@link RMActiveServices} and initializes it. + * @throws Exception + */ + void createAndInitActiveServices() throws Exception { + activeServices = new RMActiveServices(); + activeServices.init(conf); + } + + /** + * Helper method to start {@link #activeServices}. + * @throws Exception + */ + void startActiveServices() throws Exception { + if (activeServices != null) { + clusterTimeStamp = System.currentTimeMillis(); + activeServices.start(); + } + } + + /** + * Helper method to stop {@link #activeServices}. + * @throws Exception + */ + void stopActiveServices() throws Exception { + if (activeServices != null) { + activeServices.stop(); + activeServices = null; + } + } + + @VisibleForTesting + protected boolean areActiveServicesRunning() { + return activeServices != null && activeServices.isInState(STATE.STARTED); + } + @Override protected void serviceStart() throws Exception { try { @@ -715,7 +780,6 @@ public class ResourceManager extends Com } catch(IOException ie) { throw new YarnRuntimeException("Failed to login", ie); } - super.serviceStart(); } @@ -753,7 +817,8 @@ public class ResourceManager extends Com protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, - this.applicationACLsManager, this.rmDTSecretManager); + this.applicationACLsManager, this.queueACLsManager, + this.rmDTSecretManager); } protected ApplicationMasterService createApplicationMasterService() { @@ -834,6 +899,11 @@ public class ResourceManager extends Com } @Private + public QueueACLsManager getQueueACLsManager() { + return this.queueACLsManager; + } + + @Private public RMContainerTokenSecretManager getRMContainerTokenSecretManager() { return this.containerTokenSecretManager; } @@ -856,7 +926,7 @@ public class ResourceManager extends Com // recover applications rmAppManager.recover(state); } - + public static void main(String argv[]) { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); @@ -866,6 +936,7 @@ public class ResourceManager extends Com ShutdownHookManager.get().addShutdownHook( new CompositeServiceShutdownHook(resourceManager), SHUTDOWN_HOOK_PRIORITY); + setHttpPolicy(conf); resourceManager.init(conf); resourceManager.start(); } catch (Throwable t) { @@ -873,4 +944,10 @@ public class ResourceManager extends Com System.exit(-1); } } + + private static void setHttpPolicy(Configuration conf) { + HttpConfig.setPolicy(Policy.fromString(conf.get( + YarnConfiguration.YARN_HTTP_POLICY_KEY, + YarnConfiguration.YARN_HTTP_POLICY_DEFAULT))); + } } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Thu Oct 17 05:32:42 2013 @@ -28,6 +28,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.hadoop.yarn.util.YarnVersionInfo; public class ResourceTrackerService extends AbstractService implements ResourceTracker { @@ -73,6 +75,7 @@ public class ResourceTrackerService exte private long nextHeartBeatInterval; private Server server; private InetSocketAddress resourceTrackerAddress; + private String minimumNodeManagerVersion; private static final NodeHeartbeatResponse resync = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); @@ -99,6 +102,7 @@ public class ResourceTrackerService exte this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; + } @Override @@ -124,7 +128,11 @@ public class ResourceTrackerService exte minAllocVcores = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - + + minimumNodeManagerVersion = conf.get( + YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, + YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); + super.serviceInit(conf); } @@ -172,10 +180,30 @@ public class ResourceTrackerService exte int cmPort = nodeId.getPort(); int httpPort = request.getHttpPort(); Resource capability = request.getResource(); + String nodeManagerVersion = request.getNMVersion(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); + if (!minimumNodeManagerVersion.equals("NONE")) { + if (minimumNodeManagerVersion.equals("EqualToRM")) { + minimumNodeManagerVersion = YarnVersionInfo.getVersion(); + } + + if ((nodeManagerVersion == null) || + (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) { + String message = + "Disallowed NodeManager Version " + nodeManagerVersion + + ", is less than the minimum version " + + minimumNodeManagerVersion + " sending SHUTDOWN signal to " + + "NodeManager."; + LOG.info(message); + response.setDiagnosticsMessage(message); + response.setNodeAction(NodeAction.SHUTDOWN); + return response; + } + } + // Check if this node is a 'valid' node if (!this.nodesListManager.isValidNode(host)) { String message = @@ -206,7 +234,7 @@ public class ResourceTrackerService exte .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability); + resolve(host), capability, nodeManagerVersion); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { @@ -229,7 +257,8 @@ public class ResourceTrackerService exte + ", assigned nodeId " + nodeId; LOG.info(message); response.setNodeAction(NodeAction.NORMAL); - response.setRMIdentifier(ResourceManager.clusterTimeStamp); + response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); + response.setRMVersion(YarnVersionInfo.getVersion()); return response; } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Thu Oct 17 05:32:42 2013 @@ -63,12 +63,6 @@ public class FileSystemRMStateStore exte public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; - private static final String RM_APP_ROOT = "RMAppRoot"; - private static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; - private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; - private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = - "RMDTSequenceNumber_"; protected FileSystem fs; Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Thu Oct 17 05:32:42 2013 @@ -30,6 +30,7 @@ import org.apache.hadoop.security.Creden import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -108,7 +109,9 @@ public class MemoryRMStateStore extends ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); - assert appState != null; + if (appState == null) { + throw new YarnRuntimeException("Application doesn't exist"); + } if (appState.attempts.containsKey(attemptState.getAttemptId())) { Exception e = new IOException("Attempt: " + @@ -125,7 +128,9 @@ public class MemoryRMStateStore extends throws Exception { ApplicationId appId = appState.getAppId(); ApplicationState removed = state.appState.remove(appId); - assert removed != null; + if (removed == null) { + throw new YarnRuntimeException("Removing non-exsisting application state"); + } } @Override Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu Oct 17 05:32:42 2013 @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @@ -64,6 +65,14 @@ import org.apache.hadoop.yarn.server.res */ public abstract class RMStateStore extends AbstractService { + // constants for RM App state and RMDTSecretManagerState. + protected static final String RM_APP_ROOT = "RMAppRoot"; + protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; + protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = + "RMDTSequenceNumber_"; + public static final Log LOG = LogFactory.getLog(RMStateStore.class); public RMStateStore() { @@ -463,8 +472,9 @@ public abstract class RMStateStore exten (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl .newApplicationAttemptStateData(attemptState.getAttemptId(), attemptState.getMasterContainer(), appAttemptTokens); - - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); + } storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { @@ -482,12 +492,15 @@ public abstract class RMStateStore exten ApplicationState appState = ((RMStateStoreRemoveAppEvent) event).getAppState(); ApplicationId appId = appState.getAppId(); - + Exception removedException = null; LOG.info("Removing info for app: " + appId); try { removeApplicationState(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); + removedException = e; + } finally { + notifyDoneRemovingApplcation(appId, removedException); } } break; @@ -521,7 +534,18 @@ public abstract class RMStateStore exten rmDispatcher.getEventHandler().handle( new RMAppAttemptStoredEvent(attemptId, storedException)); } - + + @SuppressWarnings("unchecked") + /** + * This is to notify RMApp that this application has been removed from + * RMStateStore + */ + private void notifyDoneRemovingApplcation(ApplicationId appId, + Exception removedException) { + rmDispatcher.getEventHandler().handle( + new RMAppRemovedEvent(appId, removedException)); + } + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Thu Oct 17 05:32:42 2013 @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -194,4 +195,20 @@ public interface RMApp extends EventHand * @return the application type. */ String getApplicationType(); + + /** + * Check whether this application is safe to unregister. + * An application is deemed to be safe to unregister if it is an unmanaged + * AM or its state has been removed from state store. + * @return the flag which indicates whether this application is safe to + * unregister. + */ + boolean isAppSafeToUnregister(); + + /** + * Create the external user-facing state of ApplicationMaster from the + * current state of the {@link RMApp}. + * @return the external user-facing state of ApplicationMaster. + */ + YarnApplicationState createApplicationState(); } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Thu Oct 17 05:32:42 2013 @@ -27,11 +27,14 @@ public enum RMAppEventType { // Source: RMAppAttempt APP_REJECTED, APP_ACCEPTED, - APP_SAVED, ATTEMPT_REGISTERED, - ATTEMPT_FINISHING, + ATTEMPT_UNREGISTERED, ATTEMPT_FINISHED, // Will send the final state ATTEMPT_FAILED, ATTEMPT_KILLED, - NODE_UPDATE + NODE_UPDATE, + + // Source: RMStateStore + APP_SAVED, + APP_REMOVED } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Oct 17 05:32:42 2013 @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp, private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); + private boolean isAppRemovalRequestSent = false; + private RMAppState previousStateAtRemoving; private static final StateMachineFactory getAMRMToken(); /** - * The master key for client-to-AM tokens for this app attempt + * The master key for client-to-AM tokens for this app attempt. This is only + * used for RMStateStore. Normal operation must invoke the secret manager to + * get the key and not use the local key directly. * @return The master key for client-to-AM tokens for this app attempt */ + @LimitedPrivate("RMStateStore") SecretKey getClientTokenMasterKey(); /** Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Oct 17 05:32:42 2013 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.state.Sing import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @SuppressWarnings({"unchecked", "rawtypes"}) public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @@ -380,7 +381,7 @@ public class RMAppAttemptImpl implements this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); - this.proxiedTrackingUrl = generateProxyUriWithoutScheme(); + this.proxiedTrackingUrl = generateProxyUriWithScheme(null); this.stateMachine = stateMachineFactory.make(this); this.user = user; @@ -469,22 +470,17 @@ public class RMAppAttemptImpl implements } } - private String generateProxyUriWithoutScheme() { - return generateProxyUriWithoutScheme(null); - } - - private String generateProxyUriWithoutScheme( + private String generateProxyUriWithScheme( final String trackingUriWithoutScheme) { this.readLock.lock(); try { URI trackingUri = StringUtils.isEmpty(trackingUriWithoutScheme) ? null : ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme); - String proxy = YarnConfiguration.getProxyHostAndPort(conf); + String proxy = WebAppUtils.getProxyHostAndPort(conf); URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy); URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri, applicationAttemptId.getApplicationId()); - //We need to strip off the scheme to have it match what was there before - return result.toASCIIString().substring(HttpConfig.getSchemePrefix().length()); + return result.toASCIIString(); } catch (URISyntaxException e) { LOG.warn("Could not proxify "+trackingUriWithoutScheme,e); return trackingUriWithoutScheme; @@ -495,11 +491,13 @@ public class RMAppAttemptImpl implements private void setTrackingUrlToRMAppPage() { origTrackingUrl = pjoin( - YarnConfiguration.getRMWebAppHostAndPort(conf), + WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf), "cluster", "app", getAppAttemptId().getApplicationId()); proxiedTrackingUrl = origTrackingUrl; } + // This is only used for RMStateStore. Normal operation must invoke the secret + // manager to get the key and not use the local key directly. @Override public SecretKey getClientTokenMasterKey() { return this.clientTokenMasterKey; @@ -675,7 +673,7 @@ public class RMAppAttemptImpl implements } @Override - public void recover(RMState state) { + public void recover(RMState state) throws Exception{ ApplicationState appState = state.getApplicationState().get(getAppAttemptId().getApplicationId()); ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); @@ -690,7 +688,8 @@ public class RMAppAttemptImpl implements RMAppAttemptEventType.RECOVER)); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens) { + private void recoverAppAttemptCredentials(Credentials appAttemptTokens) + throws IOException { if (appAttemptTokens == null) { return; } @@ -707,11 +706,7 @@ public class RMAppAttemptImpl implements this.amrmToken = (Token) appAttemptTokens .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); - - // For now, no need to populate tokens back to AMRMTokenSecretManager, - // because running attempts are rebooted. Later in work-preserve restart, - // we'll create NEW->RUNNING transition in which the restored tokens will be - // added to the secret manager + rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); } private static class BaseTransition implements @@ -736,9 +731,9 @@ public class RMAppAttemptImpl implements .registerAppAttempt(appAttempt.applicationAttemptId); if (UserGroupInformation.isSecurityEnabled()) { - appAttempt.clientTokenMasterKey = appAttempt.rmContext - .getClientToAMTokenSecretManager() - .registerApplication(appAttempt.applicationAttemptId); + appAttempt.clientTokenMasterKey = + appAttempt.rmContext.getClientToAMTokenSecretManager() + .createMasterKey(appAttempt.applicationAttemptId); } // create AMRMToken @@ -924,6 +919,12 @@ public class RMAppAttemptImpl implements RMAppAttemptEvent event) { // Register with AMLivelinessMonitor appAttempt.attemptLaunched(); + + // register the ClientTokenMasterKey after it is saved in the store, + // otherwise client may hold an invalid ClientToken after RM restarts. + appAttempt.rmContext.getClientToAMTokenSecretManager() + .registerApplication(appAttempt.getAppAttemptId(), + appAttempt.getClientTokenMasterKey()); } } @@ -988,7 +989,7 @@ public class RMAppAttemptImpl implements } } - static final class AMRegisteredTransition extends BaseTransition { + private static final class AMRegisteredTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { @@ -997,9 +998,10 @@ public class RMAppAttemptImpl implements = (RMAppAttemptRegistrationEvent) event; appAttempt.host = registrationEvent.getHost(); appAttempt.rpcPort = registrationEvent.getRpcport(); - appAttempt.origTrackingUrl = registrationEvent.getTrackingurl(); + appAttempt.origTrackingUrl = + sanitizeTrackingUrl(registrationEvent.getTrackingurl()); appAttempt.proxiedTrackingUrl = - appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl); + appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl); // Let the app know appAttempt.eventHandler.handle(new RMAppEvent(appAttempt @@ -1132,9 +1134,10 @@ public class RMAppAttemptImpl implements RMAppAttemptUnregistrationEvent unregisterEvent = (RMAppAttemptUnregistrationEvent) event; appAttempt.diagnostics.append(unregisterEvent.getDiagnostics()); - appAttempt.origTrackingUrl = unregisterEvent.getTrackingUrl(); + appAttempt.origTrackingUrl = + sanitizeTrackingUrl(unregisterEvent.getTrackingUrl()); appAttempt.proxiedTrackingUrl = - appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl); + appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl); appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus(); // Tell the app @@ -1149,7 +1152,7 @@ public class RMAppAttemptImpl implements ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); appAttempt.eventHandler.handle( - new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING)); + new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED)); return RMAppAttemptState.FINISHING; } } @@ -1286,4 +1289,8 @@ public class RMAppAttemptImpl implements appAttempt.rmContext.getAMRMTokenSecretManager() .applicationMasterFinished(appAttempt.getAppAttemptId()); } + + private static String sanitizeTrackingUrl(String url) { + return (url == null || url.trim().isEmpty()) ? "N/A" : url; + } } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Thu Oct 17 05:32:42 2013 @@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; /** * Node managers information on available resources @@ -84,7 +83,13 @@ public interface RMNode { * @return the time of the latest health report received from this node. */ public long getLastHealthReportTime(); - + + /** + * the node manager version of the node received as part of the + * registration with the resource manager + */ + public String getNodeManagerVersion(); + /** * the total available resource. * @return the total available resource. Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Thu Oct 17 05:32:42 2013 @@ -97,6 +97,7 @@ public class RMNodeImpl implements RMNod private String healthReport; private long lastHealthReportTime; + private String nodeManagerVersion; /* set of containers that have just launched */ private final Map justLaunchedContainers = @@ -172,7 +173,7 @@ public class RMNodeImpl implements RMNod RMNodeEvent> stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, Resource capability) { + int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -184,6 +185,7 @@ public class RMNodeImpl implements RMNod this.node = node; this.healthReport = "Healthy"; this.lastHealthReportTime = System.currentTimeMillis(); + this.nodeManagerVersion = nodeManagerVersion; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -289,6 +291,11 @@ public class RMNodeImpl implements RMNod } @Override + public String getNodeManagerVersion() { + return nodeManagerVersion; + } + + @Override public NodeState getState() { this.readLock.lock(); @@ -460,8 +467,11 @@ public class RMNodeImpl implements RMNod && rmNode.getHttpPort() == newNode.getHttpPort()) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); + if (rmNode.getState() != NodeState.UNHEALTHY) { + // Only add new node if old state is not UNHEALTHY + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeAddedSchedulerEvent(rmNode)); + } } else { // Reconnected node differs, so replace old node and start new node switch (rmNode.getState()) { Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Thu Oct 17 05:32:42 2013 @@ -116,14 +116,11 @@ public class AppSchedulingInfo { * The ApplicationMaster is updating resource requirements for the * application, by asking for more resources and releasing resources acquired * by the application. - * + * * @param requests resources to be acquired - * @param blacklistAdditions resources to be added to the blacklist - * @param blacklistRemovals resources to be removed from the blacklist */ synchronized public void updateResourceRequests( - List requests, - List blacklistAdditions, List blacklistRemovals) { + List requests) { QueueMetrics metrics = queue.getMetrics(); // Update resource requests @@ -181,11 +178,16 @@ public class AppSchedulingInfo { lastRequestContainers))); } } + } - // - // Update blacklist - // - + /** + * The ApplicationMaster is updating the blacklist + * + * @param blacklistAdditions resources to be added to the blacklist + * @param blacklistRemovals resources to be removed from the blacklist + */ + synchronized public void updateBlacklist( + List blacklistAdditions, List blacklistRemovals) { // Add to blacklist if (blacklistAdditions != null) { blacklist.addAll(blacklistAdditions); Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Thu Oct 17 05:32:42 2013 @@ -64,4 +64,6 @@ public interface Queue { * @return queue ACLs for user */ List getQueueUserAclInfo(UserGroupInformation user); + + boolean hasAccess(QueueACL acl, UserGroupInformation user); } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Thu Oct 17 05:32:42 2013 @@ -73,7 +73,7 @@ public class QueueMetrics implements Met @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; @Metric("# of active users") MutableGaugeInt activeUsers; - @Metric("# of active users") MutableGaugeInt activeApplications; + @Metric("# of active applications") MutableGaugeInt activeApplications; private final MutableGaugeInt[] runningTime; private TimeBucketMetrics runBuckets; Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Oct 17 05:32:42 2013 @@ -25,9 +25,11 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -134,4 +136,17 @@ public interface YarnScheduler extends E @LimitedPrivate("yarn") @Evolving QueueMetrics getRootQueueMetrics(); + + /** + * Check if the user has permission to perform the operation. + * If the user has {@link QueueACL#ADMINISTER_QUEUE} permission, + * this user can view/modify the applications in this queue + * @param callerUGI + * @param acl + * @param queueName + * @return true if the user has the permission, + * false otherwise + */ + boolean checkAccess(UserGroupInformation callerUGI, + QueueACL acl, String queueName); } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Thu Oct 17 05:32:42 2013 @@ -99,15 +99,11 @@ class CSQueueUtils { Resources.divide(calculator, clusterResource, usedResources, queueLimit); } - + childQueue.setUsedCapacity(usedCapacity); childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity); - Resource available = - Resources.roundUp( - calculator, - Resources.subtract(queueLimit, usedResources), - minimumAllocation); + Resource available = Resources.subtract(queueLimit, usedResources); childQueue.getMetrics().setAvailableResourcesToQueue( Resources.max( calculator, Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Oct 17 05:32:42 2013 @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -571,8 +572,7 @@ public class CapacityScheduler application.showRequests(); // Update application requests - application.updateResourceRequests(ask, - blacklistAdditions, blacklistRemovals); + application.updateResourceRequests(ask); LOG.debug("allocate: post-update"); application.showRequests(); @@ -584,6 +584,8 @@ public class CapacityScheduler " #ask=" + ask.size()); } + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + return application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability()); } @@ -913,4 +915,18 @@ public class CapacityScheduler RMContainerEventType.KILL); } + @Override + public synchronized boolean checkAccess(UserGroupInformation callerUGI, + QueueACL acl, String queueName) { + CSQueue queue = getQueue(queueName); + if (queue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("ACL not found for queue access-type " + acl + + " for queue " + queueName); + } + return false; + } + return queue.hasAccess(acl, callerUGI); + } + } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Oct 17 05:32:42 2013 @@ -644,7 +644,8 @@ public class LeafQueue implements CSQueu // Check queue ACLs UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(userName); - if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) { + if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) + && !hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { throw new AccessControlException("User " + userName + " cannot submit" + " applications to queue " + getQueuePath()); } @@ -1609,9 +1610,12 @@ public class LeafQueue implements CSQueu } - // need to access the list of apps from the preemption monitor + /** + * Obtain (read-only) collection of active applications. + */ public Set getApplications() { - return Collections.unmodifiableSet(activeApplications); + // need to access the list of apps from the preemption monitor + return activeApplications; } // return a single Resource capturing the overal amount of pending resources Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Thu Oct 17 05:32:42 2013 @@ -141,10 +141,16 @@ public class FiCaSchedulerApp extends Sc } public synchronized void updateResourceRequests( - List requests, + List requests) { + if (!isStopped) { + this.appSchedulingInfo.updateResourceRequests(requests); + } + } + + public synchronized void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { if (!isStopped) { - this.appSchedulingInfo.updateResourceRequests(requests, + this.appSchedulingInfo.updateBlacklist( blacklistAdditions, blacklistRemovals); } } Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Thu Oct 17 05:32:42 2013 @@ -192,10 +192,6 @@ public class AppSchedulable extends Sche RMContainer rmContainer = app.reserve(node, priority, null, container); node.reserveResource(app, priority, rmContainer); - getMetrics().reserveResource(app.getUser(), - container.getResource()); - scheduler.getRootQueueMetrics().reserveResource(app.getUser(), - container.getResource()); } else { @@ -216,8 +212,6 @@ public class AppSchedulable extends Sche node.unreserveResource(app); getMetrics().unreserveResource( app.getUser(), rmContainer.getContainer().getResource()); - scheduler.getRootQueueMetrics().unreserveResource( - app.getUser(), rmContainer.getContainer().getResource()); } /** @@ -274,7 +268,9 @@ public class AppSchedulable extends Sche } private Resource assignContainer(FSSchedulerNode node, boolean reserved) { - LOG.info("Node offered to app: " + getName() + " reserved: " + reserved); + if (LOG.isDebugEnabled()) { + LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); + } if (reserved) { RMContainer rmContainer = node.getReservedContainer(); @@ -316,10 +312,19 @@ public class AppSchedulable extends Sche + localRequest); } - NodeType allowedLocality = app.getAllowedLocalityLevel(priority, - scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(), - scheduler.getRackLocalityThreshold()); - + NodeType allowedLocality; + if (scheduler.isContinuousSchedulingEnabled()) { + allowedLocality = app.getAllowedLocalityLevelByTime(priority, + scheduler.getNodeLocalityDelayMs(), + scheduler.getRackLocalityDelayMs(), + scheduler.getClock().getTime()); + } else { + allowedLocality = app.getAllowedLocalityLevel(priority, + scheduler.getNumClusterNodes(), + scheduler.getNodeLocalityThreshold(), + scheduler.getRackLocalityThreshold()); + } + if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 && localRequest != null && localRequest.getNumContainers() != 0) { return assignContainer(node, priority, Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Thu Oct 17 05:32:42 2013 @@ -146,7 +146,7 @@ public class FSLeafQueue extends FSQueue public Resource assignContainer(FSSchedulerNode node) { Resource assigned = Resources.none(); if (LOG.isDebugEnabled()) { - LOG.debug("Node offered to queue: " + getName()); + LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName()); } if (!assignContainerPreCheck(node)) { Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java Thu Oct 17 05:32:42 2013 @@ -21,12 +21,14 @@ package org.apache.hadoop.yarn.server.re import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +@Metrics(context="yarn") public class FSQueueMetrics extends QueueMetrics { @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB; Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu Oct 17 05:32:42 2013 @@ -138,7 +138,7 @@ public class FSSchedulerApp extends Sche public synchronized void updateResourceRequests( List requests) { - this.appSchedulingInfo.updateResourceRequests(requests, null, null); + this.appSchedulingInfo.updateResourceRequests(requests); } public Map getResourceRequests(Priority priority) { @@ -464,7 +464,12 @@ public class FSSchedulerApp extends Sche * @param priority The priority of the container scheduled. */ synchronized public void resetSchedulingOpportunities(Priority priority) { - lastScheduledContainer.put(priority, System.currentTimeMillis()); + resetSchedulingOpportunities(priority, System.currentTimeMillis()); + } + // used for continuous scheduling + synchronized public void resetSchedulingOpportunities(Priority priority, + long currentTimeMs) { + lastScheduledContainer.put(priority, currentTimeMs); schedulingOpportunities.setCount(priority, 0); } @@ -513,6 +518,55 @@ public class FSSchedulerApp extends Sche return allowedLocalityLevel.get(priority); } + /** + * Return the level at which we are allowed to schedule containers. + * Given the thresholds indicating how much time passed before relaxing + * scheduling constraints. + */ + public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, + long nodeLocalityDelayMs, long rackLocalityDelayMs, + long currentTimeMs) { + + // if not being used, can schedule anywhere + if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { + return NodeType.OFF_SWITCH; + } + + // default level is NODE_LOCAL + if (! allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } + + // check waiting time + long waitTime = currentTimeMs; + if (lastScheduledContainer.containsKey(priority)) { + waitTime -= lastScheduledContainer.get(priority); + } else { + waitTime -= appSchedulable.getStartTime(); + } + + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityDelayMs : rackLocalityDelayMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority, currentTimeMs); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority, currentTimeMs); + } + } + return allowedLocalityLevel.get(priority); + } synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request,