Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EA2B6200B89 for ; Tue, 6 Sep 2016 18:43:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E8DE0160AA9; Tue, 6 Sep 2016 16:43:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C437D160ACF for ; Tue, 6 Sep 2016 18:43:47 +0200 (CEST) Received: (qmail 27065 invoked by uid 500); 6 Sep 2016 16:43:36 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 26199 invoked by uid 99); 6 Sep 2016 16:43:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Sep 2016 16:43:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 80297E0BD9; Tue, 6 Sep 2016 16:43:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cnauroth@apache.org To: common-commits@hadoop.apache.org Date: Tue, 06 Sep 2016 16:44:00 -0000 Message-Id: In-Reply-To: <99b9cf187a7d42009393da29b21bbd46@git.apache.org> References: <99b9cf187a7d42009393da29b21bbd46@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/50] [abbrv] hadoop git commit: YARN-5221. Expose UpdateResourceRequest API to allow AM to request for change in container properties. (asuresh) archived-at: Tue, 06 Sep 2016 16:43:50 -0000 YARN-5221. Expose UpdateResourceRequest API to allow AM to request for change in container properties. (asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6d9cff2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6d9cff2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6d9cff2 Branch: refs/heads/HADOOP-13345 Commit: d6d9cff21b7b6141ed88359652cf22e8973c0661 Parents: 9dcbdbd Author: Arun Suresh Authored: Sat Aug 27 15:22:43 2016 -0700 Committer: Arun Suresh Committed: Tue Aug 30 15:52:29 2016 -0700 ---------------------------------------------------------------------- .../app/local/TestLocalContainerAllocator.java | 4 +- .../v2/app/rm/TestRMContainerAllocator.java | 10 +- .../sls/scheduler/ResourceSchedulerWrapper.java | 6 +- .../sls/scheduler/SLSCapacityScheduler.java | 6 +- .../api/protocolrecords/AllocateRequest.java | 64 +--- .../api/protocolrecords/AllocateResponse.java | 76 +++-- .../hadoop/yarn/api/records/Container.java | 24 +- .../records/ContainerResourceChangeRequest.java | 117 ------- .../yarn/api/records/ContainerUpdateType.java | 45 +++ .../yarn/api/records/UpdateContainerError.java | 119 +++++++ .../api/records/UpdateContainerRequest.java | 218 ++++++++++++ .../yarn/api/records/UpdatedContainer.java | 118 +++++++ .../src/main/proto/yarn_protos.proto | 6 +- .../src/main/proto/yarn_service_protos.proto | 31 +- .../distributedshell/ApplicationMaster.java | 4 +- .../yarn/client/api/async/AMRMClientAsync.java | 9 +- .../api/async/impl/AMRMClientAsyncImpl.java | 8 +- .../yarn/client/api/impl/AMRMClientImpl.java | 84 ++--- .../api/async/impl/TestAMRMClientAsync.java | 55 +-- .../yarn/client/api/impl/TestAMRMClient.java | 42 +-- .../api/impl/TestAMRMClientOnRMRestart.java | 14 +- .../impl/pb/AllocateRequestPBImpl.java | 151 +++------ .../impl/pb/AllocateResponsePBImpl.java | 192 ++++++++--- .../api/records/impl/pb/ContainerPBImpl.java | 13 + .../ContainerResourceChangeRequestPBImpl.java | 141 -------- .../yarn/api/records/impl/pb/ProtoUtils.java | 69 +++- .../impl/pb/UpdateContainerErrorPBImpl.java | 125 +++++++ .../impl/pb/UpdateContainerRequestPBImpl.java | 187 ++++++++++ .../records/impl/pb/UpdatedContainerPBImpl.java | 117 +++++++ .../yarn/security/ContainerTokenIdentifier.java | 29 +- .../src/main/proto/yarn_security_token.proto | 1 + .../hadoop/yarn/api/TestPBImplRecords.java | 17 +- .../yarn/security/TestYARNTokenIdentifier.java | 4 +- .../api/protocolrecords/NMContainerStatus.java | 15 +- .../impl/pb/NMContainerStatusPBImpl.java | 13 + .../OpportunisticContainerAllocator.java | 2 +- .../hadoop/yarn/server/utils/BuilderUtils.java | 14 +- .../yarn_server_common_service_protos.proto | 1 + .../protocolrecords/TestProtocolRecords.java | 4 +- .../TestRegisterNodeManagerRequest.java | 2 +- .../containermanager/ContainerManagerImpl.java | 16 +- .../container/ContainerImpl.java | 7 +- .../queuing/QueuingContainerManagerImpl.java | 3 +- .../recovery/NMLeveldbStateStoreService.java | 41 ++- .../recovery/NMNullStateStoreService.java | 4 +- .../recovery/NMStateStoreService.java | 13 +- .../nodemanager/TestNodeManagerResync.java | 2 +- .../nodemanager/TestNodeStatusUpdater.java | 24 +- .../amrmproxy/MockResourceManagerFacade.java | 4 +- .../BaseContainerManagerTest.java | 2 +- .../recovery/NMMemoryStateStoreService.java | 7 +- .../TestNMLeveldbStateStoreService.java | 7 +- .../nodemanager/webapp/MockContainer.java | 2 +- .../nodemanager/webapp/TestNMWebServer.java | 6 +- .../ApplicationMasterService.java | 54 ++- .../server/resourcemanager/RMServerUtils.java | 338 ++++++++++--------- .../scheduler/AbstractYarnScheduler.java | 13 +- .../scheduler/SchedContainerChangeRequest.java | 2 +- .../scheduler/SchedulerApplicationAttempt.java | 12 +- .../scheduler/YarnScheduler.java | 6 +- .../scheduler/capacity/CapacityScheduler.java | 8 +- .../scheduler/fair/FairScheduler.java | 6 +- .../scheduler/fifo/FifoScheduler.java | 6 +- .../security/RMContainerTokenSecretManager.java | 64 ++-- .../yarn/server/resourcemanager/MockAM.java | 7 +- .../resourcemanager/TestApplicationCleanup.java | 9 +- .../TestApplicationMasterService.java | 86 +++-- .../server/resourcemanager/TestRMRestart.java | 2 +- .../TestResourceTrackerService.java | 8 +- .../capacity/TestCapacityScheduler.java | 42 ++- .../capacity/TestContainerAllocation.java | 13 +- .../capacity/TestContainerResizing.java | 134 +++++--- .../capacity/TestIncreaseAllocationExpirer.java | 76 +++-- .../server/TestContainerManagerSecurity.java | 18 +- .../TestMiniYarnClusterNodeUtilization.java | 2 - .../src/test/proto/test_token.proto | 1 + 76 files changed, 2099 insertions(+), 1103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index f9e4595b..3fa0043 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; @@ -296,8 +297,7 @@ public class TestLocalContainerAllocator { Resources.none(), null, 1, null, Collections.emptyList(), yarnToken, - Collections.emptyList(), - Collections.emptyList()); + Collections.emptyList()); response.setApplicationPriority(Priority.newInstance(0)); return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 44aa593..a115b13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -99,7 +99,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -108,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -1703,8 +1703,8 @@ public class TestRMContainerAllocator { ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req @@ -1750,8 +1750,8 @@ public class TestRMContainerAllocator { ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequest, - List decreaseRequests) { + List increaseRequest, + List decreaseRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 393300c..79f934c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -206,8 +206,8 @@ final public class ResourceSchedulerWrapper public Allocation allocate(ApplicationAttemptId attemptId, List resourceRequests, List containerIds, List strings, List strings2, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 1c3fa79..cf08309 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -179,8 +179,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements public Allocation allocate(ApplicationAttemptId attemptId, List resourceRequests, List containerIds, List strings, List strings2, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index e24ebdf..f7ce127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.util.Records; /** @@ -48,13 +48,8 @@ import org.apache.hadoop.yarn.util.Records; * A list of unused {@link Container} which are being returned. * *
  • - * A list of {@link ContainerResourceChangeRequest} to inform - * the ResourceManager about the resource increase - * requirements of running containers. - *
  • - *
  • - * A list of {@link ContainerResourceChangeRequest} to inform - * the ResourceManager about the resource decrease + * A list of {@link UpdateContainerRequest} to inform + * the ResourceManager about the change in * requirements of running containers. *
  • * @@ -72,25 +67,23 @@ public abstract class AllocateRequest { List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest) { return newInstance(responseID, appProgress, resourceAsk, - containersToBeReleased, resourceBlacklistRequest, null, null); + containersToBeReleased, resourceBlacklistRequest, null); } @Public - @Stable + @Unstable public static AllocateRequest newInstance(int responseID, float appProgress, List resourceAsk, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, - List increaseRequests, - List decreaseRequests) { + List updateRequests) { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(responseID); allocateRequest.setProgress(appProgress); allocateRequest.setAskList(resourceAsk); allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); - allocateRequest.setIncreaseRequests(increaseRequests); - allocateRequest.setDecreaseRequests(decreaseRequests); + allocateRequest.setUpdateRequests(updateRequests); return allocateRequest; } @@ -197,48 +190,25 @@ public abstract class AllocateRequest { ResourceBlacklistRequest resourceBlacklistRequest); /** - * Get the list of container resource increase requests being sent by the - * ApplicationMaster. - * @return the list of {@link ContainerResourceChangeRequest} - * being sent by the - * ApplicationMaster. - */ - @Public - @Unstable - public abstract List getIncreaseRequests(); - - /** - * Set the list of container resource increase requests to inform the - * ResourceManager about the containers whose resources need - * to be increased. - * @param increaseRequests list of - * {@link ContainerResourceChangeRequest} - */ - @Public - @Unstable - public abstract void setIncreaseRequests( - List increaseRequests); - - /** - * Get the list of container resource decrease requests being sent by the + * Get the list of container update requests being sent by the * ApplicationMaster. - * @return list of {@link ContainerResourceChangeRequest} + * @return list of {@link UpdateContainerRequest} * being sent by the * ApplicationMaster. */ @Public @Unstable - public abstract List getDecreaseRequests(); + public abstract List getUpdateRequests(); /** - * Set the list of container resource decrease requests to inform the - * ResourceManager about the containers whose resources need - * to be decreased. - * @param decreaseRequests list of - * {@link ContainerResourceChangeRequest} + * Set the list of container update requests to inform the + * ResourceManager about the containers that need to be + * updated. + * @param updateRequests list of UpdateContainerRequest for + * containers to be updated */ @Public @Unstable - public abstract void setDecreaseRequests( - List decreaseRequests); + public abstract void setUpdateRequests( + List updateRequests); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 4fba423..69089ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.util.Records; /** @@ -95,19 +98,17 @@ public abstract class AllocateResponse { } @Public - @Stable + @Unstable public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, - List increasedContainers, - List decreasedContainers) { + List updatedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, nmTokens); - response.setIncreasedContainers(increasedContainers); - response.setDecreasedContainers(decreasedContainers); + response.setUpdatedContainers(updatedContainers); return response; } @@ -118,12 +119,11 @@ public abstract class AllocateResponse { List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, - List increasedContainers, - List decreasedContainers) { + List updatedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, increasedContainers, decreasedContainers); + nmTokens, updatedContainers); response.setAMRMToken(amRMToken); return response; } @@ -135,13 +135,11 @@ public abstract class AllocateResponse { List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, - List increasedContainers, - List decreasedContainers, - String collectorAddr) { + List updatedContainers, String collectorAddr) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, increasedContainers, decreasedContainers); + nmTokens, updatedContainers); response.setAMRMToken(amRMToken); response.setCollectorAddr(collectorAddr); return response; @@ -290,40 +288,24 @@ public abstract class AllocateResponse { public abstract void setNMTokens(List nmTokens); /** - * Get the list of newly increased containers by + * Get the list of newly updated containers by * ResourceManager. * @return list of newly increased containers */ @Public @Unstable - public abstract List getIncreasedContainers(); - - /** - * Set the list of newly increased containers by - * ResourceManager. - */ - @Private - @Unstable - public abstract void setIncreasedContainers( - List increasedContainers); - - /** - * Get the list of newly decreased containers by - * ResourceManager. - * @return the list of newly decreased containers - */ - @Public - @Unstable - public abstract List getDecreasedContainers(); + public abstract List getUpdatedContainers(); /** - * Set the list of newly decreased containers by + * Set the list of newly updated containers by * ResourceManager. + * + * @param updatedContainers List of Updated Containers. */ @Private @Unstable - public abstract void setDecreasedContainers( - List decreasedContainers); + public abstract void setUpdatedContainers( + List updatedContainers); /** * The AMRMToken that belong to this attempt @@ -364,4 +346,28 @@ public abstract class AllocateResponse { @Unstable public abstract void setCollectorAddr(String collectorAddr); + /** + * Get the list of container update errors to inform the + * Application Master about the container updates that could not be + * satisfied due to error. + * + * @return List of Update Container Errors. + */ + @Public + @Unstable + public List getUpdateErrors() { + return new ArrayList<>(); + } + + /** + * Set the list of container update errors to inform the + * Application Master about the container updates that could not be + * satisfied due to error. + * @param updateErrors list of UpdateContainerError for + * containers updates requests that were in error + */ + @Public + @Unstable + public void setUpdateErrors(List updateErrors) { + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 707a71d..4fdc803 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -230,8 +230,30 @@ public abstract class Container implements Comparable { * allocation. */ @Private - @Evolving + @Unstable public void setAllocationRequestId(long allocationRequestID) { throw new UnsupportedOperationException(); } + + /** + * Get the version of this container. The version will be incremented when + * a container is updated. + * + * @return version of this container. + */ + @Private + @Unstable + public int getVersion() { + return 0; + } + + /** + * Set the version of this container. + * @param version of this container. + */ + @Private + @Unstable + public void setVersion(int version) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java deleted file mode 100644 index 117015b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.util.Records; - -/** - * {@code ContainerResourceChangeRequest} represents the request made by an - * application to the {@code ResourceManager} to change resource allocation of - * a running {@code Container}. - *

    - * It includes: - *

      - *
    • {@link ContainerId} for the container.
    • - *
    • - * {@link Resource} capability of the container after the resource change - * is completed. - *
    • - *
    - * - * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) - */ -@Public -@Unstable -public abstract class ContainerResourceChangeRequest { - - @Public - @Unstable - public static ContainerResourceChangeRequest newInstance( - ContainerId existingContainerId, Resource targetCapability) { - ContainerResourceChangeRequest context = Records - .newRecord(ContainerResourceChangeRequest.class); - context.setContainerId(existingContainerId); - context.setCapability(targetCapability); - return context; - } - - /** - * Get the ContainerId of the container. - * @return ContainerId of the container - */ - @Public - @Unstable - public abstract ContainerId getContainerId(); - - /** - * Set the ContainerId of the container. - * @param containerId ContainerId of the container - */ - @Public - @Unstable - public abstract void setContainerId(ContainerId containerId); - - /** - * Get the Resource capability of the container. - * @return Resource capability of the container - */ - @Public - @Unstable - public abstract Resource getCapability(); - - /** - * Set the Resource capability of the container. - * @param capability Resource capability of the container - */ - @Public - @Unstable - public abstract void setCapability(Resource capability); - - @Override - public int hashCode() { - return getCapability().hashCode() + getContainerId().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof ContainerResourceChangeRequest) { - ContainerResourceChangeRequest ctx = - (ContainerResourceChangeRequest) other; - - if (getContainerId() == null && ctx.getContainerId() != null) { - return false; - } else if (!getContainerId().equals(ctx.getContainerId())) { - return false; - } - - if (getCapability() == null && ctx.getCapability() != null) { - return false; - } else if (!getCapability().equals(ctx.getCapability())) { - return false; - } - - return true; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java new file mode 100644 index 0000000..978ea09 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encodes the type of Container Update. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public enum ContainerUpdateType { + + /** + * Resource increase. + */ + INCREASE_RESOURCE, + + /** + * Resource decrease. + */ + DECREASE_RESOURCE, + + /** + * Execution Type change. + */ + UPDATE_EXECUTION_TYPE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java new file mode 100644 index 0000000..7102f7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code UpdateContainerError} is used by the Scheduler to notify the + * ApplicationMaster of an UpdateContainerRequest it cannot satisfy due to + * an error in the request. It includes the update request as well as + * a reason for why the request was not satisfiable. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class UpdateContainerError { + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdateContainerError newInstance(String reason, + UpdateContainerRequest updateContainerRequest) { + UpdateContainerError error = Records.newRecord(UpdateContainerError.class); + error.setReason(reason); + error.setUpdateContainerRequest(updateContainerRequest); + return error; + } + + /** + * Get reason why the update request was not satisfiable. + * @return Reason + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract String getReason(); + + /** + * Set reason why the update request was not satisfiable. + * @param reason Reason + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setReason(String reason); + + /** + * Get the {@code UpdateContainerRequest} that was not satisfiable. + * @return UpdateContainerRequest + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract UpdateContainerRequest getUpdateContainerRequest(); + + /** + * Set the {@code UpdateContainerRequest} that was not satisfiable. + * @param updateContainerRequest Update Container Request + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setUpdateContainerRequest( + UpdateContainerRequest updateContainerRequest); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + String reason = getReason(); + UpdateContainerRequest updateReq = getUpdateContainerRequest(); + result = prime * result + ((reason == null) ? 0 : reason.hashCode()); + result = prime * result + ((updateReq == null) ? 0 : updateReq.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdateContainerError other = (UpdateContainerError) obj; + String reason = getReason(); + if (reason == null) { + if (other.getReason() != null) { + return false; + } + } else if (!reason.equals(other.getReason())) { + return false; + } + UpdateContainerRequest req = getUpdateContainerRequest(); + if (req == null) { + if (other.getUpdateContainerRequest() != null) { + return false; + } + } else if (!req.equals(other.getUpdateContainerRequest())) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java new file mode 100644 index 0000000..ef39f5c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code UpdateContainerRequest} represents the request made by an + * application to the {@code ResourceManager} to update an attribute of a + * {@code Container} such as its Resource allocation or (@code ExecutionType} + *

    + * It includes: + *

      + *
    • version for the container.
    • + *
    • {@link ContainerId} for the container.
    • + *
    • + * {@link Resource} capability of the container after the update request + * is completed. + *
    • + *
    • + * {@link ExecutionType} of the container after the update request is + * completed. + *
    • + *
    + * + * Update rules: + *
      + *
    • + * Currently only ONE aspect of the container can be updated per request + * (user can either update Capability OR ExecutionType in one request.. + * not both). + *
    • + *
    • + * There must be only 1 update request per container in an allocate call. + *
    • + *
    • + * If a new update request is sent for a container (in a subsequent allocate + * call) before the first one is satisfied by the Scheduler, it will + * overwrite the previous request. + *
    • + *
    + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class UpdateContainerRequest { + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdateContainerRequest newInstance(int version, + ContainerId containerId, ContainerUpdateType updateType, + Resource targetCapability, ExecutionType targetExecutionType) { + UpdateContainerRequest request = + Records.newRecord(UpdateContainerRequest.class); + request.setContainerVersion(version); + request.setContainerId(containerId); + request.setContainerUpdateType(updateType); + request.setExecutionType(targetExecutionType); + request.setCapability(targetCapability); + return request; + } + + /** + * Get the ContainerId of the container. + * @return ContainerId of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract int getContainerVersion(); + + /** + * Set the current version of the container. + * @param containerVersion of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerVersion(int containerVersion); + + /** + * Get the ContainerUpdateType of the container. + * @return ContainerUpdateType of the container. + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ContainerUpdateType getContainerUpdateType(); + + /** + * Set the ContainerUpdateType of the container. + * @param updateType of the Container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerUpdateType(ContainerUpdateType updateType); + + /** + * Get the ContainerId of the container. + * @return ContainerId of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ContainerId getContainerId(); + + /** + * Set the ContainerId of the container. + * @param containerId ContainerId of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the Resource capability of the container. + * @return Resource capability of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract Resource getCapability(); + + /** + * Set the Resource capability of the container. + * @param capability Resource capability of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setCapability(Resource capability); + + /** + * Get the target ExecutionType of the container. + * @return ExecutionType of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ExecutionType getExecutionType(); + + /** + * Set the target ExecutionType of the container. + * @param executionType ExecutionType of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setExecutionType(ExecutionType executionType); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + ContainerId cId = getContainerId(); + ExecutionType execType = getExecutionType(); + Resource capability = getCapability(); + result = + prime * result + ((capability == null) ? 0 : capability.hashCode()); + result = prime * result + ((cId == null) ? 0 : cId.hashCode()); + result = prime * result + getContainerVersion(); + result = prime * result + ((execType == null) ? 0 : execType.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdateContainerRequest other = (UpdateContainerRequest) obj; + Resource capability = getCapability(); + if (capability == null) { + if (other.getCapability() != null) { + return false; + } + } else if (!capability.equals(other.getCapability())) { + return false; + } + ContainerId cId = getContainerId(); + if (cId == null) { + if (other.getContainerId() != null) { + return false; + } + } else if (!cId.equals(other.getContainerId())) { + return false; + } + if (getContainerVersion() != other.getContainerVersion()) { + return false; + } + ExecutionType execType = getExecutionType(); + if (execType == null) { + if (other.getExecutionType() != null) { + return false; + } + } else if (!execType.equals(other.getExecutionType())) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java new file mode 100644 index 0000000..68f6ca1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.Records; + +/** + * An object that encapsulates an updated container and the + * type of Update. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class UpdatedContainer { + + /** + * Static Factory method. + * + * @param updateType ContainerUpdateType + * @param container Container + * @return UpdatedContainer + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdatedContainer newInstance(ContainerUpdateType updateType, + Container container) { + UpdatedContainer updatedContainer = + Records.newRecord(UpdatedContainer.class); + updatedContainer.setUpdateType(updateType); + updatedContainer.setContainer(container); + return updatedContainer; + } + + /** + * Get the ContainerUpdateType. + * @return ContainerUpdateType + */ + public abstract ContainerUpdateType getUpdateType(); + + /** + * Set the ContainerUpdateType. + * @param updateType ContainerUpdateType + */ + public abstract void setUpdateType(ContainerUpdateType updateType); + + /** + * Get the Container. + * @return Container + */ + public abstract Container getContainer(); + + /** + * Set the Container. + * @param container Container + */ + public abstract void setContainer(Container container); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + ContainerUpdateType updateType = getUpdateType(); + Container container = getContainer(); + result = prime * result + ((updateType == null) ? 0 : + updateType.hashCode()); + result = prime * result + ((container == null) ? 0 : container.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdatedContainer other = (UpdatedContainer) obj; + ContainerUpdateType updateType = getUpdateType(); + if (updateType == null) { + if (other.getUpdateType() != null) { + return false; + } + } else if (updateType != other.getUpdateType()) { + return false; + } + Container container = getContainer(); + if (container == null) { + if (other.getContainer() != null) { + return false; + } + } else if (!container.equals(other.getContainer())) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 2cc1784..2d6007e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -94,6 +94,7 @@ message ContainerProto { optional hadoop.common.TokenProto container_token = 6; optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED]; optional int64 allocation_request_id = 8 [default = -1]; + optional int32 version = 9 [default = 0]; } message ContainerReportProto { @@ -535,11 +536,6 @@ enum ContainerExitStatusProto { DISKS_FAILED = -101; } -message ContainerResourceChangeRequestProto { - optional ContainerIdProto container_id = 1; - optional ResourceProto capability = 2; -} - message ContainerRetryContextProto { optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; repeated int32 error_codes = 2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 4abb80b..97eaa5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -60,14 +60,32 @@ message FinishApplicationMasterResponseProto { optional bool isUnregistered = 1 [default = false]; } +enum ContainerUpdateTypeProto { + INCREASE_RESOURCE = 0; + DECREASE_RESOURCE = 1; + UPDATE_EXECUTION_TYPE = 2; +} + +message UpdateContainerRequestProto { + required int32 container_version = 1; + required ContainerIdProto container_id = 2; + required ContainerUpdateTypeProto update_type = 3; + optional ResourceProto capability = 4; + optional ExecutionTypeProto execution_type = 5; +} + +message UpdateContainerErrorProto { + optional string reason = 1; + optional UpdateContainerRequestProto update_request = 2; +} + message AllocateRequestProto { repeated ResourceRequestProto ask = 1; repeated ContainerIdProto release = 2; optional ResourceBlacklistRequestProto blacklist_request = 3; optional int32 response_id = 4; optional float progress = 5; - repeated ContainerResourceChangeRequestProto increase_request = 6; - repeated ContainerResourceChangeRequestProto decrease_request = 7; + repeated UpdateContainerRequestProto update_requests = 6; } message NMTokenProto { @@ -75,6 +93,11 @@ message NMTokenProto { optional hadoop.common.TokenProto token = 2; } +message UpdatedContainerProto { + required ContainerUpdateTypeProto update_type = 1; + required ContainerProto container = 2; +} + message AllocateResponseProto { optional AMCommandProto a_m_command = 1; optional int32 response_id = 2; @@ -85,11 +108,11 @@ message AllocateResponseProto { optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; repeated NMTokenProto nm_tokens = 9; - repeated ContainerProto increased_containers = 10; - repeated ContainerProto decreased_containers = 11; + repeated UpdatedContainerProto updated_containers = 10; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; optional string collector_addr = 14; + repeated UpdateContainerErrorProto update_errors = 15; } enum SchedulerResourceTypes { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b9949e1..17dae6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; @@ -912,7 +913,8 @@ public class ApplicationMaster { } @Override - public void onContainersResourceChanged(List containers) {} + public void onContainersUpdated( + List containers) {} @Override public void onShutdownRequest() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 10d2a2f..d2195a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -63,7 +65,7 @@ import com.google.common.annotations.VisibleForTesting; * [run tasks on the containers] * } * - * public void onContainersResourceChanged(List containers) { + * public void onContainersUpdated(List containers) { * [determine if resource allocation of containers have been increased in * the ResourceManager, and if so, inform the NodeManagers to increase the * resource monitor/enforcement on the containers] @@ -426,8 +428,9 @@ extends AbstractService { * Called when the ResourceManager responds to a heartbeat with containers * whose resource allocation has been changed. */ - public abstract void onContainersResourceChanged( - List containers); + @Public + @Unstable + public abstract void onContainersUpdated(List containers); /** * Called when the ResourceManager wants the ApplicationMaster to shutdown http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 242df65..bc6cadd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; @@ -354,12 +355,11 @@ extends AMRMClientAsync { if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) { // RM side of the implementation guarantees that there are // no duplications between increased and decreased containers - List changed = new ArrayList<>(); - changed.addAll(response.getIncreasedContainers()); - changed.addAll(response.getDecreasedContainers()); + List changed = new ArrayList<>(); + changed.addAll(response.getUpdatedContainers()); if (!changed.isEmpty()) { ((AMRMClientAsync.AbstractCallbackHandler) handler) - .onContainersResourceChanged(changed); + .onContainersUpdated(changed); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 60834f6..6f6bb85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -261,36 +263,10 @@ public class AMRMClientImpl extends AMRMClient { new HashMap<>(); try { synchronized (this) { - askList = new ArrayList(ask.size()); - for(ResourceRequest r : ask) { - // create a copy of ResourceRequest as we might change it while the - // RPC layer is using it to send info across - ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(), - r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality(), r.getNodeLabelExpression(), - r.getExecutionTypeRequest()); - rr.setAllocationRequestId(r.getAllocationRequestId()); - askList.add(rr); - } - List increaseList = new ArrayList<>(); - List decreaseList = new ArrayList<>(); + askList = cloneAsks(); // Save the current change for recovery oldChange.putAll(change); - for (Map.Entry> entry : - change.entrySet()) { - Container container = entry.getValue().getKey(); - Resource original = container.getResource(); - Resource target = entry.getValue().getValue(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - decreaseList.add(ContainerResourceChangeRequest.newInstance( - container.getId(), target)); - } else { - // This is an increase request - increaseList.add(ContainerResourceChangeRequest.newInstance( - container.getId(), target)); - } - } + List updateList = createUpdateList(); releaseList = new ArrayList(release); // optimistically clear this collection assuming no RPC failure ask.clear(); @@ -306,8 +282,7 @@ public class AMRMClientImpl extends AMRMClient { allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest, - increaseList, decreaseList); + askList, releaseList, blacklistRequest, updateList); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -358,9 +333,8 @@ public class AMRMClientImpl extends AMRMClient { if (!pendingChange.isEmpty()) { List completed = allocateResponse.getCompletedContainersStatuses(); - List changed = new ArrayList<>(); - changed.addAll(allocateResponse.getIncreasedContainers()); - changed.addAll(allocateResponse.getDecreasedContainers()); + List changed = new ArrayList<>(); + changed.addAll(allocateResponse.getUpdatedContainers()); // remove all pending change requests that belong to the completed // containers for (ContainerStatus status : completed) { @@ -417,6 +391,40 @@ public class AMRMClientImpl extends AMRMClient { return allocateResponse; } + private List createUpdateList() { + List updateList = new ArrayList<>(); + for (Map.Entry> entry : + change.entrySet()) { + Resource targetCapability = entry.getValue().getValue(); + Resource currCapability = entry.getValue().getKey().getResource(); + int version = entry.getValue().getKey().getVersion(); + ContainerUpdateType updateType = + ContainerUpdateType.INCREASE_RESOURCE; + if (Resources.fitsIn(targetCapability, currCapability)) { + updateType = ContainerUpdateType.DECREASE_RESOURCE; + } + updateList.add( + UpdateContainerRequest.newInstance(version, entry.getKey(), + updateType, targetCapability, null)); + } + return updateList; + } + + private List cloneAsks() { + List askList = new ArrayList(ask.size()); + for(ResourceRequest r : ask) { + // create a copy of ResourceRequest as we might change it while the + // RPC layer is using it to send info across + ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(), + r.getResourceName(), r.getCapability(), r.getNumContainers(), + r.getRelaxLocality(), r.getNodeLabelExpression(), + r.getExecutionTypeRequest()); + rr.setAllocationRequestId(r.getAllocationRequestId()); + askList.add(rr); + } + return askList; + } + protected void removePendingReleaseRequests( List completedContainersStatuses) { for (ContainerStatus containerStatus : completedContainersStatuses) { @@ -425,16 +433,16 @@ public class AMRMClientImpl extends AMRMClient { } protected void removePendingChangeRequests( - List changedContainers) { - for (Container changedContainer : changedContainers) { - ContainerId containerId = changedContainer.getId(); + List changedContainers) { + for (UpdatedContainer changedContainer : changedContainers) { + ContainerId containerId = changedContainer.getContainer().getId(); if (pendingChange.get(containerId) == null) { continue; } if (LOG.isDebugEnabled()) { LOG.debug("RM has confirmed changed resource allocation for " + "container " + containerId + ". Current resource allocation:" - + changedContainer.getResource() + + changedContainer.getContainer().getResource() + ". Remove pending change request:" + pendingChange.get(containerId).getValue()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index c7b3a94..dac82e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -45,9 +45,11 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -89,20 +91,21 @@ public class TestAMRMClientAsync { TestCallbackHandler callbackHandler = new TestCallbackHandler(); final AMRMClient client = mock(AMRMClientImpl.class); final AtomicInteger secondHeartbeatSync = new AtomicInteger(0); - when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer() { - @Override - public AllocateResponse answer(InvocationOnMock invocation) - throws Throwable { - secondHeartbeatSync.incrementAndGet(); - while (heartbeatBlock.get()) { - synchronized (heartbeatBlock) { - heartbeatBlock.wait(); + when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer( + new Answer() { + @Override + public AllocateResponse answer(InvocationOnMock invocation) + throws Throwable { + secondHeartbeatSync.incrementAndGet(); + while (heartbeatBlock.get()) { + synchronized (heartbeatBlock) { + heartbeatBlock.wait(); + } + } + secondHeartbeatSync.incrementAndGet(); + return response2; } - } - secondHeartbeatSync.incrementAndGet(); - return response2; - } - }).thenReturn(response3).thenReturn(emptyResponse); + }).thenReturn(response3).thenReturn(emptyResponse); when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) .thenReturn(null); when(client.getAvailableResources()).thenAnswer(new Answer() { @@ -410,10 +413,21 @@ public class TestAMRMClientAsync { List completed, List allocated, List increased, List decreased, List nmTokens) { + List updatedContainers = new ArrayList<>(); + for (Container c : increased) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.INCREASE_RESOURCE, c)); + } + for (Container c : decreased) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.DECREASE_RESOURCE, c)); + } AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, new ArrayList(), null, null, 1, null, nmTokens, - increased, decreased); + updatedContainers); return response; } @@ -429,7 +443,7 @@ public class TestAMRMClientAsync { extends AMRMClientAsync.AbstractCallbackHandler { private volatile List completedContainers; private volatile List allocatedContainers; - private final List changedContainers = new ArrayList<>(); + private final List changedContainers = new ArrayList<>(); Exception savedException = null; volatile boolean reboot = false; Object notifier = new Object(); @@ -448,8 +462,8 @@ public class TestAMRMClientAsync { return ret; } - public List takeChangedContainers() { - List ret = null; + public List takeChangedContainers() { + List ret = null; synchronized (changedContainers) { if (!changedContainers.isEmpty()) { ret = new ArrayList<>(changedContainers); @@ -488,8 +502,8 @@ public class TestAMRMClientAsync { } @Override - public void onContainersResourceChanged( - List changed) { + public void onContainersUpdated( + List changed) { synchronized (changedContainers) { changedContainers.clear(); changedContainers.addAll(changed); @@ -564,7 +578,8 @@ public class TestAMRMClientAsync { public void onContainersAllocated(List containers) {} @Override - public void onContainersResourceChanged(List containers) {} + public void onContainersUpdated( + List containers) {} @Override public void onShutdownRequest() {} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index e0ad2c4..38178a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -1061,33 +1062,20 @@ public class TestAMRMClient { Assert.assertEquals(2, amClientImpl.pendingChange.size()); // as of now: container1 asks to decrease to (512, 1) // container2 asks to increase to (2048, 1) - List decreasedContainers; - List increasedContainers; - int allocateAttempts = 0; - int decreased = 0; - int increased = 0; - while (allocateAttempts < 30) { - // send allocation requests - AllocateResponse allocResponse = amClient.allocate(0.1f); - decreasedContainers = allocResponse.getDecreasedContainers(); - increasedContainers = allocResponse.getIncreasedContainers(); - decreased += decreasedContainers.size(); - increased += increasedContainers.size(); - if (allocateAttempts == 0) { - // we should get decrease confirmation right away - Assert.assertEquals(1, decreased); - // After first allocate request check change size - Assert.assertEquals(0, amClientImpl.change.size()); - } else if (increased == 1) { - break; - } - // increase request is served after next NM heart beat is received - // Sleeping and retrying allocate - sleep(20); - allocateAttempts++; - } - Assert.assertEquals(1, decreased); - Assert.assertEquals(1, increased); + // send allocation requests + AllocateResponse allocResponse = amClient.allocate(0.1f); + Assert.assertEquals(0, amClientImpl.change.size()); + // we should get decrease confirmation right away + List updatedContainers = + allocResponse.getUpdatedContainers(); + Assert.assertEquals(1, updatedContainers.size()); + // we should get increase allocation after the next NM's heartbeat to RM + sleep(150); + // get allocations + allocResponse = amClient.allocate(0.1f); + updatedContainers = + allocResponse.getUpdatedContainers(); + Assert.assertEquals(1, updatedContainers.size()); } private void testAllocation(final AMRMClientImpl amClient) http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 719d9a1..f1c49f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -39,12 +39,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -262,7 +262,7 @@ public class TestAMRMClientOnRMRestart { // new NM to represent NM re-register nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, ContainerState.RUNNING, + NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(0), 0); nm1.registerNode(Collections.singletonList(containerReport), @@ -399,7 +399,7 @@ public class TestAMRMClientOnRMRestart { ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, ContainerState.RUNNING, + NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(0), 0); nm1.registerNode(Arrays.asList(containerReport), null); @@ -562,8 +562,8 @@ public class TestAMRMClientOnRMRestart { List lastAsk = null; List lastRelease = null; - List lastIncrease = null; - List lastDecrease = null; + List lastIncrease = null; + List lastDecrease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; @@ -574,8 +574,8 @@ public class TestAMRMClientOnRMRestart { ApplicationAttemptId applicationAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals, - List increaseRequests, - List decreaseRequests) { + List increaseRequests, + List decreaseRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org