Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 29AC3200BB7 for ; Wed, 9 Nov 2016 09:13:20 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 28434160AFD; Wed, 9 Nov 2016 08:13:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ECA0C160AEE for ; Wed, 9 Nov 2016 09:13:18 +0100 (CET) Received: (qmail 80594 invoked by uid 500); 9 Nov 2016 08:13:18 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 80582 invoked by uid 99); 9 Nov 2016 08:13:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Nov 2016 08:13:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D7804E07EF; Wed, 9 Nov 2016 08:13:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-5823. Update NMTokens in case of requests with only opportunistic containers. (Konstantinos Karanasos via asuresh) Date: Wed, 9 Nov 2016 08:13:17 +0000 (UTC) archived-at: Wed, 09 Nov 2016 08:13:20 -0000 Repository: hadoop Updated Branches: refs/heads/trunk ed0bebaba -> 283fa33fe YARN-5823. Update NMTokens in case of requests with only opportunistic containers. (Konstantinos Karanasos via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/283fa33f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/283fa33f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/283fa33f Branch: refs/heads/trunk Commit: 283fa33febe043bd7b4fa87546be26c9c5a8f8b5 Parents: ed0beba Author: Arun Suresh Authored: Wed Nov 9 00:11:25 2016 -0800 Committer: Arun Suresh Committed: Wed Nov 9 00:11:25 2016 -0800 ---------------------------------------------------------------------- .../TestOpportunisticContainerAllocation.java | 71 +++++++++++++++++++- .../OpportunisticContainerAllocator.java | 55 ++++++++------- .../containermanager/ContainerManagerImpl.java | 2 +- .../scheduler/DistributedScheduler.java | 19 ++++-- .../ApplicationMasterService.java | 3 +- ...pportunisticContainerAllocatorAMService.java | 23 ++++++- 6 files changed, 137 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/283fa33f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index b9b4b02..ace145d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -229,6 +229,9 @@ public class TestOpportunisticContainerAllocation { amClient.registerApplicationMaster("Host", 10000, ""); + testOpportunisticAllocation( + (AMRMClientImpl) amClient); + testAllocation((AMRMClientImpl)amClient); amClient @@ -247,7 +250,6 @@ public class TestOpportunisticContainerAllocation { final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request - assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); @@ -388,6 +390,73 @@ public class TestOpportunisticContainerAllocation { assertEquals(0, amClient.release.size()); } + /** + * Tests allocation with requests comprising only opportunistic containers. + */ + private void testOpportunisticAllocation( + final AMRMClientImpl amClient) + throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, capability).remoteRequest + .getNumContainers(); + + assertEquals(2, oppContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 10; + Set releases = new TreeSet<>(); + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap receivedNMTokens = new HashMap<>(); + + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + for (Container container : allocResponse.getAllocatedContainers()) { + allocatedContainerCount++; + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(1, receivedNMTokens.values().size()); + } + private void sleep(int sleepTime) { try { Thread.sleep(sleepTime); http://git-wip-us.apache.org/repos/asf/hadoop/blob/283fa33f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 4410db1..16436bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -157,12 +156,18 @@ public class OpportunisticContainerAllocator { } } - static class PartitionedResourceRequests { + /** + * Class that includes two lists of {@link ResourceRequest}s: one for + * GUARANTEED and one for OPPORTUNISTIC {@link ResourceRequest}s. + */ + public static class PartitionedResourceRequests { private List guaranteed = new ArrayList<>(); private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { return guaranteed; } + public List getOpportunistic() { return opportunistic; } @@ -186,10 +191,10 @@ public class OpportunisticContainerAllocator { } /** - * Entry point into the Opportunistic Container Allocator. + * Allocate OPPORTUNISTIC containers. * @param request AllocateRequest * @param applicationAttemptId ApplicationAttemptId - * @param appContext App Specific OpportunisticContainerContext + * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier * @param appSubmitter App Submitter * @return List of Containers. @@ -197,37 +202,31 @@ public class OpportunisticContainerAllocator { */ public List allocateContainers( AllocateRequest request, ApplicationAttemptId applicationAttemptId, - OpportunisticContainerContext appContext, long rmIdentifier, + OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAskList()); - - if (partitionedAsks.getOpportunistic().isEmpty()) { - return Collections.emptyList(); - } - + // Update released containers. List releasedContainers = request.getReleaseList(); int numReleasedContainers = releasedContainers.size(); if (numReleasedContainers > 0) { LOG.info("AttemptID: " + applicationAttemptId + " released: " + numReleasedContainers); - appContext.getContainersAllocated().removeAll(releasedContainers); + opportContext.getContainersAllocated().removeAll(releasedContainers); } - // Also, update black list + // Update black list. ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); if (rbr != null) { - appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); } - // Add OPPORTUNISTIC reqs to the outstanding reqs - appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(request.getAskList()); + // Satisfy the outstanding OPPORTUNISTIC requests. List allocatedContainers = new ArrayList<>(); for (Priority priority : - appContext.getOutstandingOpReqs().descendingKeySet()) { + opportContext.getOutstandingOpReqs().descendingKeySet()) { // Allocated containers : // Key = Requested Capability, // Value = List of Containers of given cap (the actual container size @@ -235,16 +234,14 @@ public class OpportunisticContainerAllocator { // we need the requested capability (key) to match against // the outstanding reqs) Map> allocated = allocate(rmIdentifier, - appContext, priority, applicationAttemptId, appSubmitter); + opportContext, priority, applicationAttemptId, appSubmitter); for (Map.Entry> e : allocated.entrySet()) { - appContext.matchAllocationToOutstandingRequest( + opportContext.matchAllocationToOutstandingRequest( e.getKey(), e.getValue()); allocatedContainers.addAll(e.getValue()); } } - // Send all the GUARANTEED Reqs to RM - request.setAskList(partitionedAsks.getGuaranteed()); return allocatedContainers; } @@ -359,8 +356,14 @@ public class OpportunisticContainerAllocator { return containerToken; } - private PartitionedResourceRequests partitionAskList(List - askList) { + /** + * Partitions a list of ResourceRequest to two separate lists, one for + * GUARANTEED and one for OPPORTUNISTIC ResourceRequests. + * @param askList the list of ResourceRequests to be partitioned + * @return the partitioned ResourceRequests + */ + public PartitionedResourceRequests partitionAskList( + List askList) { PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests(); for (ResourceRequest rr : askList) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/283fa33f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 76933ec..ab5827e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1523,7 +1523,7 @@ public class ContainerManagerImpl extends CompositeService implements @Override public OpportunisticContainersStatus getOpportunisticContainersStatus() { - return null; + return OpportunisticContainersStatus.newInstance(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/283fa33f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index a12d16a..0f47c93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -220,16 +220,27 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( DistributedSchedulingAllocateRequest request) throws YarnException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); - } + + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = containerAllocator + .partitionAskList(request.getAllocateRequest().getAskList()); + + // Allocate OPPORTUNISTIC containers. + request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List allocatedContainers = containerAllocator.allocateContainers( request.getAllocateRequest(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); + // Prepare request for sending to RM for scheduling GUARANTEED containers. request.setAllocatedContainers(allocatedContainers); + request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); http://git-wip-us.apache.org/repos/asf/hadoop/blob/283fa33f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4d73ba2..4f952b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -542,7 +542,8 @@ public class ApplicationMasterService extends AbstractService implements RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); - if (!allocation.getContainers().isEmpty()) { + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { allocateResponse.setNMTokens(allocation.getNMTokens()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/283fa33f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index bdd5718..50a9c4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -220,34 +220,51 @@ public class OpportunisticContainerAllocatorAMService public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = + oppContainerAllocator.partitionAskList(request.getAskList()); + + // Allocate OPPORTUNISTIC containers. + request.setAskList(partitionedAsks.getOpportunistic()); final ApplicationAttemptId appAttemptId = getAppAttemptId(); SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); + List oppContainers = oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); } - // Allocate all guaranteed containers + // Allocate GUARANTEED containers. + request.setAskList(partitionedAsks.getGuaranteed()); AllocateResponse allocateResp = super.allocate(request); + // Add allocated OPPORTUNISTIC containers to the AllocateResponse. + if (!oppContainers.isEmpty()) { + allocateResp.getAllocatedContainers().addAll(oppContainers); + } + + // Update opportunistic container context with the allocated GUARANTEED + // containers. oppCtx.updateCompletedContainers(allocateResp); // Add all opportunistic containers - allocateResp.getAllocatedContainers().addAll(oppContainers); return allocateResp; } @Override public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( + registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, IOException { RegisterApplicationMasterResponse response = --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org