Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AA17F18E07 for ; Fri, 18 Mar 2016 17:01:36 +0000 (UTC) Received: (qmail 44094 invoked by uid 500); 18 Mar 2016 17:01:36 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 44036 invoked by uid 500); 18 Mar 2016 17:01:36 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 44027 invoked by uid 99); 18 Mar 2016 17:01:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Mar 2016 17:01:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 44669DFAFF; Fri, 18 Mar 2016 17:01:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: epayne@apache.org To: common-commits@hadoop.apache.org Message-Id: <9c475996212849828a494eeae20b25b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger. Date: Fri, 18 Mar 2016 17:01:36 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 9b623fbaf -> 92b7e0d41 YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92b7e0d4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92b7e0d4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92b7e0d4 Branch: refs/heads/trunk Commit: 92b7e0d41302b6b110927f99de5c2b4a4a93c5fd Parents: 9b623fb Author: Eric Payne Authored: Fri Mar 18 16:11:06 2016 +0000 Committer: Eric Payne Committed: Fri Mar 18 16:12:47 2016 +0000 ---------------------------------------------------------------------- .../hadoop/yarn/client/ProtocolHATestBase.java | 3 +- .../hadoop/yarn/client/TestRMFailover.java | 2 - .../yarn/client/api/impl/TestYarnClient.java | 18 +++++ .../nodemanager/NodeStatusUpdaterImpl.java | 50 +++++++------ .../hadoop/yarn/server/MiniYARNCluster.java | 75 +++++++------------- .../yarn/server/TestMiniYARNClusterForHA.java | 4 -- 6 files changed, 75 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index cf7fcc5..f336b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -217,7 +217,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected void verifyConnections() throws InterruptedException, YarnException { assertTrue("NMs failed to connect to the RM", - cluster.waitForNodeManagersToConnect(20000)); + cluster.waitForNodeManagersToConnect(5000)); verifyClientConnection(); } @@ -279,7 +279,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { cluster.resetStartFailoverFlag(false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index cbc220a..f323351 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -142,7 +142,6 @@ public class TestRMFailover extends ClientBaseWithFixes { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); @@ -231,7 +230,6 @@ public class TestRMFailover extends ClientBaseWithFixes { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 2c34b99..2d11d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -1195,6 +1195,24 @@ public class TestYarnClient { client.init(yarnConf); client.start(); + int attempts; + for(attempts = 10; attempts > 0; attempts--) { + if (cluster.getResourceManager().getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity() + .getMemory() > 0) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (attempts <= 0) { + Assert.fail("Exhausted attempts in checking if node capacity was " + + "added to the plan"); + } + // create a reservation Clock clock = new UTCClock(); long arrival = clock.getTime(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5806731..ad983fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); + private final Object shutdownMonitor = new Object(); private final Context context; private final Dispatcher dispatcher; @@ -240,15 +241,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override protected void serviceStop() throws Exception { // the isStopped check is for avoiding multiple unregistrations. - if (this.registeredWithRM && !this.isStopped - && !isNMUnderSupervisionWithRecoveryEnabled() - && !context.getDecommissioned() && !failedToConnect) { - unRegisterNM(); + synchronized(shutdownMonitor) { + if (this.registeredWithRM && !this.isStopped + && !isNMUnderSupervisionWithRecoveryEnabled() + && !context.getDecommissioned() && !failedToConnect) { + unRegisterNM(); + } + // Interrupt the updater. + this.isStopped = true; + stopRMProxy(); + super.serviceStop(); } - // Interrupt the updater. - this.isStopped = true; - stopRMProxy(); - super.serviceStop(); } private boolean isNMUnderSupervisionWithRecoveryEnabled() { @@ -275,19 +278,24 @@ public class NodeStatusUpdaterImpl extends AbstractService implements protected void rebootNodeStatusUpdaterAndRegisterWithRM() { // Interrupt the updater. - this.isStopped = true; - - try { - statusUpdater.join(); - registerWithRM(); - statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); - this.isStopped = false; - statusUpdater.start(); - LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); - } catch (Exception e) { - String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; - LOG.error(errorMessage, e); - throw new YarnRuntimeException(e); + synchronized(shutdownMonitor) { + if(this.isStopped) { + LOG.info("Currently being shutdown. Aborting reboot"); + return; + } + this.isStopped = true; + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); + this.isStopped = false; + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; + LOG.error(errorMessage, e); + throw new YarnRuntimeException(e); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 024adc6..74b7732 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -275,6 +278,12 @@ public class MiniYARNCluster extends CompositeService { conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } + @Override + protected synchronized void serviceStart() throws Exception { + super.serviceStart(); + this.waitForNodeManagersToConnect(5000); + } + private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) { String hostname = MiniYARNCluster.getHostname(); conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); @@ -314,19 +323,7 @@ public class MiniYARNCluster extends CompositeService { private synchronized void startResourceManager(final int index) { try { - Thread rmThread = new Thread() { - public void run() { - resourceManagers[index].start(); - } - }; - rmThread.setName("RM-" + index); - rmThread.start(); - int waitCount = 0; - while (resourceManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for RM to start..."); - Thread.sleep(1500); - } + resourceManagers[index].start(); if (resourceManagers[index].getServiceState() != STATE.STARTED) { // RM could have failed. throw new IOException( @@ -456,6 +453,11 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStart() throws Exception { startResourceManager(index); + if(index == 0) { + resourceManagers[index].getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED)); + } Configuration conf = resourceManagers[index].getConfig(); LOG.info("Starting resourcemanager " + index); LOG.info("MiniYARN ResourceManager address: " + @@ -565,26 +567,12 @@ public class MiniYARNCluster extends CompositeService { } protected synchronized void serviceStart() throws Exception { - try { - new Thread() { - public void run() { - nodeManagers[index].start(); - } - }.start(); - int waitCount = 0; - while (nodeManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for NM " + index + " to start..."); - Thread.sleep(1000); - } - if (nodeManagers[index].getServiceState() != STATE.STARTED) { - // RM could have failed. - throw new IOException("NodeManager " + index + " failed to start"); - } - super.serviceStart(); - } catch (Throwable t) { - throw new YarnRuntimeException(t); + nodeManagers[index].start(); + if (nodeManagers[index].getServiceState() != STATE.STARTED) { + // NM could have failed. + throw new IOException("NodeManager " + index + " failed to start"); } + super.serviceStart(); } @Override @@ -715,7 +703,7 @@ public class MiniYARNCluster extends CompositeService { /** * Wait for all the NodeManagers to connect to the ResourceManager. * - * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. + * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds. * @return true if all NodeManagers connect to the (Active) * ResourceManager, false otherwise. * @throws YarnException @@ -724,17 +712,19 @@ public class MiniYARNCluster extends CompositeService { public boolean waitForNodeManagersToConnect(long timeout) throws YarnException, InterruptedException { GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); - for (int i = 0; i < timeout / 100; i++) { + for (int i = 0; i < timeout / 10; i++) { ResourceManager rm = getResourceManager(); if (rm == null) { throw new YarnException("Can not find the active RM."); } else if (nodeManagers.length == rm.getClientRMService() - .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + LOG.info("All Node Managers connected in MiniYARNCluster"); return true; } - Thread.sleep(100); + Thread.sleep(10); } + LOG.info("Node Managers did not connect within 5000ms"); return false; } @@ -769,18 +759,7 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStart() throws Exception { - - new Thread() { - public void run() { - appHistoryServer.start(); - }; - }.start(); - int waitCount = 0; - while (appHistoryServer.getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for Timeline Server to start..."); - Thread.sleep(1500); - } + appHistoryServer.start(); if (appHistoryServer.getServiceState() != STATE.STARTED) { // AHS could have failed. IOException ioe = new IOException( http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java index e84d62e..384d1cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java @@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA { cluster.init(conf); cluster.start(); - cluster.getResourceManager(0).getRMContext().getRMAdminService() - .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( - HAServiceProtocol.RequestSource.REQUEST_BY_USER)); - assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); }