Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D7175200BC5 for ; Tue, 18 Oct 2016 01:14:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D5C1F160AF0; Mon, 17 Oct 2016 23:14:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CC0A3160AEC for ; Tue, 18 Oct 2016 01:14:22 +0200 (CEST) Received: (qmail 2572 invoked by uid 500); 17 Oct 2016 23:14:21 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 2267 invoked by uid 99); 17 Oct 2016 23:14:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 23:14:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0CB46F1592; Mon, 17 Oct 2016 23:14:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: umamahesh@apache.org To: common-commits@hadoop.apache.org Date: Mon, 17 Oct 2016 23:14:30 -0000 Message-Id: In-Reply-To: <91cf63af350e4d6186f9f8359f6fdc63@git.apache.org> References: <91cf63af350e4d6186f9f8359f6fdc63@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/50] hadoop git commit: YARN-5677. RM should transition to standby when connection is lost for an extended period. (Daniel Templeton via kasha) archived-at: Mon, 17 Oct 2016 23:14:24 -0000 YARN-5677. RM should transition to standby when connection is lost for an extended period. 
(Daniel Templeton via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6476934a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6476934a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6476934a Branch: refs/heads/HDFS-10285 Commit: 6476934ae5de1be7988ab198b673d82fe0f006e3 Parents: 6378845 Author: Karthik Kambatla Authored: Tue Oct 11 22:07:10 2016 -0700 Committer: Karthik Kambatla Committed: Tue Oct 11 22:07:10 2016 -0700 ---------------------------------------------------------------------- .../resourcemanager/EmbeddedElectorService.java | 59 +++++- .../resourcemanager/TestRMEmbeddedElector.java | 191 +++++++++++++++++++ 2 files changed, 244 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6476934a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 72327e8..88d2e10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import 
com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +40,8 @@ import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -54,6 +57,10 @@ public class EmbeddedElectorService extends AbstractService private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; + private long zkSessionTimeout; + private Timer zkDisconnectTimer; + @VisibleForTesting + final Object zkDisconnectLock = new Object(); EmbeddedElectorService(RMContext rmContext) { super(EmbeddedElectorService.class.getName()); @@ -80,7 +87,7 @@ public class EmbeddedElectorService extends AbstractService YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); String electionZNode = zkBasePath + "/" + clusterId; - long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, + zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); List zkAcls = RMZKUtils.getZKAcls(conf); @@ -123,6 +130,8 @@ public class EmbeddedElectorService extends AbstractService @Override public void becomeActive() throws ServiceFailedException { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToActive(req); } catch (Exception e) { @@ -132,6 +141,8 @@ public class EmbeddedElectorService extends AbstractService @Override public void becomeStandby() { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToStandby(req); } catch (Exception e) { @@ -139,13 +150,49 @@ public class EmbeddedElectorService extends AbstractService } } + /** + * Stop the disconnect timer. Any running tasks will be allowed to complete. 
+ */ + private void cancelDisconnectTimer() { + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer != null) { + zkDisconnectTimer.cancel(); + zkDisconnectTimer = null; + } + } + } + + /** + * When the ZK client loses contact with ZK, this method will be called to + * allow the RM to react. Because the loss of connection can be noticed + * before the session timeout happens, it is undesirable to transition + * immediately. Instead the method starts a timer that will wait + * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before + * initiating the transition into standby state. + */ @Override public void enterNeutralMode() { - /** - * Possibly due to transient connection issues. Do nothing. - * TODO: Might want to keep track of how long in this state and transition - * to standby. - */ + LOG.warn("Lost contact with Zookeeper. Transitioning to standby in " + + zkSessionTimeout + " ms if connection is not reestablished."); + + // If we've just become disconnected, start a timer. When the time's up, + // we'll transition to standby. 
+ synchronized (zkDisconnectLock) { + if (zkDisconnectTimer == null) { + zkDisconnectTimer = new Timer("Zookeeper disconnect timer"); + zkDisconnectTimer.schedule(new TimerTask() { + @Override + public void run() { + synchronized (zkDisconnectLock) { + // Only run if the timer hasn't been cancelled + if (zkDisconnectTimer != null) { + becomeStandby(); + } + } + } + }, zkSessionTimeout); + } + } } @SuppressWarnings(value = "unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/6476934a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 20b1c0e..bfd0b4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -28,6 +28,14 @@ import org.junit.Test; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestRMEmbeddedElector extends 
ClientBaseWithFixes { private static final Log LOG = @@ -41,6 +49,14 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { private Configuration conf; private AtomicBoolean callbackCalled; + private enum SyncTestType { + ACTIVE, + STANDBY, + NEUTRAL, + ACTIVE_TIMING, + STANDBY_TIMING + } + @Before public void setup() throws IOException { conf = new YarnConfiguration(); @@ -79,6 +95,181 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { LOG.info("Stopped RM"); } + /** + * Test that neutral mode plays well with all other transitions. + * + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + @Test + public void testCallbackSynchronization() + throws IOException, InterruptedException { + testCallbackSynchronization(SyncTestType.ACTIVE); + testCallbackSynchronization(SyncTestType.STANDBY); + testCallbackSynchronization(SyncTestType.NEUTRAL); + testCallbackSynchronization(SyncTestType.ACTIVE_TIMING); + testCallbackSynchronization(SyncTestType.STANDBY_TIMING); + } + + /** + * Helper method to test that neutral mode plays well with other transitions. 
+ * + * @param type the type of test to run + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronization(SyncTestType type) + throws IOException, InterruptedException { + AdminService as = mock(AdminService.class); + RMContext rc = mock(RMContext.class); + Configuration myConf = new Configuration(conf); + + myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rc.getRMAdminService()).thenReturn(as); + + EmbeddedElectorService ees = new EmbeddedElectorService(rc); + ees.init(myConf); + + ees.enterNeutralMode(); + + switch (type) { + case ACTIVE: + testCallbackSynchronizationActive(as, ees); + break; + case STANDBY: + testCallbackSynchronizationStandby(as, ees); + break; + case NEUTRAL: + testCallbackSynchronizationNeutral(as, ees); + break; + case ACTIVE_TIMING: + testCallbackSynchronizationTimingActive(as, ees); + break; + case STANDBY_TIMING: + testCallbackSynchronizationTimingStandby(as, ees); + break; + default: + fail("Unknown test type: " + type); + break; + } + } + + /** + * Helper method to test that neutral mode plays well with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationActive(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.becomeActive(); + + Thread.sleep(100); + + verify(as).transitionToActive(any()); + verify(as, never()).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode plays well with a standby + * transition. 
+ * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationStandby(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.becomeStandby(); + + Thread.sleep(100); + + verify(as, atLeast(1)).transitionToStandby(any()); + verify(as, atMost(1)).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode plays well with itself. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationNeutral(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.enterNeutralMode(); + + Thread.sleep(100); + + verify(as, atLeast(1)).transitionToStandby(any()); + verify(as, atMost(1)).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode does not race with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationTimingActive(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + synchronized (ees.zkDisconnectLock) { + // Sleep while holding the lock so that the timer thread can't do + // anything when it runs. Sleep until we're pretty sure the timer thread + // has tried to run. + Thread.sleep(100); + // While still holding the lock cancel the timer by transitioning. This + // simulates a race where the callback goes to cancel the timer while the + // timer is trying to run. 
+ ees.becomeActive(); + } + + // Sleep just a little more so that the timer thread can do whatever it's + // going to do, hopefully nothing. + Thread.sleep(50); + + verify(as).transitionToActive(any()); + verify(as, never()).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode does not race with a standby + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationTimingStandby(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + synchronized (ees.zkDisconnectLock) { + // Sleep while holding the lock so that the timer thread can't do + // anything when it runs. Sleep until we're pretty sure the timer thread + // has tried to run. + Thread.sleep(100); + // While still holding the lock cancel the timer by transitioning. This + // simulates a race where the callback goes to cancel the timer while the + // timer is trying to run. + ees.becomeStandby(); + } + + // Sleep just a little more so that the timer thread can do whatever it's + // going to do, hopefully nothing. + Thread.sleep(50); + + verify(as, atLeast(1)).transitionToStandby(any()); + verify(as, atMost(1)).transitionToStandby(any()); + } + private class MockRMWithElector extends MockRM { private long delayMs = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org