brooklyn-commits mailing list archives

From aleds...@apache.org
Subject [5/6] git commit: fixes multi-master problems by having one node demote itself, with tests for demotion and also for massively-concurrent startups
Date Thu, 26 Jun 2014 11:28:00 GMT
fixes multi-master problems by having one node demote itself, with tests for demotion and also for massively-concurrent startups


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/49830cda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/49830cda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/49830cda

Branch: refs/heads/master
Commit: 49830cda57efd0ce4121f2595d50b38d24995fd4
Parents: 2ed4c77
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Thu Jun 26 01:24:19 2014 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Thu Jun 26 02:43:27 2014 +0100

----------------------------------------------------------------------
 .../brooklyn/entity/rebind/RebindManager.java   |   5 +
 .../rebind/PeriodicDeltaChangeListener.java     | 137 ++++++++--------
 .../entity/rebind/RebindManagerImpl.java        |   5 +
 .../ha/HighAvailabilityManagerImpl.java         |  83 +++++++---
 .../NonDeploymentManagementContext.java         |   5 +
 .../rebind/persister/InMemoryObjectStore.java   |   2 +-
 .../HighAvailabilityManagerSplitBrainTest.java  | 159 +++++++++++++------
 7 files changed, 263 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/rebind/RebindManager.java b/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
index 779fee7..c390a46 100644
--- a/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
+++ b/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
@@ -60,6 +60,11 @@ public interface RebindManager {
     @VisibleForTesting
     @Deprecated
     public void waitForPendingComplete(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
+    /** waits for any needed or pending writes to complete */
     @VisibleForTesting
     public void waitForPendingComplete(Duration duration) throws InterruptedException, TimeoutException;
+    /** forcibly performs persistence, in the foreground */
+    @VisibleForTesting
+    public void forcePersistNow();
+    
 }
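
The new forcePersistNow() above is intended as a test-only hook for flushing state synchronously instead of waiting for the periodic persister. A minimal usage sketch (illustrative only; it assumes a ManagementContextInternal named "mgmt" that already has a persister attached, as the split-brain test further down sets up):

    import brooklyn.entity.rebind.RebindManager;
    import brooklyn.management.internal.ManagementContextInternal;
    import brooklyn.util.time.Duration;

    void flushStateToStore(ManagementContextInternal mgmt) throws Exception {
        RebindManager rebind = mgmt.getRebindManager();
        // force a synchronous, foreground write of the current entity state
        rebind.forcePersistNow();
        // then drain any writes still in flight before making assertions against the store
        rebind.waitForPendingComplete(Duration.THIRTY_SECONDS);
    }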

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
index 6978e39..afb8565 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
@@ -2,6 +2,7 @@ package brooklyn.entity.rebind;
 
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -80,6 +81,8 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
     private final boolean persistPoliciesEnabled;
     private final boolean persistEnrichersEnabled;
     
+    private final Semaphore persistingMutex = new Semaphore(1);
+    
     public PeriodicDeltaChangeListener(ExecutionManager executionManager, BrooklynMementoPersister persister, long periodMillis) {
         this.executionManager = executionManager;
         this.persister = persister;
@@ -89,6 +92,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
         this.persistEnrichersEnabled = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_ENRICHER_PERSISTENCE_PROPERTY);
     }
     
+    @SuppressWarnings("unchecked")
     public void start() {
         running = true;
         
@@ -172,77 +176,80 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
         return stopped || executionManager.isShutdown();
     }
     
-    private void persistNow() {
-        if (isActive()) {
-            try {
-                // Atomically switch the delta, so subsequent modifications will be done in the
-                // next scheduled persist
-                DeltaCollector prevDeltaCollector;
-                synchronized (this) {
-                    prevDeltaCollector = deltaCollector;
-                    deltaCollector = new DeltaCollector();
-                }
-                
-                // Generate mementos for everything that has changed in this time period
-                if (prevDeltaCollector.isEmpty()) {
-                    if (LOG.isTraceEnabled()) LOG.trace("No changes to persist since last delta");
-                } else {
-                    PersisterDeltaImpl persisterDelta = new PersisterDeltaImpl();
-                    for (Location location : prevDeltaCollector.locations) {
-                        try {
-                            persisterDelta.locations.add(((LocationInternal)location).getRebindSupport().getMemento());
-                        } catch (Exception e) {
-                            handleGenerateMementoException(e, "location "+location.getClass().getSimpleName()+"("+location.getId()+")");
-                        }
-                    }
-                    for (Entity entity : prevDeltaCollector.entities) {
-                        try {
-                            persisterDelta.entities.add(((EntityInternal)entity).getRebindSupport().getMemento());
-                        } catch (Exception e) {
-                            handleGenerateMementoException(e, "entity "+entity.getEntityType().getSimpleName()+"("+entity.getId()+")");
-                        }
+    @VisibleForTesting
+    public void persistNow() {
+        if (!isActive()) return;
+        try {
+            persistingMutex.acquire();
+            if (!isActive()) return;
+            // Atomically switch the delta, so subsequent modifications will be done in the
+            // next scheduled persist
+            DeltaCollector prevDeltaCollector;
+            synchronized (this) {
+                prevDeltaCollector = deltaCollector;
+                deltaCollector = new DeltaCollector();
+            }
+
+            // Generate mementos for everything that has changed in this time period
+            if (prevDeltaCollector.isEmpty()) {
+                if (LOG.isTraceEnabled()) LOG.trace("No changes to persist since last delta");
+            } else {
+                PersisterDeltaImpl persisterDelta = new PersisterDeltaImpl();
+                for (Location location : prevDeltaCollector.locations) {
+                    try {
+                        persisterDelta.locations.add(((LocationInternal)location).getRebindSupport().getMemento());
+                    } catch (Exception e) {
+                        handleGenerateMementoException(e, "location "+location.getClass().getSimpleName()+"("+location.getId()+")");
                     }
-                    for (Policy policy : prevDeltaCollector.policies) {
-                        try {
-                            persisterDelta.policies.add(policy.getRebindSupport().getMemento());
-                        } catch (Exception e) {
-                            handleGenerateMementoException(e, "policy "+policy.getClass().getSimpleName()+"("+policy.getId()+")");
-                        }
+                }
+                for (Entity entity : prevDeltaCollector.entities) {
+                    try {
+                        persisterDelta.entities.add(((EntityInternal)entity).getRebindSupport().getMemento());
+                    } catch (Exception e) {
+                        handleGenerateMementoException(e, "entity "+entity.getEntityType().getSimpleName()+"("+entity.getId()+")");
                     }
-                    for (Enricher enricher : prevDeltaCollector.enrichers) {
-                        try {
-                            persisterDelta.enrichers.add(enricher.getRebindSupport().getMemento());
-                        } catch (Exception e) {
-                            handleGenerateMementoException(e, "enricher "+enricher.getClass().getSimpleName()+"("+enricher.getId()+")");
-                        }
+                }
+                for (Policy policy : prevDeltaCollector.policies) {
+                    try {
+                        persisterDelta.policies.add(policy.getRebindSupport().getMemento());
+                    } catch (Exception e) {
+                        handleGenerateMementoException(e, "policy "+policy.getClass().getSimpleName()+"("+policy.getId()+")");
                     }
-                    persisterDelta.removedLocationIds = prevDeltaCollector.removedLocationIds;
-                    persisterDelta.removedEntityIds = prevDeltaCollector.removedEntityIds;
-                    persisterDelta.removedPolicyIds = prevDeltaCollector.removedPolicyIds;
-                    persisterDelta.removedEnricherIds = prevDeltaCollector.removedEnricherIds;
-                    
-                    /*
-                     * Need to guarantee "happens before", with any thread that subsequently reads
-                     * the mementos.
-                     * 
-                     * See MementoFileWriter.writeNow for the corresponding synchronization,
-                     * that guarantees its thread has values visible for reads.
-                     */
-                    synchronized (new Object()) {}
-
-                    // Tell the persister to persist it
-                    persister.delta(persisterDelta);
                 }
-            } catch (Exception e) {
-                if (isActive()) {
-                    throw Exceptions.propagate(e);
-                } else {
-                    Exceptions.propagateIfFatal(e);
-                    LOG.debug("Problem persisting, but no longer active (ignoring)", e);
+                for (Enricher enricher : prevDeltaCollector.enrichers) {
+                    try {
+                        persisterDelta.enrichers.add(enricher.getRebindSupport().getMemento());
+                    } catch (Exception e) {
+                        handleGenerateMementoException(e, "enricher "+enricher.getClass().getSimpleName()+"("+enricher.getId()+")");
+                    }
                 }
-            } finally {
-                writeCount.incrementAndGet();
+                persisterDelta.removedLocationIds = prevDeltaCollector.removedLocationIds;
+                persisterDelta.removedEntityIds = prevDeltaCollector.removedEntityIds;
+                persisterDelta.removedPolicyIds = prevDeltaCollector.removedPolicyIds;
+                persisterDelta.removedEnricherIds = prevDeltaCollector.removedEnricherIds;
+
+                /*
+                 * Need to guarantee "happens before", with any thread that subsequently reads
+                 * the mementos.
+                 * 
+                 * See MementoFileWriter.writeNow for the corresponding synchronization,
+                 * that guarantees its thread has values visible for reads.
+                 */
+                synchronized (new Object()) {}
+
+                // Tell the persister to persist it
+                persister.delta(persisterDelta);
+            }
+        } catch (Exception e) {
+            if (isActive()) {
+                throw Exceptions.propagate(e);
+            } else {
+                Exceptions.propagateIfFatal(e);
+                LOG.debug("Problem persisting, but no longer active (ignoring)", e);
             }
+        } finally {
+            writeCount.incrementAndGet();
+            persistingMutex.release();
         }
     }
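
The main structural change to persistNow() above is the single-permit Semaphore guarding the whole pass, so that a forced foreground persist (via forcePersistNow) and the scheduled background persist cannot interleave. The pattern in isolation looks roughly like this (a simplified sketch under hypothetical names, not the committed class):

    import java.util.concurrent.Semaphore;

    class PersistGate {
        private final Semaphore persistingMutex = new Semaphore(1);
        private volatile boolean active = true;

        void persistNow() throws InterruptedException {
            if (!active) return;
            persistingMutex.acquire();         // at most one persist pass runs at a time
            try {
                if (!active) return;           // re-check after potentially blocking on the permit
                // ... swap the delta collector and write the delta to the persister ...
            } finally {
                persistingMutex.release();     // always release, even if the write fails
            }
        }
    }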
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index c9a0661..b1c5465 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -188,6 +188,11 @@ public class RebindManagerImpl implements RebindManager {
         realChangeListener.waitForPendingComplete(timeout);
         persister.waitForWritesCompleted(timeout);
     }
+    @Override
+    @VisibleForTesting
+    public void forcePersistNow() {
+        realChangeListener.persistNow();
+    }
     
     @Override
     public ChangeListener getChangeListener() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
index 41cbe51..58b74d8 100644
--- a/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
@@ -14,6 +14,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import brooklyn.BrooklynVersion;
+import brooklyn.entity.Application;
+import brooklyn.entity.basic.Entities;
 import brooklyn.entity.rebind.RebindManager;
 import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
 import brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl;
@@ -71,11 +73,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
     
     // TODO There is a race if you start multiple nodes simultaneously.
     // They may not have seen each other's heartbeats yet, so will all claim mastery!
+    // But this should be resolved shortly afterwards.
 
-    // TODO Should detect if multiple active nodes believe that they are master (possibly in MasterChooser?),
-    // and respond accordingly.
-    // Could support "demotingFromMaster" (e.g. restart management context?!).
-    
     // TODO Should we pass in a classloader on construction, so it can be passed to {@link RebindManager#rebind(ClassLoader)} 
     
     public static interface PromotionListener {
@@ -194,7 +193,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
                 String masterNodeId = getManagementPlaneSyncState().getMasterNodeId();
                 ManagementNodeSyncRecord masterNodeDetails = getManagementPlaneSyncState().getManagementNodes().get(masterNodeId);
                 LOG.info("Management node "+ownNodeId+" started as HA STANDBY autodetected, master is "+masterNodeId+
-                    (masterNodeDetails==null || masterNodeDetails.getUri()==null ? " (no further info)" : " at "+masterNodeDetails.getUri()));
+                    (masterNodeDetails==null || masterNodeDetails.getUri()==null ? " (no url)" : " at "+masterNodeDetails.getUri()));
             } else {
                 LOG.info("Management node "+ownNodeId+" started as HA MASTER autodetected");
             }
@@ -302,6 +301,11 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
      */
     protected synchronized void publishDemotionFromMasterOnFailure() {
         checkState(getNodeState() == ManagementNodeState.FAILED, "node status must be failed on publish, but is %s", getNodeState());
+        publishDemotionFromMaster(true);
+    }
+    
+    protected synchronized void publishDemotionFromMaster(boolean clearMaster) {
+        checkState(getNodeState() != ManagementNodeState.MASTER, "node status must not be master when demoting", getNodeState());
         
         if (persister == null) {
             LOG.info("Cannot publish management-node health as no persister");
@@ -309,10 +313,11 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         }
         
         ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false);
-        Delta delta = ManagementPlaneSyncRecordDeltaImpl.builder()
-                .node(memento)
-                .clearMaster(ownNodeId)
-                .build();
+        ManagementPlaneSyncRecordDeltaImpl.Builder deltaBuilder = ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(memento);
+        if (clearMaster) deltaBuilder.clearMaster(ownNodeId);
+        
+        Delta delta = deltaBuilder.build();
         persister.delta(delta);
         if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento);
     }
@@ -377,10 +382,24 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         ManagementNodeSyncRecord masterNodeMemento = memento.getManagementNodes().get(masterNodeId);
         ManagementNodeSyncRecord ownNodeMemento = memento.getManagementNodes().get(ownNodeId);
         
+        ManagementNodeSyncRecord newMasterRecord = null;
+        boolean demotingSelfInFavourOfOtherMaster = false;
+        
         if (masterNodeMemento != null && masterNodeMemento.getStatus() == ManagementNodeState.MASTER && isHeartbeatOk(masterNodeMemento, ownNodeMemento)) {
-            // master still seems healthy
-            if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy: master={}", masterNodeMemento.toVerboseString());
-            return;
+            // master seems healthy
+            if (ownNodeId.equals(masterNodeId)) {
+                if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy (us): master={}", masterNodeMemento.toVerboseString());
+                return;
+            } else {
+                if (ownNodeMemento!=null && ownNodeMemento.getStatus() == ManagementNodeState.MASTER) {
+                    LOG.error("HA subsystem detected change of master, stolen from us ("+ownNodeId+"), deferring to "+masterNodeId);
+                    newMasterRecord = masterNodeMemento;
+                    demotingSelfInFavourOfOtherMaster = true;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy (remote): master={}", masterNodeMemento.toVerboseString());
+                    return;
+                }
+            }
         } else if (ownNodeMemento == null || !isHeartbeatOk(ownNodeMemento, ownNodeMemento)) {
             // our heartbeats are also out-of-date! perhaps something wrong with persistence? just log, and don't over-react!
             if (ownNodeMemento == null) {
@@ -399,14 +418,21 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
             return;
         }
         
+        if (demotingSelfInFavourOfOtherMaster) {
+            LOG.debug("Master-change for this node only, demoting "+ownNodeMemento.toVerboseString()+" in favour of official master "+newMasterRecord);
+            demoteToStandby();
+            return;
+        }
+        
         // Need to choose a new master
-        ManagementNodeSyncRecord newMasterRecord = masterChooser.choose(memento, heartbeatTimeout, ownNodeId);
+        newMasterRecord = masterChooser.choose(memento, heartbeatTimeout, ownNodeId);
+        
         String newMasterNodeId = (newMasterRecord == null) ? null : newMasterRecord.getNodeId();
         URI newMasterNodeUri = (newMasterRecord == null) ? null : newMasterRecord.getUri();
         boolean newMasterIsSelf = ownNodeId.equals(newMasterNodeId);
         
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Management node master-promotion required: newMaster={}; oldMaster={}; plane={}, self={}; heartbeatTimeout={}", 
+            LOG.debug("Management node master-change required: newMaster={}; oldMaster={}; plane={}, self={}; heartbeatTimeout={}", 
                 new Object[] {
                     (newMasterRecord == null ? "<none>" : newMasterRecord.toVerboseString()),
                     (masterNodeMemento == null ? masterNodeId+" (no memento)": masterNodeMemento.toVerboseString()),
@@ -416,13 +442,15 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
                 });
         }
         if (!initializing) {
-            LOG.warn("HA subsystem detected change of master from " 
-                + masterNodeId + " (" + (masterNodeMemento==null ? "?" : masterNodeMemento.getRemoteTimestamp()) + ")"
-                + " to "
-                + (newMasterNodeId == null ? "<none>" :
-                    newMasterNodeId + " (" + newMasterRecord.getRemoteTimestamp() + ")" 
-                    + (newMasterNodeUri!=null ? " "+newMasterNodeUri : "") 
-                + (newMasterIsSelf ? " [this]" : "") ));
+            if (!demotingSelfInFavourOfOtherMaster) {
+                LOG.warn("HA subsystem detected change of master, from " 
+                    + masterNodeId + " (" + (masterNodeMemento==null ? "?" : masterNodeMemento.getRemoteTimestamp()) + ")"
+                    + " to "
+                    + (newMasterNodeId == null ? "<none>" :
+                        (newMasterIsSelf ? "us " : "")
+                        + newMasterNodeId + " (" + newMasterRecord.getRemoteTimestamp() + ")" 
+                        + (newMasterNodeUri!=null ? " "+newMasterNodeUri : "")  ));
+            }
         }
 
         // New master is ourself: promote
@@ -458,6 +486,19 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         managementContext.getRebindManager().start();
     }
 
+    protected void demoteToStandby() {
+        if (!running) {
+            LOG.warn("Ignoring demote-from-master request, as HighAvailabilityManager is no longer running");
+            return;
+        }
+
+        nodeState = ManagementNodeState.STANDBY;
+        managementContext.getRebindManager().stop();
+        for (Application app: managementContext.getApplications())
+            Entities.unmanage(app);
+        publishDemotionFromMaster(false);
+    }
+
     /**
      * @param reportCleanedState - if true, the record for this mgmt node will be replaced with the
      * actual current status known in this JVM (may be more recent than what is persisted);
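
The demotion path added to HighAvailabilityManagerImpl above comes down to one decision during the periodic check of the persisted plane state: if another node is recorded as a healthy master while our own record still claims mastery, stand down rather than fight. Stripped of logging and persistence plumbing, the shape of that logic is roughly the following (an outline using the names from the diff, not the literal method body):

    // after loading the persisted ManagementPlaneSyncRecord during publishAndCheck
    boolean remoteMasterHealthy = masterNodeMemento != null
            && masterNodeMemento.getStatus() == ManagementNodeState.MASTER
            && isHeartbeatOk(masterNodeMemento, ownNodeMemento)
            && !ownNodeId.equals(masterNodeId);
    boolean weStillClaimMaster = ownNodeMemento != null
            && ownNodeMemento.getStatus() == ManagementNodeState.MASTER;

    if (remoteMasterHealthy && weStillClaimMaster) {
        // split brain: defer to the recorded master instead of re-running the MasterChooser
        demoteToStandby();   // stops the rebind manager, unmanages local apps, republishes our record
        return;
    }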

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/core/src/main/java/brooklyn/management/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/NonDeploymentManagementContext.java b/core/src/main/java/brooklyn/management/internal/NonDeploymentManagementContext.java
index cbe36d1..d56b412 100644
--- a/core/src/main/java/brooklyn/management/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/brooklyn/management/internal/NonDeploymentManagementContext.java
@@ -417,9 +417,14 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
         public void waitForPendingComplete(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
         }
+        @Override
         public void waitForPendingComplete(Duration timeout) throws InterruptedException, TimeoutException {
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
         }
+        @Override
+        public void forcePersistNow() {
+            throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/core/src/test/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java b/core/src/test/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
index 5ffadea..fec8e94 100644
--- a/core/src/test/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
+++ b/core/src/test/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
@@ -29,7 +29,7 @@ public class InMemoryObjectStore implements PersistenceObjectStore {
     public InMemoryObjectStore(Map<String,String> map, Map<String, Date> fileModTimesByName) {
         filesByName = map;
         this.fileModTimesByName = fileModTimesByName;
-        log.info("Using memory-based objectStore");
+        log.debug("Using memory-based objectStore");
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/49830cda/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
index 46fec19..0810f02 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -5,28 +5,36 @@ import static org.testng.Assert.assertEquals;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import brooklyn.entity.basic.ApplicationBuilder;
 import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
 import brooklyn.entity.rebind.persister.InMemoryObjectStore;
 import brooklyn.entity.rebind.persister.ListeningObjectStore;
 import brooklyn.entity.rebind.persister.PersistMode;
 import brooklyn.entity.rebind.persister.PersistenceObjectStore;
-import brooklyn.management.ha.HighAvailabilityManagerTestFixture.RecordingPromotionListener;
+import brooklyn.location.Location;
 import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.test.entity.LocalManagementContextForTests;
+import brooklyn.test.entity.TestApplication;
 import brooklyn.util.collections.MutableList;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.repeat.Repeater;
 import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
 import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
 
 @Test
 public class HighAvailabilityManagerSplitBrainTest {
@@ -37,19 +45,18 @@ public class HighAvailabilityManagerSplitBrainTest {
     Map<String,String> sharedBackingStore = MutableMap.of();
     Map<String,Date> sharedBackingStoreDates = MutableMap.of();
     private AtomicLong sharedTime; // used to set the ticker's return value
+    private ClassLoader classLoader = getClass().getClassLoader();
     
     public class HaMgmtNode {
         
-        private ManagementPlaneSyncRecordPersister persister;
-        private ManagementContextInternal managementContext;
+        private ManagementContextInternal mgmt;
         private String ownNodeId;
-        private HighAvailabilityManagerImpl manager;
+        private String nodeName;
+        private ListeningObjectStore objectStore;
+        private ManagementPlaneSyncRecordPersister persister;
+        private HighAvailabilityManagerImpl ha;
         private Ticker ticker;
         private AtomicLong currentTime; // used to set the ticker's return value
-        private RecordingPromotionListener promotionListener;
-        private ClassLoader classLoader = getClass().getClassLoader();
-        private ListeningObjectStore objectStore;
-        private String nodeName;
 
         @BeforeMethod(alwaysRun=true)
         public void setUp() throws Exception {
@@ -65,20 +72,18 @@ public class HighAvailabilityManagerSplitBrainTest {
             };
             
             nodeName = "node "+nodes.size();
-            promotionListener = new RecordingPromotionListener();
-            managementContext = newLocalManagementContext();
-            ownNodeId = managementContext.getManagementNodeId();
+            mgmt = newLocalManagementContext();
+            ownNodeId = mgmt.getManagementNodeId();
             objectStore = new ListeningObjectStore(newPersistenceObjectStore());
-            objectStore.injectManagementContext(managementContext);
+            objectStore.injectManagementContext(mgmt);
             objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
-            persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
+            persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
             ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
             BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
-            managementContext.getRebindManager().setPersister(persisterObj);
-            manager = new HighAvailabilityManagerImpl(managementContext)
+            mgmt.getRebindManager().setPersister(persisterObj);
+            ha = new HighAvailabilityManagerImpl(mgmt)
                 .setPollPeriod(Duration.PRACTICALLY_FOREVER)
                 .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
-                .setPromotionListener(promotionListener)
                 .setLocalTicker(ticker)
                 .setRemoteTicker(ticker)
                 .setPersister(persister);
@@ -86,8 +91,8 @@ public class HighAvailabilityManagerSplitBrainTest {
         }
         
         public void tearDown() throws Exception {
-            if (manager != null) manager.stop();
-            if (managementContext != null) Entities.destroyAll(managementContext);
+            if (ha != null) ha.stop();
+            if (mgmt != null) Entities.destroyAll(mgmt);
             if (objectStore != null) objectStore.deleteCompletely();
         }
         
@@ -155,13 +160,13 @@ public class HighAvailabilityManagerSplitBrainTest {
     }
     
     @Test
-    public void testTwoNodes() throws Exception {
+    public void testIfNodeStopsBeingAbleToWrite() throws Exception {
         useSharedTime();
         HaMgmtNode n1 = newNode();
         HaMgmtNode n2 = newNode();
         
-        n1.manager.start(HighAvailabilityMode.AUTO);
-        ManagementPlaneSyncRecord memento1 = n1.manager.getManagementPlaneSyncState();
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        ManagementPlaneSyncRecord memento1 = n1.ha.getManagementPlaneSyncState();
         
         log.info(n1+" HA: "+memento1);
         assertEquals(memento1.getMasterNodeId(), n1.ownNodeId);
@@ -169,8 +174,8 @@ public class HighAvailabilityManagerSplitBrainTest {
         assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
         assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
 
-        n2.manager.start(HighAvailabilityMode.AUTO);
-        ManagementPlaneSyncRecord memento2 = n2.manager.getManagementPlaneSyncState();
+        n2.ha.start(HighAvailabilityMode.AUTO);
+        ManagementPlaneSyncRecord memento2 = n2.ha.getManagementPlaneSyncState();
         
         log.info(n2+" HA: "+memento2);
         assertEquals(memento2.getMasterNodeId(), n1.ownNodeId);
@@ -179,12 +184,25 @@ public class HighAvailabilityManagerSplitBrainTest {
         assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
         assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time0);
         
+        // and no entities at either
+        assertEquals(n1.mgmt.getApplications().size(), 0);
+        assertEquals(n2.mgmt.getApplications().size(), 0);
+
+        // create
+        TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello");
+        
+        assertEquals(n1.mgmt.getApplications().size(), 1);
+        assertEquals(n2.mgmt.getApplications().size(), 0);
+        n1.mgmt.getRebindManager().forcePersistNow();
+        
         n1.objectStore.setWritesFailSilently(true);
         log.info(n1+" writes off");
         sharedTickerAdvance(Duration.ONE_MINUTE);
         
-        n2.manager.publishAndCheck(false);
-        ManagementPlaneSyncRecord memento2b = n2.manager.getManagementPlaneSyncState();
+        n2.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento2b = n2.ha.getManagementPlaneSyncState();
         log.info(n2+" HA now: "+memento2b);
         Long time1 = sharedTickerCurrentMillis();
         
@@ -195,34 +213,20 @@ public class HighAvailabilityManagerSplitBrainTest {
         assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
         assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
         
+        assertEquals(n1.mgmt.getApplications().size(), 1);
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        assertEquals(n1.mgmt.getApplications().iterator().next().getAttribute(TestApplication.MY_ATTRIBUTE), "hello");
+        
         n1.objectStore.setWritesFailSilently(false);
         log.info(n1+" writes on");
         
         sharedTickerAdvance(Duration.ONE_SECOND);
         Long time2 = sharedTickerCurrentMillis();
         
-        n1.manager.publishAndCheck(false);
-        ManagementPlaneSyncRecord memento1b = n1.manager.getManagementPlaneSyncState();
+        n1.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento1b = n1.ha.getManagementPlaneSyncState();
         log.info(n1+" HA now: "+memento1b);
         
-//        // n1 comes back and sees himself as master, but with both masters 
-//        assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
-//        assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
-//        assertEquals(memento1b.getMasterNodeId(), n1.ownNodeId);
-//        assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
-//        assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
-//        
-//        // n2 sees itself as master, but again with both masters
-//        ManagementPlaneSyncRecord memento2c = n2.manager.getManagementPlaneSyncState();
-//        log.info(n2+" HA now: "+memento2c);
-//        assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
-//        assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
-//        assertEquals(memento2c.getMasterNodeId(), n2.ownNodeId);
-//        assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
-//        assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time2);
-
-        // current (unwanted) state is above, desired state below
-        
         // n1 comes back and demotes himself 
         assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.STANDBY);
         assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
@@ -231,7 +235,7 @@ public class HighAvailabilityManagerSplitBrainTest {
         assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
         
         // n2 now sees itself as master, with n1 in standby again
-        ManagementPlaneSyncRecord memento2c = n2.manager.getManagementPlaneSyncState();
+        ManagementPlaneSyncRecord memento2c = n2.ha.getManagementPlaneSyncState();
         log.info(n2+" HA now: "+memento2c);
         assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.STANDBY);
         assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
@@ -239,6 +243,69 @@ public class HighAvailabilityManagerSplitBrainTest {
         assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
         assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time2);
 
+        // and no entities at n1
+        assertEquals(n1.mgmt.getApplications().size(), 0);
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+    }
+    
+    @Test(invocationCount=50)
+    public void testIfNodeStopsBeingAbleToWriteManyTimes() throws Exception {
+        testIfNodeStopsBeingAbleToWrite();
+    }
+    
+    @Test
+    public void testSimultaneousStartup() throws Exception {
+        doTestConcurrentStartup(5, null);
+    }
+
+    @Test
+    public void testNearSimultaneousStartup() throws Exception {
+        doTestConcurrentStartup(20, Duration.millis(20));
+    }
+
+    @Test(invocationCount=50, groups="Integration")
+    public void testNearSimultaneousStartupManyTimes() throws Exception {
+        doTestConcurrentStartup(20, Duration.millis(20));
+    }
+
+    protected void doTestConcurrentStartup(int size, final Duration staggerStart) throws Exception {
+        useSharedTime();
+        
+        List<Thread> spawned = MutableList.of();
+        for (int i=0; i<size; i++) {
+            final HaMgmtNode n = newNode();
+            Thread t = new Thread() { public void run() {
+                if (staggerStart!=null) Time.sleep(staggerStart.multiply(Math.random()));
+                n.ha.start(HighAvailabilityMode.AUTO);
+            } };
+            spawned.add(t);
+            t.start();
+        }
+        
+        Assert.assertTrue(Repeater.create().every(Duration.millis(1)).limitTimeTo(Duration.TEN_SECONDS).until(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                ManagementPlaneSyncRecord memento = nodes.get(0).ha.getManagementPlaneSyncState();
+                int masters=0, standbys=0, savedMasters=0, savedStandbys=0;
+                for (HaMgmtNode n: nodes) {
+                    if (n.ha.getNodeState()==ManagementNodeState.MASTER) masters++;
+                    if (n.ha.getNodeState()==ManagementNodeState.STANDBY) standbys++;
+                    ManagementNodeSyncRecord m = memento.getManagementNodes().get(n.ownNodeId);
+                    if (m!=null) {
+                        if (m.getStatus()==ManagementNodeState.MASTER) savedMasters++;
+                        if (m.getStatus()==ManagementNodeState.STANDBY) savedStandbys++;
+                    }
+                }
+                log.info("starting "+nodes.size()+" nodes: "+masters+" M + "+standbys+" zzz; "
+                    + memento.getManagementNodes().size()+" saved, "
+                        + memento.getMasterNodeId()+" master, "+savedMasters+" M + "+savedStandbys+" zzz");
+                
+                return masters==1 && standbys==nodes.size()-1 && savedMasters==1 && savedStandbys==nodes.size()-1;
+            }
+        }).run());
+        
+        for (Thread t: spawned)
+            t.join(Duration.FIVE_SECONDS.toMilliseconds());
     }
     
+
 }
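
The concurrent-startup tests above lean on Brooklyn's Repeater utility to poll until the cluster converges on exactly one master. The same polling idiom, reduced to a skeleton (countMasters() is a hypothetical helper standing in for the per-node state counting done in the test):

    import java.util.concurrent.Callable;
    import org.testng.Assert;
    import brooklyn.util.repeat.Repeater;
    import brooklyn.util.time.Duration;

    boolean converged = Repeater.create()
            .every(Duration.millis(1))             // poll very frequently
            .limitTimeTo(Duration.TEN_SECONDS)     // but give up after ten seconds
            .until(new Callable<Boolean>() {
                @Override public Boolean call() {
                    return countMasters() == 1;    // hypothetical: exactly one node reports MASTER
                }
            })
            .run();                                // true if the predicate held before the timeout
    Assert.assertTrue(converged);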

