Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A7E23200BC0 for ; Tue, 15 Nov 2016 16:57:10 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A6520160AF2; Tue, 15 Nov 2016 15:57:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 852C1160B15 for ; Tue, 15 Nov 2016 16:57:08 +0100 (CET) Received: (qmail 15736 invoked by uid 500); 15 Nov 2016 15:57:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 15458 invoked by uid 99); 15 Nov 2016 15:57:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Nov 2016 15:57:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D4D4ED225; Tue, 15 Nov 2016 15:57:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Date: Tue, 15 Nov 2016 15:57:09 -0000 Message-Id: <4a9964dc40b04e90b5b550ed8b4022ca@git.apache.org> In-Reply-To: <54a92b74f9f94400b37db18b1a839bfc@git.apache.org> References: <54a92b74f9f94400b37db18b1a839bfc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] hadoop git commit: YARN-4597. Introduce ContainerScheduler and a SCHEDULED state to NodeManager container lifecycle. (asuresh) archived-at: Tue, 15 Nov 2016 15:57:10 -0000 YARN-4597. Introduce ContainerScheduler and a SCHEDULED state to NodeManager container lifecycle. (asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3219b7b4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3219b7b4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3219b7b4 Branch: refs/heads/trunk Commit: 3219b7b4ac7d12aee343f6ab2980b3357fc618b6 Parents: 7ffb994 Author: Arun Suresh Authored: Tue Nov 15 07:48:55 2016 -0800 Committer: Arun Suresh Committed: Tue Nov 15 07:56:25 2016 -0800 ---------------------------------------------------------------------- .../hadoop/mapred/TestMROpportunisticMaps.java | 18 +- .../yarn/api/records/ContainerExitStatus.java | 6 + .../hadoop/yarn/api/records/ContainerState.java | 4 +- .../hadoop/yarn/conf/YarnConfiguration.java | 17 +- .../src/main/proto/yarn_protos.proto | 2 +- .../api/impl/TestDistributedScheduling.java | 3 +- .../yarn/client/api/impl/TestNMClient.java | 9 +- .../TestOpportunisticContainerAllocation.java | 2 + .../src/main/resources/yarn-default.xml | 6 +- .../hadoop/yarn/server/nodemanager/Context.java | 18 +- .../yarn/server/nodemanager/NodeManager.java | 42 +- .../nodemanager/NodeStatusUpdaterImpl.java | 54 -- .../containermanager/ContainerManager.java | 4 + .../containermanager/ContainerManagerImpl.java | 32 +- .../containermanager/container/Container.java | 6 + .../container/ContainerImpl.java | 86 +- .../container/ContainerState.java | 2 +- .../launcher/ContainerLaunch.java | 30 +- .../launcher/RecoveredContainerLaunch.java | 4 +- .../monitor/ContainersMonitor.java | 52 +- .../monitor/ContainersMonitorImpl.java | 79 +- .../queuing/QueuingContainerManagerImpl.java | 686 --------------- .../containermanager/queuing/package-info.java | 23 - ...locationBasedResourceUtilizationTracker.java | 137 +++ .../scheduler/ContainerScheduler.java | 419 +++++++++ .../scheduler/ContainerSchedulerEvent.java | 51 ++ .../scheduler/ContainerSchedulerEventType.java | 29 + .../scheduler/ResourceUtilizationTracker.java | 59 ++ .../scheduler/package-info.java | 22 + .../nodemanager/metrics/NodeManagerMetrics.java | 35 + .../nodemanager/webapp/ContainerLogsUtils.java | 5 +- .../yarn/server/nodemanager/TestEventFlow.java | 3 +- .../nodemanager/TestNodeManagerResync.java | 8 + .../nodemanager/TestNodeManagerShutdown.java | 5 +- .../nodemanager/TestNodeStatusUpdater.java | 123 --- .../amrmproxy/BaseAMRMProxyTest.java | 8 +- .../BaseContainerManagerTest.java | 24 +- .../TestContainerManagerRecovery.java | 34 +- .../TestContainerManagerRegression.java | 84 -- .../container/TestContainer.java | 26 +- .../queuing/TestQueuingContainerManager.java | 596 ------------- .../TestContainerSchedulerQueuing.java | 872 +++++++++++++++++++ .../nodemanager/webapp/MockContainer.java | 15 + ...pportunisticContainerAllocatorAMService.java | 4 +- .../rmcontainer/RMContainerImpl.java | 4 +- .../resourcemanager/rmnode/RMNodeImpl.java | 32 +- .../scheduler/AbstractYarnScheduler.java | 2 + .../scheduler/SchedulerNode.java | 9 +- .../hadoop/yarn/server/MiniYARNCluster.java | 55 +- 49 files changed, 1988 insertions(+), 1858 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java index 021863b..d975fd0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -68,15 +68,6 @@ public class TestMROpportunisticMaps { doTest(4, 1, 1, 2); } - /** - * Test will run with 6 Maps and 2 Reducers. All the Maps are OPPORTUNISTIC. - * @throws Exception - */ - @Test - public void testMultipleReducers() throws Exception { - doTest(6, 2, 1, 6); - } - public void doTest(int numMappers, int numReducers, int numNodes, int percent) throws Exception { doTest(numMappers, numReducers, numNodes, 1000, percent); @@ -94,7 +85,8 @@ public class TestMROpportunisticMaps { conf.setBoolean(YarnConfiguration. OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); dfsCluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numNodes).build(); fileSystem = dfsCluster.getFileSystem(); @@ -104,11 +96,7 @@ public class TestMROpportunisticMaps { createInput(fileSystem, numMappers, numLines); // Run the test. - Configuration jobConf = mrCluster.getConfig(); - jobConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); - - runMergeTest(new JobConf(jobConf), fileSystem, + runMergeTest(new JobConf(conf), fileSystem, numMappers, numReducers, numLines, percent); } finally { if (dfsCluster != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java index f88fa3b..0207010 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java @@ -72,4 +72,10 @@ public class ContainerExitStatus { */ public static final int KILLED_AFTER_APP_COMPLETION = -107; + /** + * Container was terminated by the ContainerScheduler to make room + * for another container... + */ + public static final int KILLED_BY_CONTAINER_SCHEDULER = -108; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 582389f..4efd8c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -36,6 +36,6 @@ public enum ContainerState { /** Completed container */ COMPLETE, - /** Queued at the NM. */ - QUEUED + /** Scheduled (awaiting resources) at the NM. */ + SCHEDULED } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b95bd1a..a54104d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -390,12 +390,16 @@ public class YarnConfiguration extends Configuration { public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT = 1.0f; - /** Min length of container queue at NodeManager. */ + /** Min length of container queue at NodeManager. This is a cluster-wide + * configuration that acts as the lower-bound of optimal queue length + * calculated by the NodeQueueLoadMonitor */ public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = YARN_PREFIX + "nm-container-queuing.min-queue-length"; public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1; - /** Max length of container queue at NodeManager. */ + /** Max length of container queue at NodeManager. This is a cluster-wide + * configuration that acts as the upper-bound of optimal queue length + * calculated by the NodeQueueLoadMonitor */ public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = YARN_PREFIX + "nm-container-queuing.max-queue-length"; public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10; @@ -834,10 +838,11 @@ public class YarnConfiguration extends Configuration { /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; - /** Enable Queuing of OPPORTUNISTIC containers. */ - public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX - + "container-queuing-enabled"; - public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false; + /** Max Queue length of OPPORTUNISTIC containers on the NM. */ + public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH = + NM_PREFIX + "opportunistic-containers-max-queue-length"; + public static final int NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT = + 0; /** Environment variables that will be sent to containers.*/ public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 1022a38..cb37126 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -82,7 +82,7 @@ enum ContainerStateProto { C_NEW = 1; C_RUNNING = 2; C_COMPLETE = 3; - C_QUEUED = 4; + C_SCHEDULED = 4; } message ContainerProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 4cfc4eb..b552d19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -108,7 +108,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { conf.setBoolean(YarnConfiguration. OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); + conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + 10); cluster.init(conf); cluster.start(); yarnConf = cluster.getConfig(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 3640883..d211d6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -36,6 +36,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -330,6 +331,12 @@ public class TestNMClient { ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class); + if (Shell.WINDOWS) { + clc.setCommands( + Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul")); + } else { + clc.setCommands(Arrays.asList("sleep", "10")); + } clc.setTokens(securityTokens); try { nmClient.startContainer(container, clc); @@ -415,7 +422,7 @@ public class TestNMClient { try { nmClient.increaseContainerResource(container); } catch (YarnException e) { - // NM container will only be in LOCALIZED state, so expect the increase + // NM container will only be in SCHEDULED state, so expect the increase // action to fail. if (!e.getMessage().contains( "can only be changed when a container is in RUNNING state")) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index ace145d..802c207 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -111,6 +111,8 @@ public class TestOpportunisticContainerAllocation { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setBoolean( YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 019166b..c436289 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1000,10 +1000,10 @@ - Enable Queuing of OPPORTUNISTIC containers on the + Max number of OPPORTUNISTIC containers to queue at the nodemanager. - yarn.nodemanager.container-queuing-enabled - false + yarn.nodemanager.opportunistic-containers-max-queue-length + 0 http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index e888393..16a8497 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -27,12 +27,12 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; @@ -47,15 +47,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; public interface Context { /** - * Interface exposing methods related to the queuing of containers in the NM. - */ - interface QueuingContext { - ConcurrentMap getQueuedContainers(); - - ConcurrentMap getKilledQueuedContainers(); - } - - /** * Return the nodeId. Usable only when the ContainerManager is started. * * @return the NodeId @@ -112,13 +103,6 @@ public interface Context { NodeStatusUpdater getNodeStatusUpdater(); - /** - * Returns a QueuingContext that provides information about the - * number of Containers Queued as well as the number of Containers that were - * queued and killed. - */ - QueuingContext getQueuingContext(); - boolean isDistributedSchedulingEnabled(); OpportunisticContainerAllocator getContainerAllocator(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 0f0a081..72875a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; @@ -64,7 +63,6 @@ import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorSer import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; @@ -177,14 +175,8 @@ public class NodeManager extends CompositeService ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { - if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, - YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) { - return new QueuingContainerManagerImpl(context, exec, del, - nodeStatusUpdater, metrics, dirsHandler); - } else { - return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, - metrics, dirsHandler); - } + return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, + metrics, dirsHandler); } protected NMCollectorService createNMCollectorService(Context ctxt) { @@ -510,7 +502,6 @@ public class NodeManager extends CompositeService private OpportunisticContainerAllocator containerAllocator; - private final QueuingContext queuingContext; private ContainerExecutor executor; private NMTimelinePublisher nmTimelinePublisher; @@ -533,7 +524,6 @@ public class NodeManager extends CompositeService this.stateStore = stateStore; this.logAggregationReportForApps = new ConcurrentLinkedQueue< LogAggregationReport>(); - this.queuingContext = new QueuingNMContext(); this.isDistSchedulingEnabled = isDistSchedulingEnabled; this.conf = conf; } @@ -662,11 +652,6 @@ public class NodeManager extends CompositeService this.nodeStatusUpdater = nodeStatusUpdater; } - @Override - public QueuingContext getQueuingContext() { - return this.queuingContext; - } - public boolean isDistributedSchedulingEnabled() { return isDistSchedulingEnabled; } @@ -716,29 +701,6 @@ public class NodeManager extends CompositeService } /** - * Class that keeps the context for containers queued at the NM. - */ - public static class QueuingNMContext implements Context.QueuingContext { - protected final ConcurrentMap - queuedContainers = new ConcurrentSkipListMap<>(); - - protected final ConcurrentMap - killedQueuedContainers = new ConcurrentHashMap<>(); - - @Override - public ConcurrentMap - getQueuedContainers() { - return this.queuedContainers; - } - - @Override - public ConcurrentMap - getKilledQueuedContainers() { - return this.killedQueuedContainers; - } - } - - /** * @return the node health checker */ public NodeHealthCheckerService getNodeHealthChecker() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 6e50d24..7f74ed8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -47,7 +47,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -62,7 +61,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -570,9 +567,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - // Account for all containers that got killed while they were still queued. - pendingCompletedContainers.putAll(getKilledQueuedContainerStatuses()); - containerStatuses.addAll(pendingCompletedContainers.values()); if (LOG.isDebugEnabled()) { @@ -582,43 +576,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return containerStatuses; } - /** - * Add to the container statuses the status of the containers that got killed - * while they were queued. - */ - private Map getKilledQueuedContainerStatuses() { - Map killedQueuedContainerStatuses = - new HashMap<>(); - for (Map.Entry killedQueuedContainer : - this.context.getQueuingContext(). - getKilledQueuedContainers().entrySet()) { - ContainerTokenIdentifier containerTokenId = killedQueuedContainer - .getKey(); - ContainerId containerId = containerTokenId.getContainerID(); - ContainerStatus containerStatus = BuilderUtils.newContainerStatus( - containerId, ContainerState.COMPLETE, - killedQueuedContainer.getValue(), ContainerExitStatus.ABORTED, - containerTokenId.getResource(), containerTokenId.getExecutionType()); - ApplicationId applicationId = containerId.getApplicationAttemptId() - .getApplicationId(); - if (isApplicationStopped(applicationId)) { - if (LOG.isDebugEnabled()) { - LOG.debug(applicationId + " is completing, " + " remove " - + containerId + " from NM context."); - } - this.context.getQueuingContext().getKilledQueuedContainers() - .remove(containerTokenId); - killedQueuedContainerStatuses.put(containerId, containerStatus); - } else { - if (!isContainerRecentlyStopped(containerId)) { - killedQueuedContainerStatuses.put(containerId, containerStatus); - } - } - addCompletedContainer(containerId); - } - return killedQueuedContainerStatuses; - } - private List getRunningApplications() { List runningApplications = new ArrayList(); runningApplications.addAll(this.context.getApplications().keySet()); @@ -703,17 +660,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - // Remove null containers from queuing context for killed queued containers. - Iterator killedQueuedContIter = - context.getQueuingContext().getKilledQueuedContainers().keySet(). - iterator(); - while (killedQueuedContIter.hasNext()) { - if (removedNullContainers.contains( - killedQueuedContIter.next().getContainerID())) { - killedQueuedContIter.remove(); - } - } - if (!removedContainers.isEmpty()) { LOG.info("Removed completed containers from NM context: " + removedContainers); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java index 1cbb8c7..066d987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor .ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler + .ContainerScheduler; /** * The ContainerManager is an entity that manages the life cycle of Containers. @@ -42,4 +44,6 @@ public interface ContainerManager extends ServiceStateChangeListener, void setBlockNewContainerRequests(boolean blockNewContainerRequests); + ContainerScheduler getContainerScheduler(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index c7810f9..e8de8b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -136,6 +136,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Change import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; @@ -205,6 +208,7 @@ public class ContainerManagerImpl extends CompositeService implements private final WriteLock writeLock; private AMRMProxyService amrmProxyService; protected boolean amrmProxyEnabled = false; + private final ContainerScheduler containerScheduler; private long waitForContainersOnShutdownMillis; @@ -231,6 +235,8 @@ public class ContainerManagerImpl extends CompositeService implements addService(containersLauncher); this.nodeStatusUpdater = nodeStatusUpdater; + this.containerScheduler = createContainerScheduler(context); + addService(containerScheduler); // Start configurable services auxiliaryServices = new AuxServices(); @@ -259,7 +265,8 @@ public class ContainerManagerImpl extends CompositeService implements dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); - + dispatcher.register(ContainerSchedulerEventType.class, containerScheduler); + addService(dispatcher); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -311,6 +318,14 @@ public class ContainerManagerImpl extends CompositeService implements } } + @VisibleForTesting + protected ContainerScheduler createContainerScheduler(Context cntxt) { + // Currently, this dispatcher is shared by the ContainerManager, + // all the containers, the container monitor and all the container. + // The ContainerScheduler may use its own dispatcher. + return new ContainerScheduler(cntxt, dispatcher, metrics); + } + protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) { return new ContainersMonitorImpl(exec, dispatcher, this.context); } @@ -1263,10 +1278,8 @@ public class ContainerManagerImpl extends CompositeService implements } } else { context.getNMStateStore().storeContainerKilled(containerID); - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - ContainerExitStatus.KILLED_BY_APPMASTER, - "Container killed by the ApplicationMaster.")); + container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER, + "Container killed by the ApplicationMaster."); NMAuditLogger.logSuccess(container.getUser(), AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID @@ -1521,12 +1534,12 @@ public class ContainerManagerImpl extends CompositeService implements @Override public OpportunisticContainersStatus getOpportunisticContainersStatus() { - return OpportunisticContainersStatus.newInstance(); + return this.containerScheduler.getOpportunisticContainersStatus(); } @Override public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { - LOG.trace("Implementation does not support queuing of Containers!!"); + this.containerScheduler.updateQueuingLimit(queuingLimit); } @SuppressWarnings("unchecked") @@ -1687,4 +1700,9 @@ public class ContainerManagerImpl extends CompositeService implements LOG.info("Container " + containerId + " no longer exists"); } } + + @Override + public ContainerScheduler getContainerScheduler() { + return this.containerScheduler; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 78c240a..8004f33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -83,7 +83,13 @@ public interface Container extends EventHandler { boolean isReInitializing(); + boolean isMarkedForKilling(); + boolean canRollback(); void commitUpgrade(); + + void sendLaunchEvent(); + + void sendKillEvent(int exitStatus, String description); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 307cf5d..4a6be32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; @@ -164,6 +166,7 @@ public class ContainerImpl implements Container { private String ips; private volatile ReInitializationContext reInitContext; private volatile boolean isReInitializing = false; + private volatile boolean isMarkeForKilling = false; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -286,7 +289,7 @@ public class ContainerImpl implements Container { // From NEW State .addTransition(ContainerState.NEW, EnumSet.of(ContainerState.LOCALIZING, - ContainerState.LOCALIZED, + ContainerState.SCHEDULED, ContainerState.LOCALIZATION_FAILED, ContainerState.DONE), ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) @@ -298,7 +301,7 @@ public class ContainerImpl implements Container { // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, - EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED), + EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED), ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition()) .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZATION_FAILED, @@ -309,7 +312,7 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, - new KillDuringLocalizationTransition()) + new KillBeforeRunningTransition()) // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -334,17 +337,18 @@ public class ContainerImpl implements Container { ContainerState.LOCALIZATION_FAILED, ContainerEventType.RESOURCE_FAILED) - // From LOCALIZED State - .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING, + // From SCHEDULED State + .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING, ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) - .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE, + .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition(true)) - .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, + .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) - .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, + new KillBeforeRunningTransition()) // From RUNNING State .addTransition(ContainerState.RUNNING, @@ -353,7 +357,7 @@ public class ContainerImpl implements Container { new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, EnumSet.of(ContainerState.RELAUNCHING, - ContainerState.LOCALIZED, + ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new RetryFailureTransition()) @@ -402,7 +406,7 @@ public class ContainerImpl implements Container { .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) .addTransition(ContainerState.REINITIALIZING, - ContainerState.LOCALIZED, + ContainerState.SCHEDULED, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledForReInitializationTransition()) @@ -520,9 +524,11 @@ public class ContainerImpl implements Container { case NEW: case LOCALIZING: case LOCALIZATION_FAILED: - case LOCALIZED: + case SCHEDULED: + return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED; case RUNNING: case RELAUNCHING: + case REINITIALIZING: case EXITED_WITH_SUCCESS: case EXITED_WITH_FAILURE: case KILLING: @@ -553,7 +559,7 @@ public class ContainerImpl implements Container { public Map> getLocalizedResources() { this.readLock.lock(); try { - if (ContainerState.LOCALIZED == getContainerState() + if (ContainerState.SCHEDULED == getContainerState() || ContainerState.RELAUNCHING == getContainerState()) { return resourceSet.getLocalizedResources(); } else { @@ -690,6 +696,9 @@ public class ContainerImpl implements Container { ContainerStatus containerStatus = cloneAndGetContainerStatus(); eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus)); + // Tell the scheduler the container is Done + eventHandler.handle(new ContainerSchedulerEvent(this, + ContainerSchedulerEventType.CONTAINER_COMPLETED)); // Remove the container from the resource-monitor eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); // Tell the logService too @@ -698,7 +707,8 @@ public class ContainerImpl implements Container { } @SuppressWarnings("unchecked") // dispatcher not typed - private void sendLaunchEvent() { + @Override + public void sendLaunchEvent() { ContainersLauncherEventType launcherEvent = ContainersLauncherEventType.LAUNCH_CONTAINER; if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { @@ -711,6 +721,22 @@ public class ContainerImpl implements Container { } @SuppressWarnings("unchecked") // dispatcher not typed + private void sendScheduleEvent() { + dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(this, + ContainerSchedulerEventType.SCHEDULE_CONTAINER) + ); + } + + @SuppressWarnings("unchecked") // dispatcher not typed + @Override + public void sendKillEvent(int exitStatus, String description) { + this.isMarkeForKilling = true; + dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerId, exitStatus, description)); + } + + @SuppressWarnings("unchecked") // dispatcher not typed private void sendRelaunchEvent() { ContainersLauncherEventType launcherEvent = ContainersLauncherEventType.RELAUNCH_CONTAINER; @@ -781,7 +807,7 @@ public class ContainerImpl implements Container { * to the ResourceLocalizationManager and enters LOCALIZING state. * * If there are no resources to localize, sends LAUNCH_CONTAINER event - * and enters LOCALIZED state directly. + * and enters SCHEDULED state directly. * * If there are any invalid resources specified, enters LOCALIZATION_FAILED * directly. @@ -847,9 +873,9 @@ public class ContainerImpl implements Container { } return ContainerState.LOCALIZING; } else { - container.sendLaunchEvent(); + container.sendScheduleEvent(); container.metrics.endInitingContainer(); - return ContainerState.LOCALIZED; + return ContainerState.SCHEDULED; } } } @@ -889,7 +915,7 @@ public class ContainerImpl implements Container { new ContainerLocalizationEvent(LocalizationEventType. CONTAINER_RESOURCES_LOCALIZED, container)); - container.sendLaunchEvent(); + container.sendScheduleEvent(); container.metrics.endInitingContainer(); // If this is a recovered container that has already launched, skip @@ -909,7 +935,7 @@ public class ContainerImpl implements Container { SharedCacheUploadEventType.UPLOAD)); } - return ContainerState.LOCALIZED; + return ContainerState.SCHEDULED; } } @@ -1099,7 +1125,7 @@ public class ContainerImpl implements Container { } /** - * Transition from LOCALIZED state to RUNNING state upon receiving + * Transition from SCHEDULED state to RUNNING state upon receiving * a CONTAINER_LAUNCHED event. */ static class LaunchTransition extends ContainerTransition { @@ -1257,7 +1283,7 @@ public class ContainerImpl implements Container { container.containerId.getApplicationAttemptId().getApplicationId(), container.containerId); new KilledForReInitializationTransition().transition(container, event); - return ContainerState.LOCALIZED; + return ContainerState.SCHEDULED; } else { new ExitedWithFailureTransition(true).transition(container, event); return ContainerState.EXITED_WITH_FAILURE; @@ -1339,7 +1365,7 @@ public class ContainerImpl implements Container { } /** - * Transition to LOCALIZED and wait for RE-LAUNCH + * Transition to SCHEDULED and wait for RE-LAUNCH */ static class KilledForReInitializationTransition extends ContainerTransition { @@ -1363,8 +1389,8 @@ public class ContainerImpl implements Container { container.resourceSet = container.reInitContext.mergedResourceSet(container.resourceSet); - - container.sendLaunchEvent(); + container.isMarkeForKilling = false; + container.sendScheduleEvent(); } } @@ -1392,7 +1418,7 @@ public class ContainerImpl implements Container { * Transition from LOCALIZING to KILLING upon receiving * KILL_CONTAINER event. */ - static class KillDuringLocalizationTransition implements + static class KillBeforeRunningTransition implements SingleArcTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { @@ -1424,7 +1450,7 @@ public class ContainerImpl implements Container { /** * Transitions upon receiving KILL_CONTAINER. - * - LOCALIZED -> KILLING. + * - SCHEDULED -> KILLING. * - RUNNING -> KILLING. * - REINITIALIZING -> KILLING. */ @@ -1651,7 +1677,8 @@ public class ContainerImpl implements Container { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { LOG.warn("Can't handle this event at current state: Current: [" - + oldState + "], eventType: [" + event.getType() + "]", e); + + oldState + "], eventType: [" + event.getType() + "]," + + " container: [" + containerID + "]", e); } if (oldState != newState) { LOG.info("Container " + containerID + " transitioned from " @@ -1715,6 +1742,11 @@ public class ContainerImpl implements Container { } @Override + public boolean isMarkedForKilling() { + return this.isMarkeForKilling; + } + + @Override public boolean canRollback() { return (this.reInitContext != null) && (this.reInitContext.canRollback()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 70de90c..91d1356 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; public enum ContainerState { - NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING, + NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index d774030..823457f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -104,9 +104,10 @@ public class ContainerLaunch implements Callable { private final Context context; private final ContainerManagerImpl containerManager; - protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); + protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false); protected AtomicBoolean completed = new AtomicBoolean(false); + private volatile boolean killedBeforeStart = false; private long sleepDelayBeforeSigKill = 250; private long maxKillWaitTime = 2000; @@ -401,7 +402,12 @@ public class ContainerLaunch implements Callable { @SuppressWarnings("unchecked") protected int launchContainer(ContainerStartContext ctx) throws IOException { ContainerId containerId = container.getContainerId(); - + if (container.isMarkedForKilling()) { + LOG.info("Container " + containerId + " not launched as it has already " + + "been marked for Killing"); + this.killedBeforeStart = true; + return ExitCode.TERMINATED.getExitCode(); + } // LaunchContainer is a blocking call. We are here almost means the // container is launched, so send out the event. dispatcher.getEventHandler().handle(new ContainerEvent( @@ -410,7 +416,7 @@ public class ContainerLaunch implements Callable { context.getNMStateStore().storeContainerLaunched(containerId); // Check if the container is signalled to be killed. - if (!shouldLaunchContainer.compareAndSet(false, true)) { + if (!containerAlreadyLaunched.compareAndSet(false, true)) { LOG.info("Container " + containerId + " not launched as " + "cleanup already called"); return ExitCode.TERMINATED.getExitCode(); @@ -451,10 +457,14 @@ public class ContainerLaunch implements Callable { || exitCode == ExitCode.TERMINATED.getExitCode()) { // If the process was killed, Send container_cleanedup_after_kill and // just break out of this method. - dispatcher.getEventHandler().handle( - new ContainerExitEvent(containerId, - ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode, - diagnosticInfo.toString())); + + // If Container was killed before starting... NO need to do this. + if (!killedBeforeStart) { + dispatcher.getEventHandler().handle( + new ContainerExitEvent(containerId, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode, + diagnosticInfo.toString())); + } } else if (exitCode != 0) { handleContainerExitWithFailure(containerId, exitCode, containerLogDir, diagnosticInfo); @@ -565,7 +575,8 @@ public class ContainerLaunch implements Callable { } // launch flag will be set to true if process already launched - boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); + boolean alreadyLaunched = + !containerAlreadyLaunched.compareAndSet(false, true); if (!alreadyLaunched) { LOG.info("Container " + containerIdStr + " not launched." + " No cleanup needed to be done"); @@ -660,7 +671,8 @@ public class ContainerLaunch implements Callable { LOG.info("Sending signal " + command + " to container " + containerIdStr); - boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); + boolean alreadyLaunched = + !containerAlreadyLaunched.compareAndSet(false, true); if (!alreadyLaunched) { LOG.info("Container " + containerIdStr + " not launched." + " Not sending the signal"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java index 3cd31b7..a04a23f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java @@ -39,7 +39,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; -import org.apache.hadoop.yarn.util.ConverterUtils; + /** * This is a ContainerLaunch which has been recovered after an NM restart (for @@ -57,7 +57,7 @@ public class RecoveredContainerLaunch extends ContainerLaunch { { super(context, configuration, dispatcher, exec, app, container, dirsHandler, containerManager); - this.shouldLaunchContainer.set(true); + this.containerAlreadyLaunched.set(true); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 1069b4f..64831e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -19,29 +19,51 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; public interface ContainersMonitor extends Service, EventHandler, ResourceView { - public ResourceUtilization getContainersUtilization(); + ResourceUtilization getContainersUtilization(); - ResourceUtilization getContainersAllocation(); - - boolean hasResourcesAvailable(ProcessTreeInfo pti); - - void increaseContainersAllocation(ProcessTreeInfo pti); - - void decreaseContainersAllocation(ProcessTreeInfo pti); - - void increaseResourceUtilization(ResourceUtilization resourceUtil, - ProcessTreeInfo pti); - - void decreaseResourceUtilization(ResourceUtilization resourceUtil, - ProcessTreeInfo pti); + float getVmemRatio(); void subtractNodeResourcesFromResourceUtilization( ResourceUtilization resourceUtil); + + /** + * Utility method to add a {@link Resource} to the + * {@link ResourceUtilization}. + * @param containersMonitor Containers Monitor. + * @param resourceUtil Resource Utilization. + * @param resource Resource. + */ + static void increaseResourceUtilization( + ContainersMonitor containersMonitor, ResourceUtilization resourceUtil, + Resource resource) { + float vCores = (float) resource.getVirtualCores() / + containersMonitor.getVCoresAllocatedForContainers(); + int vmem = (int) (resource.getMemorySize() + * containersMonitor.getVmemRatio()); + resourceUtil.addTo((int)resource.getMemorySize(), vmem, vCores); + } + + /** + * Utility method to subtract a {@link Resource} from the + * {@link ResourceUtilization}. + * @param containersMonitor Containers Monitor. + * @param resourceUtil Resource Utilization. + * @param resource Resource. + */ + static void decreaseResourceUtilization( + ContainersMonitor containersMonitor, ResourceUtilization resourceUtil, + Resource resource) { + float vCores = (float) resource.getVirtualCores() / + containersMonitor.getVCoresAllocatedForContainers(); + int vmem = (int) (resource.getMemorySize() + * containersMonitor.getVmemRatio()); + resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index fad2b6a..1914b42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -97,9 +97,6 @@ public class ContainersMonitorImpl extends AbstractService implements } private ResourceUtilization containersUtilization; - // Tracks the aggregated allocation of the currently allocated containers - // when queuing of containers at the NMs is enabled. - private final ResourceUtilization containersAllocation; private volatile boolean stopped = false; @@ -114,7 +111,6 @@ public class ContainersMonitorImpl extends AbstractService implements this.monitoringThread = new MonitoringThread(); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); - this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); } @Override @@ -743,6 +739,8 @@ public class ContainersMonitorImpl extends AbstractService implements LOG.warn("Container " + containerId.toString() + "does not exist"); return; } + // YARN-5860: Route this through the ContainerScheduler to + // fix containerAllocation container.setResource(resource); } @@ -842,67 +840,6 @@ public class ContainersMonitorImpl extends AbstractService implements this.containersUtilization = utilization; } - public ResourceUtilization getContainersAllocation() { - return this.containersAllocation; - } - - /** - * @return true if there are available allocated resources for the given - * container to start. - */ - @Override - public boolean hasResourcesAvailable(ProcessTreeInfo pti) { - synchronized (this.containersAllocation) { - // Check physical memory. - if (this.containersAllocation.getPhysicalMemory() + - (int) (pti.getPmemLimit() >> 20) > - (int) (getPmemAllocatedForContainers() >> 20)) { - return false; - } - // Check virtual memory. - if (isVmemCheckEnabled() && - this.containersAllocation.getVirtualMemory() + - (int) (pti.getVmemLimit() >> 20) > - (int) (getVmemAllocatedForContainers() >> 20)) { - return false; - } - // Check CPU. - if (this.containersAllocation.getCPU() - + allocatedCpuUsage(pti) > 1.0f) { - return false; - } - } - return true; - } - - @Override - public void increaseContainersAllocation(ProcessTreeInfo pti) { - synchronized (this.containersAllocation) { - increaseResourceUtilization(this.containersAllocation, pti); - } - } - - @Override - public void decreaseContainersAllocation(ProcessTreeInfo pti) { - synchronized (this.containersAllocation) { - decreaseResourceUtilization(this.containersAllocation, pti); - } - } - - @Override - public void increaseResourceUtilization(ResourceUtilization resourceUtil, - ProcessTreeInfo pti) { - resourceUtil.addTo((int) (pti.getPmemLimit() >> 20), - (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti)); - } - - @Override - public void decreaseResourceUtilization(ResourceUtilization resourceUtil, - ProcessTreeInfo pti) { - resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20), - (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti)); - } - @Override public void subtractNodeResourcesFromResourceUtilization( ResourceUtilization resourceUtil) { @@ -910,14 +847,9 @@ public class ContainersMonitorImpl extends AbstractService implements (int) (getVmemAllocatedForContainers() >> 20), 1.0f); } - /** - * Calculates the vCores CPU usage that is assigned to the given - * {@link ProcessTreeInfo}. In particular, it takes into account the number of - * vCores that are allowed to be used by the NM and returns the CPU usage - * as a normalized value between {@literal >=} 0 and {@literal <=} 1. - */ - private float allocatedCpuUsage(ProcessTreeInfo pti) { - return (float) pti.getCpuVcores() / getVCoresAllocatedForContainers(); + @Override + public float getVmemRatio() { + return vmemRatio; } @Override @@ -988,5 +920,4 @@ public class ContainersMonitorImpl extends AbstractService implements startEvent.getVmemLimit(), startEvent.getPmemLimit(), startEvent.getCpuVcores())); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org