brooklyn-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aledsage <...@git.apache.org>
Subject [GitHub] incubator-brooklyn pull request: HA uses shared timestamp (last mo...
Date Thu, 26 Jun 2014 11:25:39 GMT
Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/12#discussion_r14235324
  
    --- Diff: core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
---
    @@ -0,0 +1,311 @@
    +package brooklyn.management.ha;
    +
    +import static org.testng.Assert.assertEquals;
    +
    +import java.util.Date;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.testng.Assert;
    +import org.testng.annotations.AfterMethod;
    +import org.testng.annotations.BeforeMethod;
    +import org.testng.annotations.Test;
    +
    +import brooklyn.entity.basic.ApplicationBuilder;
    +import brooklyn.entity.basic.Entities;
    +import brooklyn.entity.proxying.EntitySpec;
    +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
    +import brooklyn.entity.rebind.persister.InMemoryObjectStore;
    +import brooklyn.entity.rebind.persister.ListeningObjectStore;
    +import brooklyn.entity.rebind.persister.PersistMode;
    +import brooklyn.entity.rebind.persister.PersistenceObjectStore;
    +import brooklyn.location.Location;
    +import brooklyn.management.internal.ManagementContextInternal;
    +import brooklyn.test.entity.LocalManagementContextForTests;
    +import brooklyn.test.entity.TestApplication;
    +import brooklyn.util.collections.MutableList;
    +import brooklyn.util.collections.MutableMap;
    +import brooklyn.util.repeat.Repeater;
    +import brooklyn.util.time.Duration;
    +import brooklyn.util.time.Time;
    +
    +import com.google.common.base.Ticker;
    +import com.google.common.collect.ImmutableList;
    +
    +@Test
    +public class HighAvailabilityManagerSplitBrainTest {
    +
    +    private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class);
    +    
    +    private List<HaMgmtNode> nodes = new MutableList<HighAvailabilityManagerSplitBrainTest.HaMgmtNode>();
    +    Map<String,String> sharedBackingStore = MutableMap.of();
    +    Map<String,Date> sharedBackingStoreDates = MutableMap.of();
    +    private AtomicLong sharedTime; // used to set the ticker's return value
    +    private ClassLoader classLoader = getClass().getClassLoader();
    +    
    +    public class HaMgmtNode {
    +        
    +        private ManagementContextInternal mgmt;
    +        private String ownNodeId;
    +        private String nodeName;
    +        private ListeningObjectStore objectStore;
    +        private ManagementPlaneSyncRecordPersister persister;
    +        private HighAvailabilityManagerImpl ha;
    +        private Ticker ticker;
    +        private AtomicLong currentTime; // used to set the ticker's return value
    +
    +        @BeforeMethod(alwaysRun=true)
    +        public void setUp() throws Exception {
    +            if (sharedTime==null)
    +                currentTime = new AtomicLong(System.currentTimeMillis());
    +            
    +            ticker = new Ticker() {
    +                // strictly not a ticker because returns millis UTC, but it works fine
even so
    +                @Override public long read() {
    +                    if (sharedTime!=null) return sharedTime.get();
    +                    return currentTime.get();
    +                }
    +            };
    +            
    +            nodeName = "node "+nodes.size();
    +            mgmt = newLocalManagementContext();
    +            ownNodeId = mgmt.getManagementNodeId();
    +            objectStore = new ListeningObjectStore(newPersistenceObjectStore());
    +            objectStore.injectManagementContext(mgmt);
    +            objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
    +            persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore,
classLoader);
    +            ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
    +            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore,
classLoader);
    +            mgmt.getRebindManager().setPersister(persisterObj);
    +            ha = new HighAvailabilityManagerImpl(mgmt)
    +                .setPollPeriod(Duration.PRACTICALLY_FOREVER)
    +                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
    +                .setLocalTicker(ticker)
    +                .setRemoteTicker(ticker)
    +                .setPersister(persister);
    +            log.info("Created "+nodeName+" "+ownNodeId);
    +        }
    +        
    +        public void tearDown() throws Exception {
    +            if (ha != null) ha.stop();
    +            if (mgmt != null) Entities.destroyAll(mgmt);
    +            if (objectStore != null) objectStore.deleteCompletely();
    +        }
    +        
    +        private long tickerCurrentMillis() {
    +            return ticker.read();
    +        }
    +        
    +        private long tickerAdvance(Duration duration) {
    +            if (sharedTime!=null)
    +                throw new IllegalStateException("Using shared ticker; cannot advance
private node clock");
    +            currentTime.addAndGet(duration.toMilliseconds());
    +            return tickerCurrentMillis();
    +        }
    +        
    +        @Override
    +        public String toString() {
    +            return nodeName+" "+ownNodeId;
    +        }
    +    }
    +    
    +    @BeforeMethod(alwaysRun=true)
    +    public void setUp() throws Exception {
    +        nodes.clear();
    +        sharedBackingStore.clear();
    +    }
    +    
    +    public HaMgmtNode newNode() throws Exception {
    +        HaMgmtNode node = new HaMgmtNode();
    +        node.setUp();
    +        nodes.add(node);
    +        return node;
    +    }
    +
    +    @AfterMethod(alwaysRun=true)
    +    public void tearDown() throws Exception {
    +        for (HaMgmtNode n: nodes)
    +            n.tearDown();
    +    }
    +
    +    private void sharedTickerAdvance(Duration duration) {
    +        if (sharedTime==null) {
    +            for (HaMgmtNode n: nodes)
    +                n.tickerAdvance(duration);
    +        } else {
    +            sharedTime.addAndGet(duration.toMilliseconds());
    +        }
    +    }
    +    
    +    private long sharedTickerCurrentMillis() {
    +        return sharedTime.get();
    +    }
    +    
    +    protected void useSharedTime() {
    +        if (!nodes.isEmpty())
    +            throw new IllegalStateException("shared time must be set up before any nodes
created");
    +        sharedTime = new AtomicLong(System.currentTimeMillis());
    +    }
    +
    +    protected ManagementContextInternal newLocalManagementContext() {
    +        return new LocalManagementContextForTests();
    +    }
    +
    +    protected PersistenceObjectStore newPersistenceObjectStore() {
    +        return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates);
    +    }
    +    
    +    @Test
    +    public void testIfNodeStopsBeingAbleToWrite() throws Exception {
    +        useSharedTime();
    +        HaMgmtNode n1 = newNode();
    +        HaMgmtNode n2 = newNode();
    +        
    +        n1.ha.start(HighAvailabilityMode.AUTO);
    +        ManagementPlaneSyncRecord memento1 = n1.ha.getManagementPlaneSyncState();
    +        
    +        log.info(n1+" HA: "+memento1);
    +        assertEquals(memento1.getMasterNodeId(), n1.ownNodeId);
    +        Long time0 = sharedTickerCurrentMillis();
    +        assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
time0);
    +        assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
    +
    +        n2.ha.start(HighAvailabilityMode.AUTO);
    +        ManagementPlaneSyncRecord memento2 = n2.ha.getManagementPlaneSyncState();
    +        
    +        log.info(n2+" HA: "+memento2);
    +        assertEquals(memento2.getMasterNodeId(), n1.ownNodeId);
    +        assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
    +        assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.STANDBY);
    +        assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(),
time0);
    +        assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(),
time0);
    +        
    +        // and no entities at either
    +        assertEquals(n1.mgmt.getApplications().size(), 0);
    +        assertEquals(n2.mgmt.getApplications().size(), 0);
    +
    +        // create
    +        TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class),
n1.mgmt);
    +        app.start(ImmutableList.<Location>of());
    +        app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello");
    +        
    +        assertEquals(n1.mgmt.getApplications().size(), 1);
    +        assertEquals(n2.mgmt.getApplications().size(), 0);
    +        n1.mgmt.getRebindManager().forcePersistNow();
    --- End diff --
    
    I'm fine with this, but note for other places...
    The more complicated `waitForWritesComplete` call ensures that tests don't pass only because
we're explicitly calling `forcePersistNow`. Instead, the other tests passively observe what
the production persistence code is doing. So let's not change the other uses of `waitForWritesComplete`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message