qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1560094 [1/2] - in /qpid/branches/java-broker-bdb-ha/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/main/java/org/apache/q...
Date Tue, 21 Jan 2014 16:57:19 GMT
Author: kwall
Date: Tue Jan 21 16:57:19 2014
New Revision: 1560094

URL: http://svn.apache.org/r1560094
Log:
QPID-5409: Add functionality to automatically detect that the master is isolated from the majority and restart the environment.

Added:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java
      - copied, changed from r1559960, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestStateChangeListener.java
      - copied, changed from r1559960, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/resources/log4j.properties
Removed:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeTestCase.java
Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Tue Jan 21 16:57:19 2014
@@ -221,8 +221,6 @@ public class BDBHAVirtualHost extends Ab
 
     private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
     {
-        // TODO shutdown the executor
-        private final Executor _executor = Executors.newSingleThreadExecutor();
 
         @Override
         public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
@@ -231,20 +229,21 @@ public class BDBHAVirtualHost extends Ab
 
             if (LOGGER.isInfoEnabled())
             {
-                LOGGER.info("Received BDB event indicating transition to state " + state);
+                LOGGER.info("Received BDB event indicating transition to state " + state
+                        + " when current message store state is " + _messageStore._stateManager.getState());
             }
 
             switch (state)
             {
             case MASTER:
-                activateStoreAsync();
+                activate();
                 break;
             case REPLICA:
-                passivateStoreAsync();
+                passivate();
                 break;
             case DETACHED:
                 LOGGER.error("BDB replicated node in detached state, therefore passivating.");
-                passivateStoreAsync();
+                passivate();
                 break;
             case UNKNOWN:
                 LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
@@ -255,102 +254,33 @@ public class BDBHAVirtualHost extends Ab
             }
         }
 
-        /**
-         * Calls {@link MessageStore#activate()}.
-         *
-         * <p/>
-         *
-         * This is done a background thread, in line with
-         * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
-         * activate may execute transactions, which can't complete until
-         * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
-         */
-        private void activateStoreAsync()
+        private void activate()
         {
-            String threadName = "BDBHANodeActivationThread-" + getName();
-            executeStateChangeAsync(new Callable<Void>()
+            try
             {
-                @Override
-                public Void call() throws Exception
-                {
-                    try
-                    {
-                        _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
-                        _messageStore.activate();
-                    }
-                    catch (Exception e)
-                    {
-                        LOGGER.error("Failed to activate on hearing MASTER change event", e);
-                    }
-                    return null;
-                }
-            }, threadName);
-        }
-
-        private void passivateStoreAsync()
-        {
-            String threadName = "BDBHANodePassivationThread-" + getName();
-            executeStateChangeAsync(new Callable<Void>()
+                _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+                _messageStore.activate();
+            }
+            catch (Exception e)
             {
-
-                @Override
-                public Void call() throws Exception
-                {
-                    try
-                    {
-                        if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
-                        {
-                            LOGGER.debug("Store becoming passive");
-                            _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
-                        }
-                    }
-                    catch (Exception e)
-                    {
-                        LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
-                    }
-                    return null;
-                }
-            }, threadName);
+                LOGGER.error("Failed to activate on hearing MASTER change event", e);
+            }
         }
 
-        private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+        private void passivate()
         {
-            final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
-
-            _executor.execute(new Runnable()
+            try
             {
-
-                @Override
-                public void run()
+                if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
                 {
-                    final String originalThreadName = Thread.currentThread().getName();
-                    Thread.currentThread().setName(threadName);
-                    try
-                    {
-                        CurrentActor.set(new AbstractActor(_rootLogger)
-                        {
-                            @Override
-                            public String getLogMessage()
-                            {
-                                return threadName;
-                            }
-                        });
-
-                        try
-                        {
-                            callable.call();
-                        }
-                        catch (Exception e)
-                        {
-                            LOGGER.error("Exception during state change", e);
-                        }
-                    }
-                    finally
-                    {
-                        Thread.currentThread().setName(originalThreadName);
-                    }
+                    _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
                 }
-            });
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+            }
         }
+
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Jan 21 16:57:19 2014
@@ -169,7 +169,7 @@ public class BDBMessageStore implements 
         try
         {
             new Upgrader(_environmentFacade.getEnvironment(), _virtualHost.getName()).upgradeIfNecessary();
-            _environmentFacade.openDatabases(DATABASE_NAMES, dbConfig);
+            _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES);
             _totalStoreSize = getSizeOnDisk();
         }
         catch(DatabaseException e)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Tue Jan 21 16:57:19 2014
@@ -49,7 +49,7 @@ public interface EnvironmentFacade
 
     AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e);
 
-    void openDatabases(String[] databaseNames, DatabaseConfig dbConfig) throws AMQStoreException;
+    void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
 
     void close();
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Tue Jan 21 16:57:19 2014
@@ -54,6 +54,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.replication.ReplicationGroupListener;
 import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
 import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -129,10 +130,11 @@ public class ReplicatedEnvironmentFacade
 
     public static final String TYPE = "BDB-HA";
 
-    // TODO: get rid of these names
+    // TODO: JMX will change to observe the model; at that point these names will disappear
     public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
     public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
 
+    private final String _prettyGroupNodeName;
     private final String _groupName;
     private final String _nodeName;
     private final String _nodeHostPort;
@@ -147,23 +149,23 @@ public class ReplicatedEnvironmentFacade
     private final ExecutorService _restartEnvironmentExecutor;
     private final ScheduledExecutorService _groupChangeExecutor;
     private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
-    private final ConcurrentMap<String, Database> _databases = new ConcurrentHashMap<String, Database>();
+    private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
     private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>();
     private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
 
+    private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
+    private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
     private volatile CommitThreadWrapper _commitThreadWrapper;
-    private volatile StateChangeListener _stateChangeListener;
     private volatile ReplicatedEnvironment _environment;
-    private ReplicationGroupListener _replicationGroupListener;
     private long _joinTime;
     private String _lastKnownReplicationTransactionId;
 
     @SuppressWarnings("unchecked")
-    public ReplicatedEnvironmentFacade(String name, String environmentPath,
+    public ReplicatedEnvironmentFacade(String virtualHostName, String environmentPath,
             org.apache.qpid.server.model.ReplicationNode replicationNode,
             RemoteReplicationNodeFactory remoteReplicationNodeFactory)
     {
-         _name = name;
+         _name = virtualHostName;
         _environmentPath = environmentPath;
         _groupName = (String)replicationNode.getAttribute(GROUP_NAME);
         _nodeName = replicationNode.getName();
@@ -174,15 +176,49 @@ public class ReplicatedEnvironmentFacade
         _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC);
         _environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS);
         _replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS);
+        _prettyGroupNodeName = _groupName + ":" + _nodeName;
 
-        _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Restarter:" + _groupName));
-        _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _groupName));
+        _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName));
+        _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
 
         _remoteReplicationNodeFactory = remoteReplicationNodeFactory;
         _state.set(State.OPENING);
         _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
         _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS);
-        _environment = createEnvironment();
+
+        // create environment in a separate thread to avoid renaming of the current thread by JE
+        Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){
+            @Override
+            public ReplicatedEnvironment call() throws Exception
+            {
+                String originalThreadName = Thread.currentThread().getName();
+                try
+                {
+                    return createEnvironment();
+                }
+                finally
+                {
+                    Thread.currentThread().setName(originalThreadName);
+                }
+            }});
+
+        // TODO: compute the future's timeout from the JE ENVIRONMENT_SETUP setting rather than hard-coding it
+        try
+        {
+            _environment = environmentFuture.get(15 * 2, TimeUnit.MINUTES);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
+        }
+        catch (TimeoutException e)
+        {
+            throw new RuntimeException("JE environment has not been created in due time");
+        }
         populateExistingRemoteReplicationNodes();
         _commitThreadWrapper = startCommitThread(_name, _environment);
     }
@@ -221,9 +257,9 @@ public class ReplicatedEnvironmentFacade
         {
             try
             {
-                LOGGER.debug("Closing replicated environment facade");
-                _restartEnvironmentExecutor.shutdownNow();
-                _groupChangeExecutor.shutdownNow();
+                LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+                _restartEnvironmentExecutor.shutdown();
+                _groupChangeExecutor.shutdown();
                 stopCommitThread();
                 closeDatabases();
                 closeEnvironment();
@@ -273,23 +309,59 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig) throws AMQStoreException
+    public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
     {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not in opened state");
+        }
+
+        if (!_environment.isValid())
+        {
+            throw new IllegalStateException("Environment is not valid");
+        }
+
+        if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
+        {
+            throw new IllegalStateException("Databases can only be opened on Master node");
+        }
+
+        for (String databaseName : databaseNames)
+        {
+            _databases.put(databaseName, new DatabaseHolder(dbConfig));
+        }
         for (String databaseName : databaseNames)
         {
-            Database database = _environment.openDatabase(null, databaseName, dbConfig);
-            _databases.put(databaseName, database);
+            DatabaseHolder holder = _databases.get(databaseName);
+            openDatabaseInternally(databaseName, holder);
         }
     }
 
+    private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
+    {
+        LOGGER.debug("Opening database " + databaseName + " on " + _prettyGroupNodeName);
+        Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
+        holder.setDatabase(database);
+    }
+
     @Override
     public Database getOpenDatabase(String name)
     {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not in opened state");
+        }
+
         if (!_environment.isValid())
         {
             throw new IllegalStateException("Environment is not valid");
         }
-        Database database = _databases.get(name);
+        DatabaseHolder databaseHolder = _databases.get(name);
+        if (databaseHolder == null)
+        {
+            throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
+        }
+        Database database = databaseHolder.getDatabase();
         if (database == null)
         {
             throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
@@ -298,32 +370,67 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+    public void stateChange(final StateChangeEvent stateChangeEvent)
+    {
+        _groupChangeExecutor.submit(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                stateChanged(stateChangeEvent);
+            }
+        });
+    }
+
+    private void stateChanged(StateChangeEvent stateChangeEvent)
     {
         ReplicatedEnvironment.State state = stateChangeEvent.getState();
-        LOGGER.info("The node state is " + state);
+        LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + state);
         if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
         {
             if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
             {
-                LOGGER.info("The environment facade is in open state");
+                LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
                 _joinTime = System.currentTimeMillis();
             }
         }
-        if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+
+        if (state == ReplicatedEnvironment.State.MASTER)
         {
-            StateChangeListener listener = _stateChangeListener;
+            reopenDatabases();
+            _commitThreadWrapper = startCommitThread(_name, _environment);
+            StateChangeListener listener = _stateChangeListener.get();
+            LOGGER.debug("Application state change listener " + listener);
             if (listener != null)
             {
                 listener.stateChange(stateChangeEvent);
             }
         }
+        else
+        {
+            if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+            {
+                StateChangeListener listener = _stateChangeListener.get();
+                if (listener != null)
+                {
+                    listener.stateChange(stateChangeEvent);
+                }
+            }
+        }
     }
 
-    public void setStateChangeListener(StateChangeListener listener)
+    private void reopenDatabases()
     {
-        _stateChangeListener = listener;
-        _environment.setStateChangeListener(this);
+        DatabaseConfig pingDbConfig = new DatabaseConfig();
+        pingDbConfig.setTransactional(true);
+        pingDbConfig.setAllowCreate(true);
+
+        _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
+
+        for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+        {
+            openDatabaseInternally(entry.getKey(), entry.getValue());
+        }
     }
 
     public String getName()
@@ -420,7 +527,7 @@ public class ReplicatedEnvironmentFacade
 
             if (LOGGER.isInfoEnabled())
             {
-                LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+                LOGGER.info("Node " + _prettyGroupNodeName + " successfully set as designated primary for group");
             }
 
         }
@@ -483,19 +590,27 @@ public class ReplicatedEnvironmentFacade
         return _state.get();
     }
 
-    /**
-     * Sets the replication group listener.  Whenever a new listener is set, the listener
-     * will hear {@link ReplicationGroupListener#onReplicationNodeRecovered(org.apache.qpid.server.model.ReplicationNode)
-     * for every existing remote node.
-     *
-     * @param replicationGroupListener listener
-     */
     public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener)
     {
-        _replicationGroupListener = replicationGroupListener;
-        if (_replicationGroupListener != null)
+        if (_replicationGroupListener.compareAndSet(null, replicationGroupListener))
+        {
+            notifyExistingRemoteReplicationNodes(replicationGroupListener);
+        }
+        else
+        {
+            throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName);
+        }
+    }
+
+    public void setStateChangeListener(StateChangeListener stateChangeListener)
+    {
+        if (_stateChangeListener.compareAndSet(null, stateChangeListener))
         {
-            notifyExistingRemoteReplicationNodes(_replicationGroupListener);
+            _environment.setStateChangeListener(this);
+        }
+        else
+        {
+            throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName);
         }
     }
 
@@ -604,22 +719,16 @@ public class ReplicatedEnvironmentFacade
 
         stopCommitThread(dbe);
 
-        Set<String> databaseNames = new HashSet<String>(_databases.keySet());
         closeEnvironmentSafely();
 
         _environment = createEnvironment();
 
-        DatabaseConfig dbConfig = new DatabaseConfig();
-        dbConfig.setTransactional(true);
-        // TODO Alex and I think this should be removed.
-        openDatabases(databaseNames.toArray(new String[databaseNames.size()]), dbConfig);
-
-        _commitThreadWrapper = startCommitThread(_name, _environment);
-
-        _environment.setStateChangeListener(this);
+        if (_stateChangeListener.get() != null)
+        {
+            _environment.setStateChangeListener(this);
+        }
 
         LOGGER.info("Environment is restarted");
-
     }
 
     private void closeEnvironmentSafely()
@@ -652,21 +761,32 @@ public class ReplicatedEnvironmentFacade
     private void closeDatabases()
     {
         RuntimeException firstThrownException = null;
-        for (Database database : _databases.values())
+        LOGGER.debug("Closing databases " + _databases);
+        for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
         {
-            try
-            {
-                database.close();
-            }
-            catch(RuntimeException e)
+            DatabaseHolder databaseHolder = entry.getValue();
+            Database database = databaseHolder.getDatabase();
+            if (database != null)
             {
-                if (firstThrownException == null)
+                try
+                {
+                    LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+                    database.close();
+                }
+                catch(RuntimeException e)
+                {
+                    LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
+                    if (firstThrownException == null)
+                    {
+                        firstThrownException = e;
+                    }
+                }
+                finally
                 {
-                    firstThrownException = e;
+                    databaseHolder.setDatabase(null);
                 }
             }
         }
-        _databases.clear();
         if (firstThrownException != null)
         {
             throw firstThrownException;
@@ -756,7 +876,7 @@ public class ReplicatedEnvironmentFacade
             }
 
             ReplicatedEnvironment env = _environment;
-            ReplicationGroupListener replicationGroupListener = _replicationGroupListener;
+            ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
             if (env != null && env.isValid())
             {
                 ReplicationGroup group = env.getGroup();
@@ -814,6 +934,7 @@ public class ReplicatedEnvironmentFacade
     //TODO: move the class into external class
     private class RemoteNodeStateLearner implements Callable<Void>
     {
+        private Map<String, String> _previousGroupState = Collections.emptyMap();
         @Override
         public Void call()
         {
@@ -821,9 +942,8 @@ public class ReplicatedEnvironmentFacade
             try
             {
                 Set<Future<Void>> futures = new HashSet<Future<Void>>();
-                for (Map.Entry<String, RemoteReplicationNode> entry : _remoteReplicationNodes.entrySet())
+                for (final RemoteReplicationNode node : _remoteReplicationNodes.values())
                 {
-                    final RemoteReplicationNode  node = entry.getValue();
                     Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
                     {
                         @Override
@@ -856,6 +976,21 @@ public class ReplicatedEnvironmentFacade
                         future.cancel(true);
                     }
                 }
+
+                if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+                {
+                    Map<String, String> currentGroupState = new HashMap<String, String>();
+                    for (final RemoteReplicationNode node : _remoteReplicationNodes.values())
+                    {
+                        currentGroupState.put(node.getName(), (String)node.getAttribute(org.apache.qpid.server.model.ReplicationNode.ROLE));
+                    }
+                    boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+                    _previousGroupState = currentGroupState;
+                    if (stateChanged && State.OPEN == _state.get())
+                    {
+                        new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+                    }
+                }
             }
             finally
             {
@@ -876,7 +1011,7 @@ public class ReplicatedEnvironmentFacade
 
     public static enum State
     {
-        INITIAL,
+        INITIAL,  // TODO unused remove
         OPENING,
         OPEN,
         RESTARTING,
@@ -884,4 +1019,36 @@ public class ReplicatedEnvironmentFacade
         CLOSED
     }
 
+    private static class DatabaseHolder
+    {
+        private final DatabaseConfig _config;
+        private Database _database;
+
+        public DatabaseHolder(DatabaseConfig config)
+        {
+            _config = config;
+        }
+
+        public Database getDatabase()
+        {
+            return _database;
+        }
+
+        public void setDatabase(Database database)
+        {
+            _database = database;
+        }
+
+        public DatabaseConfig getConfig()
+        {
+            return _config;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]";
+        }
+
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Tue Jan 21 16:57:19 2014
@@ -25,9 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.model.ReplicationNode;
-import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
@@ -38,6 +36,7 @@ import com.sleepycat.je.Durability.SyncP
 
 public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
 {
+
     @Override
     public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
     {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Jan 21 16:57:19 2014
@@ -234,7 +234,7 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
-    public void openDatabases(String[] databaseNames, DatabaseConfig dbConfig)
+    public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
     {
         for (String databaseName : databaseNames)
         {

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1560094&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java Tue Jan 21 16:57:19 2014
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+public class DatabasePinger
+{
+    public static final String PING_DATABASE_NAME = "PINGDB";
+    private static final int ID = 0;
+
+
+    public void pingDb(EnvironmentFacade facade)
+    {
+        try
+        {
+            final Database db = facade.getOpenDatabase(PING_DATABASE_NAME);
+
+            DatabaseEntry key = new DatabaseEntry();
+            IntegerBinding.intToEntry(ID, key);
+
+            DatabaseEntry value = new DatabaseEntry();
+            LongBinding.longToEntry(System.currentTimeMillis(), value);
+            Transaction txn = null;
+            try
+            {
+                txn = facade.getEnvironment().beginTransaction(null, null);
+                db.put(txn, key, value);
+                txn.commit();
+                txn = null;
+            }
+            finally
+            {
+                try
+                {
+                    if (txn != null)
+                    {
+                        txn.abort();
+                    }
+                }
+                finally
+                {
+                    db.close();
+                }
+            }
+        }
+        catch (DatabaseException de)
+        {
+            facade.handleDatabaseException("DatabaseException from DatabasePinger ", de);
+        }
+    }
+}

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Tue Jan 21 16:57:19 2014
@@ -223,12 +223,12 @@ public class RemoteReplicationNode exten
         catch (IOException e)
         {
             _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
-            LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
+            //LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
         }
         catch (ServiceConnectFailedException e)
         {
             _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
-            LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
+            //LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
         }
 
         if (!_role.equals(oldRole))

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Tue Jan 21 16:57:19 2014
@@ -21,9 +21,11 @@
 package org.apache.qpid.server.store.berkeleydb.upgrade;
 
 import com.sleepycat.je.Cursor;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 
@@ -38,6 +40,8 @@ import com.sleepycat.je.OperationStatus;
 
 public class Upgrader
 {
+    private static final Logger LOGGER = Logger.getLogger(Upgrader.class);
+
     static final String VERSION_DB_NAME = "DB_VERSION";
 
     private Environment _environment;
@@ -63,6 +67,7 @@ public class Upgrader
 
             if(versionDb.count() == 0L)
             {
+
                 int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
                 DatabaseEntry key = new DatabaseEntry();
                 IntegerBinding.intToEntry(sourceVersion, key);
@@ -73,6 +78,12 @@ public class Upgrader
             }
 
             int version = getSourceVersion(versionDb);
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Source message store version is " + version);
+            }
+
             if(version > BDBMessageStore.VERSION)
             {
                 throw new AMQStoreException("Database version " + version
@@ -178,7 +189,7 @@ public class Upgrader
 
     private int identifyOldStoreVersion() throws DatabaseException
     {
-        int version = 0;
+        int version = BDBMessageStore.VERSION;
         for (String databaseName : _environment.getDatabaseNames())
         {
             if (databaseName.contains("_v"))

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java (from r1559960, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java&r1=1559960&r2=1560094&rev=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/NoopReplicationGroupListener.java Tue Jan 21 16:57:19 2014
@@ -20,15 +20,23 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import java.util.Collections;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.replication.ReplicationGroupListener;
 
-public class StandardEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+class NoopReplicationGroupListener implements ReplicationGroupListener
 {
+    @Override
+    public void onReplicationNodeRecovered(ReplicationNode node)
+    {
+    }
 
     @Override
-    EnvironmentFacade createEnvironmentFacade()
+    public void onReplicationNodeAddedToGroup(ReplicationNode node)
     {
-        return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
     }
 
-}
+    @Override
+    public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+    {
+    }
+}
\ No newline at end of file

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Tue Jan 21 16:57:19 2014
@@ -28,23 +28,17 @@ import static org.apache.qpid.server.mod
 import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
 import static org.apache.qpid.server.model.ReplicationNode.NAME;
 import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,51 +50,31 @@ import org.apache.qpid.server.replicatio
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
 
-import com.sleepycat.bind.tuple.IntegerBinding;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.Environment;
-import com.sleepycat.je.Transaction;
 import com.sleepycat.je.rep.InsufficientReplicasException;
 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
 import com.sleepycat.je.rep.ReplicationConfig;
 import com.sleepycat.je.rep.StateChangeEvent;
 import com.sleepycat.je.rep.StateChangeListener;
 
-public class ReplicatedEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
 {
-
-    private static class NoopReplicationGroupListener implements ReplicationGroupListener
-    {
-        @Override
-        public void onReplicationNodeRecovered(ReplicationNode node)
-        {
-        }
-
-        @Override
-        public void onReplicationNodeAddedToGroup(ReplicationNode node)
-        {
-        }
-
-        @Override
-        public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
-        {
-        }
-    }
-
+    protected File _storePath;
     private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
-    private static final TimeUnit WAIT_STATE_CHANGE_TIME_UNIT = TimeUnit.SECONDS;
+    private static final int LISTENER_TIMEOUT = 5;
     private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
     private static final String TEST_GROUP_NAME = "testGroupName";
     private static final String TEST_NODE_NAME = "testNodeName";
     private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
     private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
     private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
-    private static final boolean TEST_DESIGNATED_PRIMARY = true;
+    private static final boolean TEST_DESIGNATED_PRIMARY = false;
     private static final boolean TEST_COALESCING_SYNC = true;
     private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
     private VirtualHost _virtualHost = mock(VirtualHost.class);
@@ -110,6 +84,8 @@ public class ReplicatedEnvironmentFacade
     {
         super.setUp();
 
+        _storePath = TestFileUtils.createTestDirectory("bdb", true);
+
         when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L);
         when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L);
     }
@@ -126,58 +102,122 @@ public class ReplicatedEnvironmentFacade
         }
         finally
         {
-            super.tearDown();
+            try
+            {
+                if (_storePath != null)
+                {
+                    FileUtils.delete(_storePath, true);
+                }
+            }
+            finally
+            {
+                super.tearDown();
+            }
         }
     }
+    public void testEnvironmentFacade() throws Exception
+    {
+        EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+        assertNotNull("Environment should not be null", ef);
+        Environment e = ef.getEnvironment();
+        assertTrue("Environment is not valid", e.isValid());
+    }
+
+    public void testClose() throws Exception
+    {
+        EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+        ef.close();
+        Environment e = ef.getEnvironment();
+
+        assertNull("Environment should be null after facade close", e);
+    }
+
+    public void testOpenDatabases() throws Exception
+    {
+        EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(dbConfig, "test1", "test2");
+        Database test1 = ef.getOpenDatabase("test1");
+        Database test2 = ef.getOpenDatabase("test2");
+
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+    }
 
-    public void testGetName()
+    public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
     {
-        assertEquals("Unexpected name", getName(), getEnvironmentFacade().getName());
+        EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(dbConfig, "test1");
+        Database test1 = ef.getOpenDatabase("test1");
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        try
+        {
+            ef.getOpenDatabase("test2");
+            fail("An exception should be thrown for the non existing database");
+        }
+        catch(IllegalArgumentException e)
+        {
+            assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
+        }
     }
 
-    public void testGetGroupName()
+    
+    public void testGetName() throws Exception
     {
-        assertEquals("Unexpected group name", TEST_GROUP_NAME, getEnvironmentFacade().getGroupName());
+        assertEquals("Unexpected name", getName(), ((ReplicatedEnvironmentFacade) createMaster()).getName());
     }
 
-    public void testGetNodeName()
+    public void testGetGroupName() throws Exception
     {
-        assertEquals("Unexpected group name", TEST_NODE_NAME, getEnvironmentFacade().getNodeName());
+        assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName());
     }
 
-    public void testGetNodeHostPort()
+    public void testGetNodeName() throws Exception
     {
-        assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, getEnvironmentFacade().getHostPort());
+        assertEquals("Unexpected group name", TEST_NODE_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getNodeName());
     }
 
-    public void testGetHelperHostPort()
+    public void testGetNodeHostPort() throws Exception
     {
-        assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, getEnvironmentFacade().getHelperHostPort());
+        assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHostPort());
     }
 
-    public void testGetDurability()
+    public void testGetHelperHostPort() throws Exception
     {
-        assertEquals("Unexpected durability", TEST_DURABILITY.toString(), getEnvironmentFacade().getDurability());
+        assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHelperHostPort());
     }
 
-    public void testIsCoalescingSync()
+    public void testGetDurability() throws Exception
     {
-        assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, getEnvironmentFacade().isCoalescingSync());
+        assertEquals("Unexpected durability", TEST_DURABILITY.toString(), ((ReplicatedEnvironmentFacade) createMaster()).getDurability());
     }
 
-    public void testGetNodeState()
+    public void testIsCoalescingSync() throws Exception
     {
-        assertEquals("Unexpected state", State.MASTER.name(), getEnvironmentFacade().getNodeState());
+        assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, ((ReplicatedEnvironmentFacade) createMaster()).isCoalescingSync());
     }
 
-    public void testIsDesignatedPrimary()
+    public void testGetNodeState() throws Exception
     {
-        assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, getEnvironmentFacade().isDesignatedPrimary());
+        assertEquals("Unexpected state", State.MASTER.name(), ((ReplicatedEnvironmentFacade) createMaster()).getNodeState());
     }
 
-    public void testGetGroupMembers()
+    public void testIsDesignatedPrimary()  throws Exception
     {
-        List<Map<String, String>> groupMembers = getEnvironmentFacade().getGroupMembers();
+        ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
+        assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+        master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+        assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+    }
+
+    public void testGetGroupMembers()  throws Exception
+    {
+        List<Map<String, String>> groupMembers = ((ReplicatedEnvironmentFacade) createMaster()).getGroupMembers();
         Map<String, String> expectedMember = new HashMap<String, String>();
         expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
         expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
@@ -187,29 +227,36 @@ public class ReplicatedEnvironmentFacade
 
     public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
     {
-        getEnvironmentFacade();
+        ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
         String nodeName2 = TEST_NODE_NAME + "_2";
         String host = "localhost";
         int port = getNextAvailable(TEST_NODE_PORT + 1);
         String node2NodeHostPort = host + ":" + port;
-        ReplicatedEnvironmentFacade replicatedEnvironmentFacade2 = joinReplica(nodeName2, node2NodeHostPort);
 
-        List<Map<String, String>> groupMembers = replicatedEnvironmentFacade2.getGroupMembers();
+        final AtomicInteger invocationCount = new AtomicInteger();
+        final CountDownLatch nodeRecoveryLatch = new CountDownLatch(1);
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onReplicationNodeRecovered(ReplicationNode node)
+            {
+                nodeRecoveryLatch.countDown();
+                invocationCount.incrementAndGet();
+            }
+        };
+
+        addReplica(nodeName2, node2NodeHostPort, listener);
+
+        List<Map<String, String>> groupMembers = master.getGroupMembers();
         assertEquals("Unexpected number of nodes", 2, groupMembers.size());
 
-        ReplicationGroupListener listener = mock(ReplicationGroupListener.class);
-        replicatedEnvironmentFacade2.setReplicationGroupListener(listener);
-        verify(listener).onReplicationNodeRecovered(any(RemoteReplicationNode.class));
+        assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+        assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
     }
 
     public void testReplicationGroupListenerHearsNodeAdded() throws Exception
     {
-        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
-
-        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
-        assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size());
-
-        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
         final AtomicInteger invocationCount = new AtomicInteger();
         ReplicationGroupListener listener = new NoopReplicationGroupListener()
         {
@@ -217,16 +264,22 @@ public class ReplicatedEnvironmentFacade
             public void onReplicationNodeAddedToGroup(ReplicationNode node)
             {
                 invocationCount.getAndIncrement();
-                latch.countDown();
+                nodeAddedLatch.countDown();
             }
         };
-        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size());
 
         String node2Name = TEST_NODE_NAME + "_2";
         String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
-        joinReplica(node2Name, node2NodeHostPort);
+        addReplica(node2Name, node2NodeHostPort);
 
-        assertTrue("Listener not fired within timeout", latch.await(5, TimeUnit.SECONDS));
+        assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
         List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
         assertEquals("Unexpected number of nodes", 2, groupMembers.size());
@@ -236,14 +289,6 @@ public class ReplicatedEnvironmentFacade
 
     public void testReplicationGroupListenerHearsNodeRemoved() throws Exception
     {
-        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
-        String node2Name = TEST_NODE_NAME + "_2";
-        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
-        joinReplica(node2Name, node2NodeHostPort);
-
-        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
-        assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
-
         final CountDownLatch nodeDeletedLatch = new CountDownLatch(1);
         final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
         final AtomicInteger invocationCount = new AtomicInteger();
@@ -269,15 +314,24 @@ public class ReplicatedEnvironmentFacade
             }
         };
 
-        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
+        addReplica(node2Name, node2NodeHostPort);
+
+        List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
+        assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
 
         // Need to await the listener hearing the addition of the node to the model.
-        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(5, TimeUnit.SECONDS));
+        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
         // Now remove the node and ensure we hear the event
         replicatedEnvironmentFacade.removeNodeFromGroup(node2Name);
 
-        assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(5, TimeUnit.SECONDS));
+        assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
         List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
         assertEquals("Unexpected number of nodes after node removal", 1, groupMembers.size());
@@ -300,17 +354,18 @@ public class ReplicatedEnvironmentFacade
             }
         };
 
-        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getEnvironmentFacade();
-        replicatedEnvironmentFacade.setReplicationGroupListener(listener);
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
+        assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
         String node2Name = TEST_NODE_NAME + "_2";
         String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
-        joinReplica(node2Name, node2NodeHostPort);
+        addReplica(node2Name, node2NodeHostPort);
 
         List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
         assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
 
-        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(5, TimeUnit.SECONDS));
+        assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
         RemoteReplicationNode remoteNode = (RemoteReplicationNode)nodeRef.get();
         assertEquals("Unexpcted node name", node2Name, remoteNode.getName());
@@ -329,298 +384,198 @@ public class ReplicatedEnvironmentFacade
 
     public void testRemoveNodeFromGroup() throws Exception
     {
-        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
-        String nodeName = TEST_NODE_NAME + "_2";
-        ReplicatedEnvironmentFacade ref2 = joinReplica(nodeName, "localhost:" + getNextAvailable(TEST_NODE_PORT + 1));
+        ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster();
+
+        String node2Name = TEST_NODE_NAME + "_2";
+        String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
+        ReplicatedEnvironmentFacade ref2 = addReplica(node2Name, node2NodeHostPort);
+
         List<Map<String, String>> groupMembers = environmentFacade.getGroupMembers();
         assertEquals("Unexpected group members count", 2, groupMembers.size());
         ref2.close();
 
-        environmentFacade.removeNodeFromGroup(nodeName);
+        environmentFacade.removeNodeFromGroup(node2Name);
         groupMembers = environmentFacade.getGroupMembers();
         assertEquals("Unexpected group members count", 1, groupMembers.size());
     }
 
-    public void testSetDesignatedPrimary() throws AMQStoreException
+    public void testSetDesignatedPrimary() throws Exception
     {
-        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
+        ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster();
         environmentFacade.setDesignatedPrimary(false);
         assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary());
     }
 
-    public void testGetNodePriority()
+    public void testGetNodePriority() throws Exception
     {
-        assertEquals("Unexpected node priority", 1, getEnvironmentFacade().getPriority());
+        assertEquals("Unexpected node priority", 1, ((ReplicatedEnvironmentFacade) createMaster()).getPriority());
     }
 
-    public void testGetElectableGroupSizeOverride()
+    public void testGetElectableGroupSizeOverride() throws Exception
     {
-        assertEquals("Unexpected Electable Group Size Override", 0, getEnvironmentFacade().getElectableGroupSizeOverride());
+        assertEquals("Unexpected Electable Group Size Override", 0, ((ReplicatedEnvironmentFacade) createMaster()).getElectableGroupSizeOverride());
     }
 
     public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
     {
-        ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
-        ReplicatedEnvironmentFacade environmentFacade = nodes[0];
+        ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String replica1NodeName = TEST_NODE_NAME + "_1";
+        String replica1NodeHostPort = "localhost:" + replica1Port;
+        ReplicatedEnvironmentFacade replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
+
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String replica2NodeName = TEST_NODE_NAME + "_2";
+        String replica2NodeHostPort = "localhost:" + replica2Port;
+        ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
 
         String databaseName = "test";
-        DatabaseConfig dbConfig = createDatabase(environmentFacade, databaseName);
+
+        DatabaseConfig dbConfig = createDatabase(master, databaseName);
 
         // close replicas
-        nodes[1].close();
-        nodes[2].close();
+        replica1.close();
+        replica2.close();
 
-        final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
-        Environment e = environmentFacade.getEnvironment();
-        Database db = environmentFacade.getOpenDatabase(databaseName);
+        Environment e = master.getEnvironment();
+        Database db = master.getOpenDatabase(databaseName);
         try
         {
-            environmentFacade.openDatabases(new String[] { "test2" }, dbConfig);
+            master.openDatabases(dbConfig, "test2");
             fail("Opening of new database without quorum should fail");
         }
         catch(InsufficientReplicasException ex)
         {
-            environmentFacade.handleDatabaseException(null, ex);
+            master.handleDatabaseException(null, ex);
         }
 
-        // restore quorum
-        nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getHostPort());
-        nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getHostPort());
+        replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
+        replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
 
-        environmentFacade.setStateChangeListener(new StateChangeListener()
+        // Need to poll to await the remote node updating itself
+        long timeout = System.currentTimeMillis() + 5000;
+        while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
         {
-            @Override
-            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
-            {
-                if (stateChangeEvent.getState() == State.MASTER || stateChangeEvent.getState() == State.REPLICA)
-                {
-                    nodeAwaitLatch.countDown();
-                }
-            }
-        });
+            Thread.sleep(200);
+        }
 
-        assertTrue("The node could not rejoin the cluster",
-                nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
+        assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), 
+                State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
 
-        Environment e2 = environmentFacade.getEnvironment();
+        Environment e2 = master.getEnvironment();
         assertNotSame("Environment has not been restarted", e2, e);
 
-        Database db1 = environmentFacade.getOpenDatabase(databaseName);
+        Database db1 = master.getOpenDatabase(databaseName);
         assertNotSame("Database should be the re-created", db1, db);
     }
 
-    public void testEnvironmentIsRestartOnlyOnceOnInsufficientReplicas() throws Exception
+    public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
     {
-        ReplicatedEnvironmentFacade[] nodes = startClusterSequentially(3);
-        final ReplicatedEnvironmentFacade environmentFacade = nodes[0];
-
-        int numberOfThreads = 100;
-
-        // restart counter
-        final AtomicInteger numberOfTimesElected = new AtomicInteger();
-        environmentFacade.setStateChangeListener(new StateChangeListener()
+        final CountDownLatch masterLatch = new CountDownLatch(1);
+        final AtomicInteger masterStateChangeCount = new AtomicInteger();
+        final CountDownLatch unknownLatch = new CountDownLatch(1);
+        final AtomicInteger unknownStateChangeCount = new AtomicInteger();
+        StateChangeListener stateChangeListener = new StateChangeListener()
         {
             @Override
             public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
             {
                 if (stateChangeEvent.getState() == State.MASTER)
                 {
-                    numberOfTimesElected.incrementAndGet();
+                    masterStateChangeCount.incrementAndGet();
+                    masterLatch.countDown();
+                }
+                else if (stateChangeEvent.getState() == State.UNKNOWN)
+                {
+                    unknownStateChangeCount.incrementAndGet();
+                    unknownLatch.countDown();
                 }
             }
-        });
+        };
 
-        String databaseName = "test";
-        createDatabase(environmentFacade, databaseName);
-        final CountDownLatch latch = new CountDownLatch(numberOfThreads);
+        addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+        assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
 
-        final Database db = environmentFacade.getOpenDatabase(databaseName);
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node1NodeHostPort = "localhost:" + replica1Port;
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String node2NodeHostPort = "localhost:" + replica2Port;
 
-        // close replicas
-        nodes[1].close();
-        nodes[2].close();
+        ReplicatedEnvironmentFacade replica1 = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+        ReplicatedEnvironmentFacade replica2 = addReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
 
-        // perform transactions in separate threads in order to provoke InsufficientReplicasException
-        ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
-        try
-        {
-            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
-            for (int i = 0; i < numberOfThreads; i++)
-            {
-                final int index = i;
-                tasks.add(new Callable<Void>(){
-
-                    @Override
-                    public Void call() throws Exception
-                    {
-                        try
-                        {
-                            Transaction tx = environmentFacade.getEnvironment().beginTransaction(null, null);
-                            DatabaseEntry key = new DatabaseEntry();
-                            DatabaseEntry data = new DatabaseEntry();
-                            IntegerBinding.intToEntry(index, key);
-                            IntegerBinding.intToEntry(index, data);
-                            db.put(tx, key, data);
-                            tx.commit();
-                        }
-                        catch(DatabaseException e)
-                        {
-                            _environmentFacade.handleDatabaseException("Exception", e);
-                        }
-                        finally
-                        {
-                            latch.countDown();
-                        }
-                        return null;
-                    }});
-            }
-            service.invokeAll(tasks);
-            assertTrue("Not all tasks have been executed",
-                    latch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
-        }
-        finally
-        {
-            service.shutdown();
-        }
+        // close replicas
+        replica1.close();
+        replica2.close();
 
-        // restore quorum
-        nodes[1] = joinReplica(TEST_NODE_NAME + "_1", nodes[1].getHostPort());
-        nodes[2] = joinReplica(TEST_NODE_NAME + "_2", nodes[2].getHostPort());
+        assertTrue("Environment should be recreated and go into unknown state",
+                unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
 
-        long start = System.currentTimeMillis();
-        while(environmentFacade.getFacadeState() != ReplicatedEnvironmentFacade.State.OPEN && System.currentTimeMillis() - start < 10000l)
-        {
-            Thread.sleep(1000l);
-        }
-        assertEquals("EnvironmentFacade should be in open state", ReplicatedEnvironmentFacade.State.OPEN, environmentFacade.getFacadeState());
-
-        // it should be elected twice: once on first start-up and second time after environment restart
-        assertEquals("Elected master unexpected number of times", 2, numberOfTimesElected.get());
+        assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get());
+        assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
+        
+        // TODO: restart the other nodes
+        // TODO: check that the state of node 1 is then either MASTER or REPLICA
     }
 
-    public void testFacadeStateTransitions() throws InterruptedException
+    public void testCloseStateTransitions() throws Exception
     {
-        String nodeName = "node1";
-        final String nodePath = createNodeWorkingFolder(nodeName);
-        ReplicatedEnvironmentFacade ref = null;
-        try
-        {
-            ref = createReplicatedEnvironmentFacade(nodePath, nodeName, TEST_NODE_HOST_PORT, false);
-            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPENING, ref.getFacadeState());
-
-            final CountDownLatch nodeAwaitLatch = new CountDownLatch(1);
-            ref.setStateChangeListener(new StateChangeListener()
-            {
-                @Override
-                public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
-                {
-                    if (stateChangeEvent.getState() == State.MASTER)
-                    {
-                        nodeAwaitLatch.countDown();
-                    }
-                }
-            });
-            assertTrue("Node did not join the cluster", nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
-            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, ref.getFacadeState());
-            ref.close();
-            assertEquals("Unexpected state " + ref.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, ref.getFacadeState());
-        }
-        finally
-        {
-            if (ref != null)
-            {
-                ref.close();
-            }
-        }
-    }
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
 
-    @Override
-    EnvironmentFacade createEnvironmentFacade()
-    {
-        try
-        {
-            return startNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, true, State.MASTER);
-        }
-        catch (InterruptedException e)
-        {
-            Thread.interrupted();
-            throw new RuntimeException(e);
-        }
+        assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+        replicatedEnvironmentFacade.close();
+        assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
     }
 
-    @Override
-    ReplicatedEnvironmentFacade getEnvironmentFacade()
+    private ReplicatedEnvironmentFacade createMaster() throws Exception
     {
-        return (ReplicatedEnvironmentFacade) super.getEnvironmentFacade();
+        TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+        ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+        assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+        return env;
     }
 
-    private ReplicatedEnvironmentFacade joinReplica(final String nodeName, final String hostPort) throws InterruptedException
+    private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort) throws Exception
     {
-        return startNode(nodeName, hostPort, false, State.REPLICA);
+        return addReplica(nodeName, nodeHostPort, new NoopReplicationGroupListener());
     }
 
-    private ReplicatedEnvironmentFacade startNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State targetState)
-            throws InterruptedException
+    private ReplicatedEnvironmentFacade addReplica(String nodeName, String nodeHostPort, ReplicationGroupListener replicationGroupListener)
+            throws Exception
     {
-        final String nodePath = createNodeWorkingFolder(nodeName);
-        final CountDownLatch _nodeAwaitLatch = new CountDownLatch(1);
-        ReplicatedEnvironmentFacade ref = join(nodeName, nodePath, nodeHostPort, designatedPrimary, _nodeAwaitLatch, targetState);
-        assertTrue("Node did not join the cluster", _nodeAwaitLatch.await(WAIT_STATE_CHANGE_TIMEOUT, WAIT_STATE_CHANGE_TIME_UNIT));
-        return ref;
+        TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
+        ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, replicationGroupListener);
+        assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+        return replicaEnvironmentFacade;
     }
 
     private String createNodeWorkingFolder(String nodeName)
     {
         File nodeLocation = new File(_storePath, nodeName);
-        nodeLocation.mkdirs();
-        final String nodePath = nodeLocation.getAbsolutePath();
-        return nodePath;
-    }
-
-    private ReplicatedEnvironmentFacade join(String nodeName, String nodePath, String nodeHostPort, boolean designatedPrimary,
-            final CountDownLatch nodeAwaitLatch, final State expectedState)
-    {
-        ReplicatedEnvironmentFacade ref = createReplicatedEnvironmentFacade(nodePath, nodeName, nodeHostPort, designatedPrimary);
-
-        if (expectedState == State.REPLICA)
+        if (!nodeLocation.exists())
         {
-            _nodes.put(nodeName, ref);
+            nodeLocation.mkdirs();
         }
-        ref.setStateChangeListener(new StateChangeListener()
-        {
-            @Override
-            public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
-            {
-                if (stateChangeEvent.getState() == expectedState)
-                {
-                    nodeAwaitLatch.countDown();
-                }
-            }
-        });
-        return ref;
+        final String nodePath = nodeLocation.getAbsolutePath();
+        return nodePath;
     }
 
-    private ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(String nodePath, String nodeName, String nodeHostPort,
-            boolean designatedPrimary)
+    private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
+            State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
     {
+        final String nodePath = createNodeWorkingFolder(nodeName);
         ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
-        return new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
+        ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
+        ref.setReplicationGroupListener(replicationGroupListener);
+        ref.setStateChangeListener(stateChangeListener);
+        _nodes.put(nodeName, ref);
+        return ref;
     }
 
-    private ReplicatedEnvironmentFacade[] startClusterSequentially(int nodeNumber) throws InterruptedException
+    private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener groupChangeListener)
     {
-        // master
-        ReplicatedEnvironmentFacade environmentFacade = getEnvironmentFacade();
-        ReplicatedEnvironmentFacade[] nodes = new ReplicatedEnvironmentFacade[nodeNumber];
-        nodes[0] = environmentFacade;
-
-        int nodePort = TEST_NODE_PORT;
-        for (int i = 1; i < nodeNumber; i++)
-        {
-            nodePort = getNextAvailable(nodePort + 1);
-            nodes[i] = joinReplica(TEST_NODE_NAME + "_" + i, "localhost:" + nodePort);
-        }
-        return nodes;
+        return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, groupChangeListener);
     }
 
     private DatabaseConfig createDatabase(ReplicatedEnvironmentFacade environmentFacade, String databaseName) throws AMQStoreException
@@ -628,7 +583,7 @@ public class ReplicatedEnvironmentFacade
         DatabaseConfig dbConfig = new DatabaseConfig();
         dbConfig.setTransactional(true);
         dbConfig.setAllowCreate(true);
-        environmentFacade.openDatabases(new String[] { databaseName }, dbConfig);
+        environmentFacade.openDatabases(dbConfig,  databaseName);
         return dbConfig;
     }
 
@@ -650,5 +605,4 @@ public class ReplicatedEnvironmentFacade
         when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
         return node;
     }
-
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1560094&r1=1560093&r2=1560094&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Tue Jan 21 16:57:19 2014
@@ -20,12 +20,107 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import java.io.File;
 import java.util.Collections;
 
-public class StandardEnvironmentFacadeTest extends EnvironmentFacadeTestCase
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+public class StandardEnvironmentFacadeTest extends QpidTestCase
 {
+    protected File _storePath;
+    protected EnvironmentFacade _environmentFacade;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _storePath = TestFileUtils.createTestDirectory("bdb", true);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+            if (_environmentFacade != null)
+            {
+                _environmentFacade.close();
+            }
+        }
+        finally
+        {
+            if (_storePath != null)
+            {
+                FileUtils.delete(_storePath, true);
+            }
+        }
+    }
+
+    public void testEnvironmentFacade() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        assertNotNull("Environment should not be null", ef);
+        Environment e = ef.getEnvironment();
+        assertTrue("Environment is not valid", e.isValid());
+    }
+
+    public void testClose() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        ef.close();
+        Environment e = ef.getEnvironment();
+
+        assertNull("Environment should be null after facade close", e);
+    }
+
+    public void testOpenDatabases() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(dbConfig, "test1", "test2");
+        Database test1 = ef.getOpenDatabase("test1");
+        Database test2 = ef.getOpenDatabase("test2");
+
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+    }
+
+    public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(dbConfig, "test1");
+        Database test1 = ef.getOpenDatabase("test1");
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        try
+        {
+            ef.getOpenDatabase("test2");
+            fail("An exception should be thrown for the non existing database");
+        }
+        catch(IllegalArgumentException e)
+        {
+            assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
+        }
+    }
+
+    EnvironmentFacade getEnvironmentFacade() throws Exception
+    {
+        if (_environmentFacade == null)
+        {
+            _environmentFacade = createEnvironmentFacade();
+        }
+        return _environmentFacade;
+    }
 
-    @Override
     EnvironmentFacade createEnvironmentFacade()
     {
         return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message