Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8C17711B13 for ; Wed, 2 Jul 2014 01:55:16 +0000 (UTC) Received: (qmail 63995 invoked by uid 500); 2 Jul 2014 01:55:16 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 63957 invoked by uid 500); 2 Jul 2014 01:55:16 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 63946 invoked by uid 99); 2 Jul 2014 01:55:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jul 2014 01:55:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jul 2014 01:55:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B3BAF238897A; Wed, 2 Jul 2014 01:54:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1607227 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ hadoop-yarn/hadoop-yarn-server/hadoop... Date: Wed, 02 Jul 2014 01:54:48 -0000 To: yarn-commits@hadoop.apache.org From: mayank@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140702015448.B3BAF238897A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mayank Date: Wed Jul 2 01:54:47 2014 New Revision: 1607227 URL: http://svn.apache.org/r1607227 Log: YARN-2022 Preempting an Application Master container can be kept as least priority when multiple applications are marked for preemption by ProportionalCapacityPreemptionPolicy (Sunil G via mayank) Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jul 2 01:54:47 2014 @@ -55,6 +55,10 @@ Release 2.5.0 - UNRELEASED (Varun Vasudev via vinodkv) IMPROVEMENTS + + YARN-2022 Preempting an Application Master container can be kept as least priority + when multiple applications are marked for preemption by + ProportionalCapacityPreemptionPolicy (Sunil G via mayank) YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via jeagles) Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Wed Jul 2 01:54:47 2014 @@ -111,7 +111,7 @@ public class ProportionalCapacityPreempt public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - //the dispatcher to send preempt and kill events + // the dispatcher to send preempt and kill events public EventHandler dispatcher; private final Clock clock; @@ -437,8 +437,9 @@ public class ProportionalCapacityPreempt private Map> getContainersToPreempt( List queues, Resource clusterResource) { - Map> list = + Map> preemptMap = new HashMap>(); + List skippedAMContainerlist = new ArrayList(); for (TempQueue qT : queues) { // we act only if we are violating balance by more than @@ -449,26 +450,83 @@ public class ProportionalCapacityPreempt // accounts for natural termination of containers Resource resToObtain = Resources.multiply(qT.toBePreempted, naturalTerminationFactor); + Resource skippedAMSize = Resource.newInstance(0, 0); // lock the leafqueue while we scan applications and unreserve - synchronized(qT.leafQueue) { - NavigableSet ns = - (NavigableSet) qT.leafQueue.getApplications(); + synchronized (qT.leafQueue) { + NavigableSet ns = + (NavigableSet) qT.leafQueue.getApplications(); Iterator desc = ns.descendingIterator(); qT.actuallyPreempted = Resources.clone(resToObtain); while (desc.hasNext()) { FiCaSchedulerApp fc = desc.next(); - if (Resources.lessThanOrEqual(rc, clusterResource, - resToObtain, Resources.none())) { + if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, + Resources.none())) { break; } - list.put(fc.getApplicationAttemptId(), - preemptFrom(fc, clusterResource, resToObtain)); + preemptMap.put( + fc.getApplicationAttemptId(), + preemptFrom(fc, clusterResource, resToObtain, + skippedAMContainerlist, skippedAMSize)); } + Resource maxAMCapacityForThisQueue = Resources.multiply( + Resources.multiply(clusterResource, + qT.leafQueue.getAbsoluteCapacity()), + qT.leafQueue.getMaxAMResourcePerQueuePercent()); + + // Can try preempting AMContainers (still saving atmost + // maxAMCapacityForThisQueue AMResource's) if more resources are + // required to be preempted from this Queue. + preemptAMContainers(clusterResource, preemptMap, + skippedAMContainerlist, resToObtain, skippedAMSize, + maxAMCapacityForThisQueue); } } } - return list; + return preemptMap; + } + + /** + * As more resources are needed for preemption, saved AMContainers has to be + * rescanned. Such AMContainers can be preempted based on resToObtain, but + * maxAMCapacityForThisQueue resources will be still retained. + * + * @param clusterResource + * @param preemptMap + * @param skippedAMContainerlist + * @param resToObtain + * @param skippedAMSize + * @param maxAMCapacityForThisQueue + */ + private void preemptAMContainers(Resource clusterResource, + Map> preemptMap, + List skippedAMContainerlist, Resource resToObtain, + Resource skippedAMSize, Resource maxAMCapacityForThisQueue) { + for (RMContainer c : skippedAMContainerlist) { + // Got required amount of resources for preemption, can stop now + if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, + Resources.none())) { + break; + } + // Once skippedAMSize reaches down to maxAMCapacityForThisQueue, + // container selection iteration for preemption will be stopped. + if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, + maxAMCapacityForThisQueue)) { + break; + } + Set contToPrempt = preemptMap.get(c + .getApplicationAttemptId()); + if (null == contToPrempt) { + contToPrempt = new HashSet(); + preemptMap.put(c.getApplicationAttemptId(), contToPrempt); + } + contToPrempt.add(c); + + Resources.subtractFrom(resToObtain, c.getContainer().getResource()); + Resources.subtractFrom(skippedAMSize, c.getContainer() + .getResource()); + } + skippedAMContainerlist.clear(); } /** @@ -480,8 +538,9 @@ public class ProportionalCapacityPreempt * @param rsrcPreempt * @return */ - private Set preemptFrom( - FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) { + private Set preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Resource rsrcPreempt, + List skippedAMContainerlist, Resource skippedAMSize) { Set ret = new HashSet(); ApplicationAttemptId appId = app.getApplicationAttemptId(); @@ -513,6 +572,12 @@ public class ProportionalCapacityPreempt rsrcPreempt, Resources.none())) { return ret; } + // Skip AM Container from preemption for now. + if (c.isAMContainer()) { + skippedAMContainerlist.add(c); + Resources.addTo(skippedAMSize, c.getContainer().getResource()); + continue; + } ret.add(c); Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jul 2 01:54:47 2014 @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -832,7 +833,10 @@ public class RMAppAttemptImpl implements // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers() - .get(0)); + .get(0)); + RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler + .getRMContainer(appAttempt.getMasterContainer().getId()); + rmMasterContainer.setAMContainer(true); // The node set in NMTokenSecrentManager is used for marking whether the // NMToken has been issued for this node to the AM. // When AM container was allocated to RM itself, the node which allocates Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Wed Jul 2 01:54:47 2014 @@ -71,5 +71,7 @@ public interface RMContainer extends Eve ContainerState getContainerState(); ContainerReport createContainerReport(); + + boolean isAMContainer(); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Jul 2 01:54:47 2014 @@ -155,6 +155,7 @@ public class RMContainerImpl implements private long creationTime; private long finishTime; private ContainerStatus finishedStatus; + private boolean isAMContainer; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -176,6 +177,7 @@ public class RMContainerImpl implements this.rmContext = rmContext; this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); + this.isAMContainer = false; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -314,6 +316,25 @@ public class RMContainerImpl implements } @Override + public boolean isAMContainer() { + try { + readLock.lock(); + return isAMContainer; + } finally { + readLock.unlock(); + } + } + + public void setAMContainer(boolean isAMContainer) { + try { + writeLock.lock(); + this.isAMContainer = isAMContainer; + } finally { + writeLock.unlock(); + } + } + + @Override public void handle(RMContainerEvent event) { LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType()); try { @@ -490,5 +511,4 @@ public class RMContainerImpl implements } return containerReport; } - } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Wed Jul 2 01:54:47 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; @@ -242,6 +243,20 @@ public abstract class AbstractYarnSchedu // recover scheduler attempt schedulerAttempt.recoverContainer(rmContainer); + + // set master container for the current running AMContainer for this + // attempt. + RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); + if (appAttempt != null) { + Container masterContainer = appAttempt.getMasterContainer(); + + // Mark current running AMContainer's RMContainer based on the master + // container ID stored in AppAttempt. + if (masterContainer != null + && masterContainer.getId().equals(rmContainer.getContainerId())) { + ((RMContainerImpl)rmContainer).setAMContainer(true); + } + } } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Wed Jul 2 01:54:47 2014 @@ -62,6 +62,7 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -564,6 +565,43 @@ public class TestWorkPreservingRMRestart rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); } + + @Test (timeout = 30000) + public void testAMContainerStatusWithRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1_1 = rm1.submitApp(1024); + MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); + + RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt(); + AbstractYarnScheduler scheduler = + ((AbstractYarnScheduler) rm1.getResourceScheduler()); + + Assert.assertTrue(scheduler.getRMContainer( + attempt0.getMasterContainer().getId()).isAMContainer()); + + // Re-start RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + List am1_1Containers = + createNMContainerStatusForApp(am1_1); + nm1.registerNode(am1_1Containers, null); + + // Wait for RM to settle down on recovering containers; + waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); + + scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler()); + Assert.assertTrue(scheduler.getRMContainer( + attempt0.getMasterContainer().getId()).isAMContainer()); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Wed Jul 2 01:54:47 2014 @@ -80,6 +80,8 @@ public class TestProportionalCapacityPre static final long TS = 3141592653L; int appAlloc = 0; + boolean setAMContainer = false; + float setAMResourcePercent = 0.0f; Random rand = null; Clock mClock = null; Configuration conf = null; @@ -466,7 +468,108 @@ public class TestProportionalCapacityPre fail("Failed to find SchedulingMonitor service, please check what happened"); } + + @Test + public void testSkipAMContainer() { + int[][] qData = new int[][] { + // / A B + { 100, 50, 50 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 50 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 1, 1 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // By skipping AM Container, all other 24 containers of appD will be + // preempted + verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // By skipping AM Container, all other 24 containers of appC will be + // preempted + verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // Since AM containers of appC and appD are saved, 2 containers from appB + // has to be preempted. + verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); + setAMContainer = false; + } + + @Test + public void testPreemptSkippedAMContainers() { + int[][] qData = new int[][] { + // / A B + { 100, 10, 90 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 90 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 5, 5 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // All 5 containers of appD will be preempted including AM container. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); + // All 5 containers of appC will be preempted including AM container. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // By skipping AM Container, all other 4 containers of appB will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + + // By skipping AM Container, all other 4 containers of appA will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); + setAMContainer = false; + } + + @Test + public void testAMResourcePercentForSkippedAMContainers() { + int[][] qData = new int[][] { + // / A B + { 100, 10, 90 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 90 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 5, 5 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + setAMResourcePercent = 0.5f; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb. + // Total used AM container size is 20GB, hence 2 AM container has + // to be preempted as Queue Capacity is 10Gb. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // Including AM Container, all other 4 containers of appC will be + // preempted + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // By skipping AM Container, all other 4 containers of appB will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + + // By skipping AM Container, all other 4 containers of appA will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); + setAMContainer = false; + } + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; @@ -583,6 +686,9 @@ public class TestProportionalCapacityPre } } when(lq.getApplications()).thenReturn(qApps); + if(setAMResourcePercent != 0.0f){ + when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent); + } p.getChildQueues().add(lq); return lq; } @@ -607,7 +713,11 @@ public class TestProportionalCapacityPre List cLive = new ArrayList(); for (int i = 0; i < used; i += gran) { - cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + if(setAMContainer && i == 0){ + cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); + }else{ + cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + } ++cAlloc; } when(app.getLiveContainers()).thenReturn(cLive); @@ -623,6 +733,10 @@ public class TestProportionalCapacityPre RMContainer mC = mock(RMContainer.class); when(mC.getContainerId()).thenReturn(cId); when(mC.getContainer()).thenReturn(c); + when(mC.getApplicationAttemptId()).thenReturn(appAttId); + if(0 == priority){ + when(mC.isAMContainer()).thenReturn(true); + } return mC; } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1607227&r1=1607226&r2=1607227&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Wed Jul 2 01:54:47 2014 @@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -600,6 +602,9 @@ public class TestRMAppAttemptTransitions any(List.class), any(List.class))). thenReturn(allocation); + RMContainer rmContainer = mock(RMContainerImpl.class); + when(scheduler.getRMContainer(container.getId())). + thenReturn(rmContainer); applicationAttempt.handle( new RMAppAttemptContainerAllocatedEvent(