Return-Path: X-Original-To: apmail-brooklyn-dev-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B41E11ECC for ; Thu, 26 Jun 2014 11:26:02 +0000 (UTC) Received: (qmail 3564 invoked by uid 500); 26 Jun 2014 11:26:02 -0000 Delivered-To: apmail-brooklyn-dev-archive@brooklyn.apache.org Received: (qmail 3539 invoked by uid 500); 26 Jun 2014 11:26:02 -0000 Mailing-List: contact dev-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list dev@brooklyn.incubator.apache.org Received: (qmail 3525 invoked by uid 99); 26 Jun 2014 11:26:01 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jun 2014 11:26:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 26 Jun 2014 11:26:00 +0000 Received: (qmail 99408 invoked by uid 99); 26 Jun 2014 11:25:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jun 2014 11:25:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C9423834C3D; Thu, 26 Jun 2014 11:25:39 +0000 (UTC) From: aledsage To: dev@brooklyn.incubator.apache.org Reply-To: dev@brooklyn.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-brooklyn pull request: HA uses shared timestamp (last mo... 
Content-Type: text/plain Message-Id: <20140626112539.C9423834C3D@tyr.zones.apache.org> Date: Thu, 26 Jun 2014 11:25:39 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Github user aledsage commented on a diff in the pull request: https://github.com/apache/incubator-brooklyn/pull/12#discussion_r14235324 --- Diff: core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java --- @@ -0,0 +1,311 @@ +package brooklyn.management.ha; + +import static org.testng.Assert.assertEquals; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; +import brooklyn.entity.rebind.persister.InMemoryObjectStore; +import brooklyn.entity.rebind.persister.ListeningObjectStore; +import brooklyn.entity.rebind.persister.PersistMode; +import brooklyn.entity.rebind.persister.PersistenceObjectStore; +import brooklyn.location.Location; +import brooklyn.management.internal.ManagementContextInternal; +import brooklyn.test.entity.LocalManagementContextForTests; +import brooklyn.test.entity.TestApplication; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; + +@Test +public class HighAvailabilityManagerSplitBrainTest { + + private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class); + + private List nodes = new MutableList(); + Map sharedBackingStore = MutableMap.of(); + Map sharedBackingStoreDates = MutableMap.of(); + private AtomicLong sharedTime; // used to set the ticker's return value + private ClassLoader classLoader = getClass().getClassLoader(); + + public class HaMgmtNode { + + private ManagementContextInternal mgmt; + private String ownNodeId; + private String nodeName; + private ListeningObjectStore objectStore; + private ManagementPlaneSyncRecordPersister persister; + private HighAvailabilityManagerImpl ha; + private Ticker ticker; + private AtomicLong currentTime; // used to set the ticker's return value + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + if (sharedTime==null) + currentTime = new AtomicLong(System.currentTimeMillis()); + + ticker = new Ticker() { + // strictly not a ticker because returns millis UTC, but it works fine even so + @Override public long read() { + if (sharedTime!=null) return sharedTime.get(); + return currentTime.get(); + } + }; + + nodeName = "node "+nodes.size(); + mgmt = newLocalManagementContext(); + ownNodeId = mgmt.getManagementNodeId(); + objectStore = new ListeningObjectStore(newPersistenceObjectStore()); + objectStore.injectManagementContext(mgmt); + objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED); + persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader); + ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento(); + BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader); + mgmt.getRebindManager().setPersister(persisterObj); + ha = new HighAvailabilityManagerImpl(mgmt) + .setPollPeriod(Duration.PRACTICALLY_FOREVER) + .setHeartbeatTimeout(Duration.THIRTY_SECONDS) + .setLocalTicker(ticker) + .setRemoteTicker(ticker) + 
.setPersister(persister); + log.info("Created "+nodeName+" "+ownNodeId); + } + + public void tearDown() throws Exception { + if (ha != null) ha.stop(); + if (mgmt != null) Entities.destroyAll(mgmt); + if (objectStore != null) objectStore.deleteCompletely(); + } + + private long tickerCurrentMillis() { + return ticker.read(); + } + + private long tickerAdvance(Duration duration) { + if (sharedTime!=null) + throw new IllegalStateException("Using shared ticker; cannot advance private node clock"); + currentTime.addAndGet(duration.toMilliseconds()); + return tickerCurrentMillis(); + } + + @Override + public String toString() { + return nodeName+" "+ownNodeId; + } + } + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + nodes.clear(); + sharedBackingStore.clear(); + } + + public HaMgmtNode newNode() throws Exception { + HaMgmtNode node = new HaMgmtNode(); + node.setUp(); + nodes.add(node); + return node; + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + for (HaMgmtNode n: nodes) + n.tearDown(); + } + + private void sharedTickerAdvance(Duration duration) { + if (sharedTime==null) { + for (HaMgmtNode n: nodes) + n.tickerAdvance(duration); + } else { + sharedTime.addAndGet(duration.toMilliseconds()); + } + } + + private long sharedTickerCurrentMillis() { + return sharedTime.get(); + } + + protected void useSharedTime() { + if (!nodes.isEmpty()) + throw new IllegalStateException("shared time must be set up before any nodes created"); + sharedTime = new AtomicLong(System.currentTimeMillis()); + } + + protected ManagementContextInternal newLocalManagementContext() { + return new LocalManagementContextForTests(); + } + + protected PersistenceObjectStore newPersistenceObjectStore() { + return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates); + } + + @Test + public void testIfNodeStopsBeingAbleToWrite() throws Exception { + useSharedTime(); + HaMgmtNode n1 = newNode(); + HaMgmtNode n2 = newNode(); + + 
n1.ha.start(HighAvailabilityMode.AUTO); + ManagementPlaneSyncRecord memento1 = n1.ha.getManagementPlaneSyncState(); + + log.info(n1+" HA: "+memento1); + assertEquals(memento1.getMasterNodeId(), n1.ownNodeId); + Long time0 = sharedTickerCurrentMillis(); + assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0); + assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER); + + n2.ha.start(HighAvailabilityMode.AUTO); + ManagementPlaneSyncRecord memento2 = n2.ha.getManagementPlaneSyncState(); + + log.info(n2+" HA: "+memento2); + assertEquals(memento2.getMasterNodeId(), n1.ownNodeId); + assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER); + assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.STANDBY); + assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0); + assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time0); + + // and no entities at either + assertEquals(n1.mgmt.getApplications().size(), 0); + assertEquals(n2.mgmt.getApplications().size(), 0); + + // create + TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), n1.mgmt); + app.start(ImmutableList.of()); + app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello"); + + assertEquals(n1.mgmt.getApplications().size(), 1); + assertEquals(n2.mgmt.getApplications().size(), 0); + n1.mgmt.getRebindManager().forcePersistNow(); --- End diff -- I'm fine with this, but note for other places... The more complicated `waitForWritesComplete` call ensures that tests don't only pass because we're explicitly calling `persistNow`. Instead other tests are more passively observing what the production persistence code is doing. So let's not change the other uses of `waitForWritesComplete`. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---