Subject: svn commit: r1610860 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn...
Date: Tue, 15 Jul 2014 21:48:58 -0000
To: yarn-commits@hadoop.apache.org
From: mayank@apache.org
Message-Id: <20140715214859.5B01923889E1@eris.apache.org>

Author: mayank
Date: Tue Jul 15 21:48:58 2014
New Revision: 1610860

URL: http://svn.apache.org/r1610860
Log:
YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and caused a task timeout for 30mins. (Sunil G via mayank)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jul 15 21:48:58 2014
@@ -259,6 +259,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes already exist. (Robert Kanter via kasha)

+    YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and
+    caused a task timeout for 30mins.
(Sunil G via mayank)
+
   OPTIMIZATIONS

   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Tue Jul 15 21:48:58 2014
@@ -18,6 +18,8 @@

 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;

+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;

 /**
@@ -73,5 +76,7 @@ public interface RMContainer extends Eve
   ContainerReport createContainerReport();

   boolean isAMContainer();
+
+  List<ResourceRequest> getResourceRequests();

 }

Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Tue Jul 15 21:48:58 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;

 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -158,6 +160,7 @@ public class RMContainerImpl implements
   private long finishTime;
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;

   public
RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -180,7 +183,8 @@ public class RMContainerImpl implements
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
-
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -311,6 +315,25 @@ public class RMContainerImpl implements
       readLock.unlock();
     }
   }
+
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    try {
+      readLock.lock();
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    try {
+      writeLock.lock();
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }

   @Override
   public String toString() {
@@ -432,6 +455,9 @@ public class RMContainerImpl implements
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Jul 15 21:48:58 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -275,6 +276,27 @@ public abstract class AbstractYarnSchedu
     return rmContainer;
   }

+  /**
+   * Recover resource request back from RMContainer when a container is
+   * preempted before AM pulled the same. If container is pulled by
+   * AM, then RMContainer will not have resource request to recover.
+   * @param rmContainer
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // If container state is moved to ACQUIRED, request will be empty.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler.
+    SchedulerApplicationAttempt schedulerAttempt
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Tue Jul 15 21:48:58 2014
@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest recover Resource Request on preemption
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();

     // Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers to 1, as it is recovering a
+        // single container.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }

       asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }

   /**
@@
-288,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }

-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }

   /**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }

-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+
decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }

   /**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }

   synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
     metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
       false);
   }
+
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Jul 15 21:48:58 2014
@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Jul 15 21:48:58 2014
@@ -1089,6 +1089,7 @@ public class CapacityScheduler extends
     if
(LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
+    recoverResourceRequestForContainer(cont);
     completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
       cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
       RMContainerEventType.KILL);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Tue Jul 15 21:48:58 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re

 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);

     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
     liveContainers.put(container.getId(), rmContainer);

     // Update
consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);

     // Inform the container
     rmContainer.handle(

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Tue Jul 15 21:48:58 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();

+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+
     // Inform the container
     rmContainer.handle(
         new
RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche
     liveContainers.put(container.getId(), rmContainer);

     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());

+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Jul 15 21:48:58 2014
@@ -422,7 +422,7 @@ public class FairScheduler extends
     }
   }

-  private void warnOrKillContainer(RMContainer container) {
+  protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSSchedulerApp app =
getSchedulerApp(appAttemptId);
     FSLeafQueue queue = app.getQueue();
@@ -440,6 +440,7 @@ public class FairScheduler extends
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

+      recoverResourceRequestForContainer(container);
       // TODO: Not sure if this ever actually adds this to the list of cleanup
       // containers on the RMNode (see SchedulerNode.releaseContainer()).
       completedContainer(container, status, RMContainerEventType.KILL);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Tue Jul 15 21:48:58 2014
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record
 import
org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;

@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
     verify(writer, never()).containerFinished(any(RMContainer.class));
   }
+
+  @Test
+  public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+    Configuration conf = new Configuration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    ResourceScheduler scheduler =
rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // Verify whether list of ResourceRequest is present in RMContainer
+    // while moving to ALLOCATED state
+    Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+
+    // Allocate container
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+        .getAllocatedContainers();
+    rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+    // After RMContainer moving to ACQUIRED state, list of ResourceRequest will
+    // be empty
+    Assert.assertNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Tue Jul 15 21:48:58 2014
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import
java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -947,4 +951,67 @@ public class TestCapacityScheduler { rm1.stop(); } + + @Test(timeout = 30000) + public void testRecoverRequestAfterPreemption() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 
8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId1 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); + + RMContainer rmContainer = cs.getRMContainer(containerId1); + List requests = rmContainer.getResourceRequests(); + FiCaSchedulerApp app = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + + FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode()); + for (ResourceRequest request : requests) { + // Skip the OffRack and RackLocal resource requests. + if (request.getResourceName().equals(node.getRackName()) + || request.getResourceName().equals(ResourceRequest.ANY)) { + continue; + } + + // Already the node local resource request is cleared from RM after + // allocation. + Assert.assertNull(app.getResourceRequest(request.getPriority(), + request.getResourceName())); + } + + // Call killContainer to preempt the container + cs.killContainer(rmContainer); + + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : requests) { + // Resource request must have added back in RM after preempt event + // handling. + Assert.assertEquals( + 1, + app.getResourceRequest(request.getPriority(), + request.getResourceName()).getNumContainers()); + } + + // New container will be allocated and will move to ALLOCATED state + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // allocate container + List containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. 
+    Assert.assertTrue(containers.size() == 1);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Tue Jul 15 21:48:58 2014
@@ -167,6 +167,27 @@ public class FairSchedulerTestBase {
         .put(id.getApplicationId(), rmApp);
     return id;
   }
+
+  protected ApplicationAttemptId createSchedulingRequest(String queueId,
+      String userId, List<ResourceRequest> ask) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+        this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false, true);
+    }
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    RMApp rmApp = mock(RMApp.class);
+    RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
+    when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
+    when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
+        new RMAppAttemptMetrics(id));
+    resourceManager.getRMContext().getRMApps()
+        .put(id.getApplicationId(), rmApp);
+    return id;
+  }
 
   protected void createSchedulingRequestExistingApplication(
       int memory, int priority, ApplicationAttemptId attId) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1610860&r1=1610859&r2=1610860&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Jul 15 21:48:58 2014
@@ -53,10 +53,13 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -77,11 +80,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -2831,6 +2836,87 @@ public class TestFairScheduler extends F
       }
     }
   }
+
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create a node and raise a NodeAdded event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create 3 container requests and place them in ask
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+        .size());
+    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+
+    // ResourceRequest will be null once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+
+    // Wait for a few clock ticks
+    clock.tick(5);
+
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, resource request will be present again in app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
 
   @SuppressWarnings("resource")
   @Test