Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DAF9A1145E for ; Tue, 19 Aug 2014 20:34:20 +0000 (UTC) Received: (qmail 14026 invoked by uid 500); 19 Aug 2014 20:34:20 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 13994 invoked by uid 500); 19 Aug 2014 20:34:20 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 13981 invoked by uid 99); 19 Aug 2014 20:34:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Aug 2014 20:34:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Aug 2014 20:33:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id CC3AA2388A38; Tue, 19 Aug 2014 20:33:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1618972 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-s... 
Date: Tue, 19 Aug 2014 20:33:50 -0000 To: yarn-commits@hadoop.apache.org From: zjshen@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140819203350.CC3AA2388A38@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: zjshen Date: Tue Aug 19 20:33:49 2014 New Revision: 1618972 URL: http://svn.apache.org/r1618972 Log: YARN-2249. Avoided AM release requests being lost on work preserving RM restart. Contributed by Jian He. Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Aug 19 20:33:49 2014 @@ -214,6 +214,9 @@ Release 2.6.0 - UNRELEASED YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher. (Rohith via jianhe) + YARN-2249. Avoided AM release requests being lost on work preserving RM + restart. 
(Jian He via zjshen) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Aug 19 20:33:49 2014 @@ -23,10 +23,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -34,18 +38,25 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -54,6 +65,7 @@ import org.apache.hadoop.yarn.util.resou import com.google.common.util.concurrent.SettableFuture; + @SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler @@ -72,6 +84,7 @@ public abstract class AbstractYarnSchedu protected RMContext rmContext; protected Map> applications; + protected int nmExpireInterval; protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); @@ -87,6 +100,15 @@ public abstract class AbstractYarnSchedu super(name); } + @Override + public void serviceInit(Configuration conf) throws 
Exception { + nmExpireInterval = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + createReleaseCache(); + super.serviceInit(conf); + } + public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); @@ -281,6 +303,19 @@ public abstract class AbstractYarnSchedu ((RMContainerImpl)rmContainer).setAMContainer(true); } } + + synchronized (schedulerAttempt) { + Set releases = schedulerAttempt.getPendingRelease(); + if (releases.contains(container.getContainerId())) { + // release the container + rmContainer.handle(new RMContainerFinishedEvent(container + .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED)); + releases.remove(container.getContainerId()); + LOG.info(container.getContainerId() + " is released by application."); + } + } } } @@ -320,6 +355,62 @@ public abstract class AbstractYarnSchedu } } + protected void createReleaseCache() { + // Cleanup the cache after nm expire interval. 
+ new Timer().schedule(new TimerTask() { + @Override + public void run() { + for (SchedulerApplication app : applications.values()) { + + T attempt = app.getCurrentAppAttempt(); + synchronized (attempt) { + for (ContainerId containerId : attempt.getPendingRelease()) { + RMAuditLogger.logFailure( + app.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", + "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + attempt.getPendingRelease().clear(); + } + } + LOG.info("Release request cache is cleaned up"); + } + }, nmExpireInterval); + } + + // clean up a completed container + protected abstract void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event); + + protected void releaseContainers(List containers, + SchedulerApplicationAttempt attempt) { + for (ContainerId containerId : containers) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + < nmExpireInterval) { + LOG.info(containerId + " doesn't exist. 
Add the container" + + " to the release request cache as it maybe on recovery."); + synchronized (attempt) { + attempt.getPendingRelease().add(containerId); + } + } else { + RMAuditLogger.logFailure(attempt.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + } + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + } + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Aug 19 20:33:49 2014 @@ -17,13 +17,14 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import 
java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt protected List newlyAllocatedContainers = new ArrayList(); + // This pendingRelease is used in work-preserving recovery scenario to keep + // track of the AM's outstanding release requests. RM on recovery could + // receive the release request from AM before it receives the container status + // from NM for recovery. In this case, the to-be-recovered containers reported + // by NM should not be recovered. + private Set pendingRelease = null; + /** * Count how many times the application has been given an opportunity * to schedule a task at each priority. 
Each time the scheduler @@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt new AppSchedulingInfo(applicationAttemptId, user, queue, activeUsersManager, rmContext.getEpoch()); this.queue = queue; - + this.pendingRelease = new HashSet(); if (rmContext.getRMApps() != null && rmContext.getRMApps() .containsKey(applicationAttemptId.getApplicationId())) { @@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt return appSchedulingInfo.getResourceRequests(priority); } + public Set getPendingRelease() { + return this.pendingRelease; + } + public int getNewContainerId() { return appSchedulingInfo.getNewContainerId(); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 19 20:33:49 2014 @@ -54,8 +54,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; @@ -199,7 +197,7 @@ public class CapacityScheduler extends private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; private boolean overrideWithQueueMappings = false; - private List mappings = new ArrayList(); + private List mappings = null; private Groups groups; @VisibleForTesting @@ -789,21 +787,7 @@ public class CapacityScheduler extends getMinimumResourceCapability(), maximumAllocation); // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "CapacityScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -1098,7 +1082,8 @@ public class CapacityScheduler extends } @Lock(CapacityScheduler.class) - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Aug 19 20:33:49 2014 @@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -810,7 +808,8 @@ public class FairScheduler extends /** * Clean up a completed container. 
*/ - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -913,21 +912,7 @@ public class FairScheduler extends } // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { if (!ask.isEmpty()) { Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Aug 19 20:33:49 2014 @@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; - import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -295,21 +292,7 @@ public class FifoScheduler extends clusterResource, minimumAllocation, maximumAllocation); // Release containers - for (ContainerId releasedContainer : release) { - RMContainer rmContainer = getRMContainer(releasedContainer); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FifoScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainer); - } - containerCompleted(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - 
releasedContainer, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -443,7 +426,7 @@ public class FifoScheduler extends LOG.info("Skip killing " + container.getContainerId()); continue; } - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -717,7 +700,7 @@ public class FifoScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - containerCompleted(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -818,7 +801,7 @@ public class FifoScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - containerCompleted(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), @@ -831,7 +814,8 @@ public class FifoScheduler extends } @Lock(FifoScheduler.class) - private synchronized void containerCompleted(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -881,7 +865,7 @@ public class FifoScheduler extends } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Tue Aug 19 20:33:49 2014 @@ -49,7 +49,7 @@ public class MockAM { private volatile int responseId = 0; private final ApplicationAttemptId attemptId; - private final RMContext context; + private RMContext context; private ApplicationMasterProtocol amRMProtocol; private final List requests = new ArrayList(); @@ -61,8 +61,10 @@ public class MockAM { this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } - - void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) { + + public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol, + RMContext context) { + this.context = context; this.amRMProtocol = amRMProtocol; } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Tue Aug 19 20:33:49 2014 @@ -171,7 +171,6 @@ public class TestApplicationMasterServic RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.setAMRMProtocol(rm.getApplicationMasterService()); AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List release = new ArrayList(); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Aug 19 20:33:49 2014 @@ -289,7 +289,7 @@ public class TestRMRestart { // verify old AM is not accepted // change running AM to talk to new RM - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); @@ -1663,7 +1663,7 @@ public class TestRMRestart { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.allocate(new ArrayList(), new ArrayList()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1618972&r1=1618971&r2=1618972&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Tue Aug 19 20:33:49 2014 @@ -33,10 +33,13 @@ import java.util.Set; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -72,6 +75,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import com.google.common.base.Supplier; + + @SuppressWarnings({"rawtypes", "unchecked"}) @RunWith(value = Parameterized.class) public class TestWorkPreservingRMRestart { @@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); - am0.setAMRMProtocol(rm2.getApplicationMasterService()); - am0.registerAppAttempt(false); + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); } + // Test if RM on recovery receives the container release 
request from AM + // before it receives the container status reported by NM for recovery. this + // container should not be recovered. + @Test (timeout = 30000) + public void testReleasedContainerNotRecovered() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + rm1.start(); + + RMApp app1 = rm1.submitApp(1024); + final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Re-start RM + conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000); + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(true); + + // try to release a container before the container is actually recovered. + final ContainerId runningContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + am1.allocate(null, Arrays.asList(runningContainer)); + + // send container statuses to recover the containers + List containerStatuses = + createNMContainerStatusForApp(am1); + nm1.registerNode(containerStatuses, null); + + // only the am container should be recovered. + waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId()); + + final AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // cached release request is cleaned. + // assertFalse(scheduler.getPendingRelease().contains(runningContainer)); + + AllocateResponse response = am1.allocate(null, null); + // AM gets notified of the completed container. 
+ boolean receivedCompletedContainer = false; + for (ContainerStatus status : response.getCompletedContainersStatuses()) { + if (status.getContainerId().equals(runningContainer)) { + receivedCompletedContainer = true; + } + } + assertTrue(receivedCompletedContainer); + + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + // release cache is cleaned up and previous running container is not + // recovered + return scheduler.getApplicationAttempt(am1.getApplicationAttemptId()) + .getPendingRelease().isEmpty() + && scheduler.getRMContainer(runningContainer) == null; + } + }, 1000, 20000); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, @@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); } - private void waitForNumContainersToRecover(int num, MockRM rm, + public static void waitForNumContainersToRecover(int num, MockRM rm, ApplicationAttemptId attemptId) throws Exception { AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler(); @@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart attempt = scheduler.getApplicationAttempt(attemptId); } while (attempt.getLiveContainers().size() < num) { - System.out.println("Wait for " + num + " containers to recover."); + System.out.println("Wait for " + num + + " containers to recover. currently: " + + attempt.getLiveContainers().size()); Thread.sleep(200); } }