Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00C86200C48 for ; Thu, 6 Apr 2017 10:54:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F3AD1160B83; Thu, 6 Apr 2017 08:54:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C6523160B84 for ; Thu, 6 Apr 2017 10:54:31 +0200 (CEST) Received: (qmail 5291 invoked by uid 500); 6 Apr 2017 08:54:31 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 5166 invoked by uid 99); 6 Apr 2017 08:54:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Apr 2017 08:54:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CD935DFC31; Thu, 6 Apr 2017 08:54:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zmanji@apache.org To: commits@aurora.apache.org Message-Id: <0504ef884f864d6c87b6f2bde3448adf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Reliably subscribe to Mesos in the HTTP Driver. Date: Thu, 6 Apr 2017 08:54:29 +0000 (UTC) archived-at: Thu, 06 Apr 2017 08:54:33 -0000 Repository: aurora Updated Branches: refs/heads/master 7678d194f -> 656cf9ac5 Reliably subscribe to Mesos in the HTTP Driver. As noted in AURORA-1911 the `V1Mesos` driver doesn't retry `SUBSCRIBE` calls if they fail. 
This means that after a leader subscribes and disconnects, it is possible for it to never resubscribe again if the Mesos Master is unhealthy. To fix this, I have moved the subscription into the dedicated `SchedulerExecutor` and it continues to attempt to subscribe using truncated binary backoff. It only stops if we are disconnected or if we successfully connect. Bugs closed: AURORA-1911 Reviewed at https://reviews.apache.org/r/58053/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/656cf9ac Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/656cf9ac Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/656cf9ac Branch: refs/heads/master Commit: 656cf9ac59754cb25084821b74ca22ef0d0ff2d5 Parents: 7678d19 Author: Zameer Manji Authored: Thu Apr 6 10:40:41 2017 +0200 Committer: Zameer Manji Committed: Thu Apr 6 10:40:41 2017 +0200 ---------------------------------------------------------------------- .../aurora/benchmark/StatusUpdateBenchmark.java | 3 +- .../scheduler/mesos/MesosCallbackHandler.java | 17 +-- .../scheduler/mesos/SchedulerDriverModule.java | 19 ++- .../mesos/VersionedMesosSchedulerImpl.java | 57 +++++-- .../mesos/VersionedMesosSchedulerImplTest.java | 148 ++++++++++++++++++- 5 files changed, 211 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index 206b114..c81387f 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -66,6 +66,7 @@ import 
org.apache.aurora.scheduler.mesos.MesosCallbackHandler; import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl; import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl; import org.apache.aurora.scheduler.mesos.ProtosConversion; +import org.apache.aurora.scheduler.mesos.SchedulerDriverModule; import org.apache.aurora.scheduler.mesos.TestExecutorSettings; import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; @@ -189,7 +190,7 @@ public class StatusUpdateBenchmark { bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class); bind(MesosCallbackHandlerImpl.class).in(Singleton.class); bind(Executor.class) - .annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class) + .annotatedWith(SchedulerDriverModule.SchedulerExecutor.class) .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor( "SchedulerImpl-%d", LoggerFactory.getLogger(StatusUpdateBenchmark.class))); http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java index 5bf1e4e..5a5281a 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.mesos; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; @@ -22,7 +20,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; -import javax.inject.Qualifier; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -57,10 +54,6 @@ import org.apache.mesos.v1.Protos.TaskStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; import static org.apache.mesos.v1.Protos.TaskStatus.Reason.REASON_RECONCILIATION; @@ -106,14 +99,6 @@ public interface MesosCallbackHandler { private final AtomicBoolean frameworkRegistered; /** - * Binding annotation for the executor the incoming Mesos message handler uses. - */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface SchedulerExecutor { } - - /** * Creates a new handler for callbacks. * * @param storage Store to save host attributes into. 
@@ -130,7 +115,7 @@ public interface MesosCallbackHandler { TaskStatusHandler taskStatusHandler, OfferManager offerManager, EventSink eventSink, - @SchedulerExecutor Executor executor, + @SchedulerDriverModule.SchedulerExecutor Executor executor, StatsProvider statsProvider, Driver driver, Clock clock, http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java index 10d4f1b..18dc3e0 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java @@ -13,10 +13,14 @@ */ package org.apache.aurora.scheduler.mesos; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; import java.util.concurrent.Executor; +import javax.inject.Qualifier; import javax.inject.Singleton; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.AbstractModule; import org.apache.aurora.scheduler.app.SchedulerMain; @@ -27,6 +31,11 @@ import org.apache.mesos.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + import static com.google.common.base.Preconditions.checkState; /** @@ -40,6 +49,14 @@ public class SchedulerDriverModule extends AbstractModule { this.kind = kind; } + /** + * Binding annotation for the executor the incoming Mesos message handler uses. 
+ */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface SchedulerExecutor { } + @Override protected void configure() { bind(Scheduler.class).to(MesosSchedulerImpl.class); @@ -48,7 +65,7 @@ public class SchedulerDriverModule extends AbstractModule { bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class); bind(MesosCallbackHandlerImpl.class).in(Singleton.class); // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant. - bind(Executor.class).annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class) + bind(Executor.class).annotatedWith(SchedulerExecutor.class) .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG)); switch (kind) { http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java index 67d356a..5329de5 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java @@ -13,6 +13,9 @@ */ package org.apache.aurora.scheduler.mesos; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; import com.google.common.base.Optional; @@ -21,6 +24,8 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import org.apache.aurora.common.inject.TimedInterceptor; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.util.BackoffHelper; import org.apache.aurora.scheduler.stats.CachedCounters; import 
org.apache.aurora.scheduler.storage.Storage; import org.apache.mesos.v1.Protos; @@ -45,8 +50,13 @@ public class VersionedMesosSchedulerImpl implements Scheduler { private final MesosCallbackHandler handler; private final Storage storage; private final FrameworkInfoFactory infoFactory; + private final Executor executor; + private final BackoffHelper backoffHelper; - private volatile boolean isRegistered = false; + private final AtomicBoolean isSubscribed = new AtomicBoolean(false); + private final AtomicBoolean isConnected = new AtomicBoolean(false); + private final AtomicBoolean isRegistered = new AtomicBoolean(false); + private final AtomicLong subcriptionCalls; private static final String EVENT_COUNTER_STAT_PREFIX = "mesos_scheduler_event_"; // A cache to hold the metric names to prevent us from creating strings for every event @@ -64,18 +74,26 @@ public class VersionedMesosSchedulerImpl implements Scheduler { VersionedMesosSchedulerImpl( MesosCallbackHandler handler, CachedCounters counters, + StatsProvider statsProvider, Storage storage, + @SchedulerDriverModule.SchedulerExecutor Executor executor, + BackoffHelper backoffHelper, FrameworkInfoFactory factory) { this.handler = requireNonNull(handler); this.counters = requireNonNull(counters); this.storage = requireNonNull(storage); this.infoFactory = requireNonNull(factory); + this.executor = requireNonNull(executor); + this.backoffHelper = requireNonNull(backoffHelper); initializeEventMetrics(); + + this.subcriptionCalls = statsProvider.makeCounter("mesos_scheduler_subscription_attempts"); } @Override public void connected(Mesos mesos) { LOG.info("Connected to Mesos master."); + isConnected.set(true); Optional frameworkId = storage.read( storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId()); @@ -95,15 +113,35 @@ public class VersionedMesosSchedulerImpl implements Scheduler { LOG.warn("Did not find a persisted framework ID, connecting as a new framework."); } - LOG.info("Sending subscribe call"); 
- mesos.send(call.setSubscribe(Call.Subscribe.newBuilder() - .setFrameworkInfo(frameworkBuilder.build()) - .build()) - .build()); + call.setSubscribe(Call.Subscribe.newBuilder().setFrameworkInfo(frameworkBuilder)); + + executor.execute(() -> { + LOG.info("Starting to subscribe to Mesos with backoff."); + try { + backoffHelper.doUntilSuccess(() -> { + if (!isConnected.get()) { + LOG.info("Disconnected while attempting to subscribe. Stopping attempt."); + return true; + } + if (!isSubscribed.get()) { + LOG.info("Sending subscribe call."); + mesos.send(call.build()); + subcriptionCalls.incrementAndGet(); + return false; + } + LOG.info("Subscribed to Mesos"); + return true; + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); } @Override public void disconnected(Mesos mesos) { + isSubscribed.set(false); + isConnected.set(false); handler.handleDisconnection(); } @@ -126,16 +164,17 @@ public class VersionedMesosSchedulerImpl implements Scheduler { switch(event.getType()) { case SUBSCRIBED: Event.Subscribed subscribed = event.getSubscribed(); - if (isRegistered) { + if (isRegistered.get()) { handler.handleReregistration(subscribed.getMasterInfo()); } else { + isRegistered.set(true); handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo()); - isRegistered = true; } + isSubscribed.set(true); break; case OFFERS: - checkState(isRegistered, "Must be registered before receiving offers."); + checkState(isSubscribed.get(), "Must be registered before receiving offers."); handler.handleOffers(event.getOffers().getOffersList()); break; http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java index 756d0d9..8c259bf 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java @@ -13,12 +13,16 @@ */ package org.apache.aurora.scheduler.mesos; +import java.util.concurrent.Executors; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.net.InetAddresses; import com.google.protobuf.ByteString; +import org.apache.aurora.common.base.ExceptionalSupplier; import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.BackoffHelper; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; @@ -40,10 +44,12 @@ import org.easymock.Capture; import org.junit.Before; import org.junit.Test; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class VersionedMesosSchedulerImplTest extends EasyMockTest { @@ -53,6 +59,7 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest { private Mesos driver; private FakeStatsProvider statsProvider; private FrameworkInfoFactory infoFactory; + private BackoffHelper backoffHelper; private VersionedMesosSchedulerImpl scheduler; @@ -145,31 +152,44 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest { driver = createMock(Mesos.class); statsProvider = new FakeStatsProvider(); infoFactory = createMock(FrameworkInfoFactory.class); + backoffHelper = createMock(BackoffHelper.class); scheduler = new 
VersionedMesosSchedulerImpl( handler, new CachedCounters(statsProvider), + statsProvider, storageUtil.storage, + Executors.newSingleThreadExecutor(), + backoffHelper, infoFactory); } - @Test - public void testConnected() { + @Test(timeout = 300000) + public void testConnected() throws Exception { // Once the V1 driver has connected, we need to establish a subscription to get events - - storageUtil.expectOperations(); - expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); - expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO); + expectFrameworkInfoRead(); Capture subscribeCapture = createCapture(); driver.send(capture(subscribeCapture)); expectLastCall().once(); + Capture> supplierCapture = createCapture(); + backoffHelper.doUntilSuccess(capture(supplierCapture)); + expectLastCall().once(); + control.replay(); scheduler.connected(driver); + waitUntilCaptured(supplierCapture); + + assertTrue(supplierCapture.hasCaptured()); + ExceptionalSupplier supplier = supplierCapture.getValue(); + + // Make one connection attempt + supplier.get(); + assertTrue(subscribeCapture.hasCaptured()); Call subscribe = subscribeCapture.getValue(); @@ -181,6 +201,78 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest { FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build()); } + @Test(timeout = 300000) + public void testAttemptSubscriptionSuccessful() throws Exception { + expectFrameworkInfoRead(); + + // Other tests already check if what we send is correct. 
+ driver.send(anyObject()); + expectLastCall().once(); + driver.send(anyObject()); + expectLastCall().once(); + + Capture> supplierCapture = createCapture(); + backoffHelper.doUntilSuccess(capture(supplierCapture)); + expectLastCall().once(); + + handler.handleRegistration(FRAMEWORK, MASTER); + + control.replay(); + + scheduler.connected(driver); + + waitUntilCaptured(supplierCapture); + assertTrue(supplierCapture.hasCaptured()); + ExceptionalSupplier supplier = supplierCapture.getValue(); + + // Each attempt should return false. + assertFalse(supplier.get()); + assertFalse(supplier.get()); + + // After the callback we should return true because it was successful. + scheduler.received(driver, SUBSCRIBED_EVENT); + + assertTrue(supplier.get()); + } + + @Test(timeout = 300000) + public void testAttemptSubscriptionHaltsAfterDisconnection() throws Exception { + storageUtil.expectOperations(); + expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO); + + // Other tests already check if what we send is correct. + driver.send(anyObject()); + expectLastCall().once(); + + Capture> supplierCapture = createCapture(); + backoffHelper.doUntilSuccess(capture(supplierCapture)); + expectLastCall().once(); + + handler.handleDisconnection(); + + control.replay(); + + scheduler.connected(driver); + + waitUntilCaptured(supplierCapture); + assertTrue(supplierCapture.hasCaptured()); + ExceptionalSupplier supplier = supplierCapture.getValue(); + + assertFalse(supplier.get()); + + // After disconnection we should stop. 
+ scheduler.disconnected(driver); + + assertTrue(supplier.get()); + } + + private static void waitUntilCaptured(Capture capture) throws Exception { + while (!capture.hasCaptured()) { + Thread.sleep(1000); + } + } + @Test public void testDisconnected() { handler.handleDisconnection(); @@ -272,4 +364,48 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest { scheduler.received(driver, FAILED_AGENT_EVENT); assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE")); } + + @Test(timeout = 300000) + public void testSubscribeDisconnectSubscribeCycle() throws Exception { + expectFrameworkInfoRead(); + + Capture> firstSubscribeAttempt = createCapture(); + backoffHelper.doUntilSuccess(capture(firstSubscribeAttempt)); + expectLastCall().once(); + + handler.handleRegistration(FRAMEWORK, MASTER); + handler.handleDisconnection(); + + Capture> secondSubscribeAttempt = + createCapture(); + backoffHelper.doUntilSuccess(capture(secondSubscribeAttempt)); + expectLastCall().once(); + + // Second subscribe should call the reregistration handler. + handler.handleReregistration(MASTER); + + control.replay(); + + scheduler.connected(driver); + + waitUntilCaptured(firstSubscribeAttempt); + assertTrue(firstSubscribeAttempt.hasCaptured()); + + scheduler.received(driver, SUBSCRIBED_EVENT); + scheduler.disconnected(driver); + scheduler.connected(driver); + + waitUntilCaptured(secondSubscribeAttempt); + assertTrue(secondSubscribeAttempt.hasCaptured()); + + scheduler.received(driver, SUBSCRIBED_EVENT); + } + + private void expectFrameworkInfoRead() { + storageUtil.expectOperations(); + expect(storageUtil.schedulerStore.fetchFrameworkId()) + .andReturn(Optional.of(FRAMEWORK_ID)) + .anyTimes(); + expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO).anyTimes(); + } }