qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1576697 [3/4] - in /qpid/branches/java-broker-bdb-ha2/qpid/java: bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/src/main/java/org/apa...
Date Wed, 12 Mar 2014 11:28:50 GMT
Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,228 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+
+public class StandardEnvironmentFacade implements EnvironmentFacade
+{
+    private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class);
+    public static final String TYPE = "BDB";
+
+    private final String _storePath;
+    private final Map<String, Database> _databases = new HashMap<String, Database>();
+
+    private Environment _environment;
+
+    public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
+    {
+        _storePath = storePath;
+
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Creating environment at environment path " + _storePath);
+        }
+
+        File environmentPath = new File(storePath);
+        if (!environmentPath.exists())
+        {
+            if (!environmentPath.mkdirs())
+            {
+                throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+                                                   + "Ensure the path is correct and that the permissions are correct.");
+            }
+        }
+
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(true);
+        envConfig.setTransactional(true);
+
+        for (Map.Entry<String, String> configItem : attributes.entrySet())
+        {
+            LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+            envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+        }
+
+        envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
+        _environment = new Environment(environmentPath, envConfig);
+    }
+
+    @Override
+    public void commit(com.sleepycat.je.Transaction tx)
+    {
+        try
+        {
+            tx.commitNoSync();
+        }
+        catch (DatabaseException de)
+        {
+            LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+            closeEnvironmentSafely();
+
+            throw handleDatabaseException("Got DatabaseException on commit", de);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        closeDatabases();
+        closeEnvironment();
+    }
+
+    private void closeDatabases()
+    {
+        RuntimeException firstThrownException = null;
+        for (Database database : _databases.values())
+        {
+            try
+            {
+                database.close();
+            }
+            catch(RuntimeException e)
+            {
+                if (firstThrownException == null)
+                {
+                    firstThrownException = e;
+                }
+            }
+        }
+        if (firstThrownException != null)
+        {
+            throw firstThrownException;
+        }
+    }
+
+    private void closeEnvironmentSafely()
+    {
+        if (_environment != null)
+        {
+            if (_environment.isValid())
+            {
+                try
+                {
+                    closeDatabases();
+                }
+                catch(Exception e)
+                {
+                    LOGGER.error("Exception closing environment databases", e);
+                }
+            }
+            try
+            {
+                _environment.close();
+            }
+            catch (DatabaseException ex)
+            {
+                LOGGER.error("Exception closing store environment", ex);
+            }
+            catch (IllegalStateException ex)
+            {
+                LOGGER.error("Exception closing store environment", ex);
+            }
+            finally
+            {
+                _environment = null;
+            }
+        }
+    }
+
+    @Override
+    public Environment getEnvironment()
+    {
+        return _environment;
+    }
+
+    private void closeEnvironment()
+    {
+        if (_environment != null)
+        {
+            // Clean the log before closing. This makes sure it doesn't contain
+            // redundant data. Closing without doing this means the cleaner may
+            // not get a chance to finish.
+            try
+            {
+                _environment.cleanLog();
+            }
+            finally
+            {
+                _environment.close();
+                _environment = null;
+            }
+        }
+    }
+
+    @Override
+    public DatabaseException handleDatabaseException(String contextMessage, DatabaseException e)
+    {
+        if (_environment != null && !_environment.isValid())
+        {
+            closeEnvironmentSafely();
+        }
+        return e;
+    }
+
+    @Override
+    public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+    {
+        for (String databaseName : databaseNames)
+        {
+            Database database = _environment.openDatabase(null, databaseName, dbConfig);
+            _databases .put(databaseName, database);
+        }
+    }
+
+    @Override
+    public Database getOpenDatabase(String name)
+    {
+        Database database = _databases.get(name);
+        if (database == null)
+        {
+            throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+        }
+        return database;
+    }
+
+    @Override
+    public Committer createCommitter(String name)
+    {
+        return new CoalescingCommiter(name, this);
+    }
+
+    @Override
+    public String getStoreLocation()
+    {
+        return _storePath;
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.VirtualHost;
+
+/**
+ * Factory producing {@link StandardEnvironmentFacade} instances for a virtual host.
+ */
+public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+    /**
+     * Builds the facade for the given virtual host. The JE settings are
+     * {@link EnvironmentFacade#ENVCONFIG_DEFAULTS} overlaid with the host's
+     * {@code ENVIRONMENT_CONFIGURATION} attribute when that attribute is a map.
+     * The store location comes from the host's store path attribute (message
+     * store) or config store path attribute (configuration store only); when
+     * that attribute is {@code null} it falls back to
+     * {@code <QPID_WORK property>/bdbstore/<host name>}.
+     *
+     * @param virtualHost    host whose attributes drive the configuration
+     * @param isMessageStore {@code true} to use the message store path attribute,
+     *                       {@code false} to use the configuration store path attribute
+     * @return a newly created facade
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+    {
+        final Map<String, String> settings = new HashMap<String, String>();
+        settings.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+
+        final Object hostOverrides = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+        if (hostOverrides instanceof Map)
+        {
+            // unchecked: the attribute is only known to be a Map at this point
+            settings.putAll((Map<String, String>) hostOverrides);
+        }
+
+        final String hostName = virtualHost.getName();
+        final String fallbackLocation = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK)
+                                        + File.separator + "bdbstore" + File.separator + hostName;
+
+        // message store, or acting only as the durable config store
+        final String pathAttribute = isMessageStore ? VirtualHost.STORE_PATH : VirtualHost.CONFIG_STORE_PATH;
+        final String configuredLocation = (String) virtualHost.getAttribute(pathAttribute);
+        final String storeLocation = (configuredLocation == null) ? fallbackLocation : configuredLocation;
+
+        return new StandardEnvironmentFacade(storeLocation, settings);
+    }
+
+    /**
+     * @return {@link StandardEnvironmentFacade#TYPE}
+     */
+    @Override
+    public String getType()
+    {
+        return StandardEnvironmentFacade.TYPE;
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Writes the current time into a well-known record of the ping database
+ * inside a transaction of the facade's environment.
+ */
+public class DatabasePinger
+{
+    public static final String PING_DATABASE_NAME = "PINGDB";
+    private static final int ID = 0;
+
+    /**
+     * Stores {@code System.currentTimeMillis()} under the fixed key {@code 0}
+     * in the database named {@link #PING_DATABASE_NAME} and commits.
+     * <p>
+     * The database handle is obtained from the facade and is deliberately not
+     * closed here: the facade keeps that handle registered and would otherwise
+     * hand out a closed database on the next call.
+     * <p>
+     * A {@link DatabaseException} is passed to
+     * {@link EnvironmentFacade#handleDatabaseException(String, DatabaseException)};
+     * the exception that call returns is not rethrown.
+     *
+     * @param facade facade providing the environment and the ping database
+     */
+    public void pingDb(EnvironmentFacade facade)
+    {
+        try
+        {
+            final Database db = facade.getOpenDatabase(PING_DATABASE_NAME);
+
+            DatabaseEntry key = new DatabaseEntry();
+            IntegerBinding.intToEntry(ID, key);
+
+            DatabaseEntry value = new DatabaseEntry();
+            LongBinding.longToEntry(System.currentTimeMillis(), value);
+            Transaction txn = null;
+            try
+            {
+                txn = facade.getEnvironment().beginTransaction(null, null);
+                db.put(txn, key, value);
+                txn.commit();
+                // committed: nothing left to abort
+                txn = null;
+            }
+            finally
+            {
+                if (txn != null)
+                {
+                    txn.abort();
+                }
+            }
+        }
+        catch (DatabaseException de)
+        {
+            facade.handleDatabaseException("DatabaseException from DatabasePinger ", de);
+        }
+    }
+}

Copied: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java (from r1576683, qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java?p2=qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java&p1=qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java&r1=1576683&r2=1576697&rev=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java Wed Mar 12 11:28:49 2014
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,13 +18,23 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.store;
 
-public interface HAMessageStore extends MessageStore
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+public interface ReplicatedEnvironmentConfiguration
 {
-    /**
-     * Used to indicate that a store requires to make itself unavailable for read and read/write
-     * operations.
-     */
-    void passivate();
+    String getName();
+    String getGroupName();
+    String getHostPort();
+    String getHelperHostPort();
+    String getDurability();
+    boolean isCoalescingSync();
+    boolean isDesignatedPrimary();
+    int getPriority();
+    int getQuorumOverride();
+    String getStorePath();
+    Map<String, String> getParameters();
+    Map<String, String> getReplicationParameters();
 }

Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,1052 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
+import org.apache.qpid.server.store.berkeleydb.Committer;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
+import org.apache.qpid.server.util.DaemonThreadFactory;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.RepInternal;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.RestartRequiredException;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
+import com.sleepycat.je.rep.vlsn.VLSNRange;
+import com.sleepycat.je.utilint.PropUtil;
+import com.sleepycat.je.utilint.VLSN;
+
+public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
+{
+    public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
+    public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
+
+    private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+
+    private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
+    private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
+
+    private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+    private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
+
+    @SuppressWarnings("serial")
+    private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+    {{
+        /**
+         * Parameter decreased as the 24h default may lead to very large log files for most users.
+         */
+        put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+        /**
+         * Parameter increased as the 5 s default may lead to spurious timeouts.
+         */
+        put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+        /**
+         * Parameter increased as the 10 s default may lead to spurious timeouts.
+         */
+        put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+        /**
+         * Parameter decreased as the 10 h default may cause user confusion.
+         */
+        put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+        /**
+         * Parameter changed from default (off) to allow the Environment to start in the 
+         * UNKNOWN state when the majority is not available.
+         */
+        put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
+        /**
+         * Parameter changed from default true so that we adopt the new behaviour early. False
+         * is scheduled to become default after JE 5.1.
+         */
+        put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+        /**
+         * Parameter decreased as the default 5 min interval may lead to bigger data losses on a node
+         * with NO_SYNC durability if that node crashes.
+         */
+        put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+    }});
+
+    public static final String TYPE = "BDB-HA";
+
+    // TODO: JMX will change to observe the model, at that point these names will disappear
+    public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+    public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+    private final ReplicatedEnvironmentConfiguration _configuration;
+    private final Durability _durability;
+    private final Boolean _coalescingSync;
+    private final String _prettyGroupNodeName;
+    private final File _environmentDirectory;
+
+    private final ExecutorService _environmentJobExecutor;
+    private final ScheduledExecutorService _groupChangeExecutor;
+    private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
+    private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
+    private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
+
+    private volatile ReplicatedEnvironment _environment;
+    private volatile long _joinTime;
+    private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+
+    public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+    {
+        _environmentDirectory = new File(configuration.getStorePath());
+        if (!_environmentDirectory.exists())
+        {
+            if (!_environmentDirectory.mkdirs())
+            {
+                throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. "
+                                                   + "Ensure the path is correct and that the permissions are correct.");
+            }
+        }
+
+        _configuration = configuration;
+
+        _durability = Durability.parse(_configuration.getDurability());
+        _coalescingSync = _configuration.isCoalescingSync();
+        _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
+
+        // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
+        _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
+        _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+        _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS);  // TODO make configurable
+
+        // create environment in a separate thread to avoid renaming of the current thread by JE
+        _environment = createEnvironment(true);
+    }
+
+    /**
+     * Commits the transaction with {@code commit()}. A {@link DatabaseException}
+     * is routed through {@link #handleDatabaseException(String, DatabaseException)}
+     * and the exception returned by that call is thrown.
+     *
+     * @param tx transaction to commit
+     */
+    @Override
+    public void commit(final Transaction tx)
+    {
+        try
+        {
+            // Using commit() instead of commitNoSync() for the HA store to allow
+            // the HA durability configuration to influence resulting behaviour.
+            tx.commit();
+        }
+        catch (DatabaseException de)
+        {
+            throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
+        }
+    }
+
+    /**
+     * Moves the facade from OPENING, OPEN or RESTARTING to CLOSING, shuts down
+     * both executors, closes the databases and the environment, and finally
+     * marks the facade CLOSED. Does nothing when the facade is in any other state.
+     */
+    @Override
+    public void close()
+    {
+        final boolean closingStarted = _state.compareAndSet(State.OPENING, State.CLOSING)
+                || _state.compareAndSet(State.OPEN, State.CLOSING)
+                || _state.compareAndSet(State.RESTARTING, State.CLOSING);
+        if (!closingStarted)
+        {
+            // the facade was in none of the states that may start closing
+            return;
+        }
+
+        try
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+            }
+
+            _environmentJobExecutor.shutdown();
+            _groupChangeExecutor.shutdown();
+            closeDatabases();
+            closeEnvironment();
+        }
+        finally
+        {
+            _state.compareAndSet(State.CLOSING, State.CLOSED);
+        }
+    }
+
+    /**
+     * Requests an environment restart when the exception is an
+     * {@link InsufficientReplicasException} or a {@link RestartRequiredException},
+     * then hands the exception back so the caller can throw it.
+     *
+     * @param contextMessage description of the failed operation (not used by this implementation)
+     * @param dbe            the exception that was caught
+     * @return {@code dbe}, unchanged
+     */
+    @Override
+    public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe)
+    {
+        // InsufficientLogException needs no test of its own: JE declares it as a RestartRequiredException subclass
+        boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
+        if (restart)
+        {
+            tryToRestartEnvironment(dbe);
+        }
+        return dbe;
+    }
+
+    private void tryToRestartEnvironment(final DatabaseException dbe)
+    {
+        if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+        {
+            if (dbe != null && LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
+            }
+
+            _environmentJobExecutor.execute(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        restartEnvironment();
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Exception on environment restart", e);
+                    }
+                }
+            });
+
+        }
+        else
+        {
+            LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+        }
+    }
+
+    /**
+     * Registers a holder for every named database and then opens each of them.
+     *
+     * @param dbConfig      configuration used for every database
+     * @param databaseNames names of the databases to open
+     * @throws IllegalStateException if the facade is not OPEN, the environment
+     *                               is not valid, or this node is not the master
+     */
+    @Override
+    public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+    {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not in opened state");
+        }
+        if (!_environment.isValid())
+        {
+            throw new IllegalStateException("Environment is not valid");
+        }
+        if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
+        {
+            throw new IllegalStateException("Databases can only be opened on Master node");
+        }
+
+        // first pass: register every holder before any database is opened
+        for (String name : databaseNames)
+        {
+            _databases.put(name, new DatabaseHolder(dbConfig));
+        }
+
+        // second pass: open the databases through their registered holders
+        for (String name : databaseNames)
+        {
+            openDatabaseInternally(name, _databases.get(name));
+        }
+    }
+
+    /**
+     * Opens the named database with the holder's configuration and stores the
+     * resulting handle in the holder.
+     */
+    private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
+    {
+        holder.setDatabase(_environment.openDatabase(null, databaseName, holder.getConfig()));
+    }
+
+    /**
+     * Looks up the handle of a previously opened database.
+     *
+     * @param name database name
+     * @return the open handle held for {@code name}
+     * @throws IllegalStateException    if the facade is not OPEN or the environment is not valid
+     * @throws IllegalArgumentException if the database was never requested, or its holder has no handle
+     */
+    @Override
+    public Database getOpenDatabase(String name)
+    {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not in opened state");
+        }
+        if (!_environment.isValid())
+        {
+            throw new IllegalStateException("Environment is not valid");
+        }
+
+        final DatabaseHolder holder = _databases.get(name);
+        if (holder == null)
+        {
+            throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
+        }
+
+        final Database handle = holder.getDatabase();
+        if (handle == null)
+        {
+            throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+        }
+        return handle;
+    }
+
+    /**
+     * @return the absolute path of the environment directory
+     */
+    @Override
+    public String getStoreLocation()
+    {
+        return _environmentDirectory.getAbsolutePath();
+    }
+
+    /**
+     * Logs the new node state and, unless the facade is CLOSING or CLOSED,
+     * hands the event to the group change executor for processing.
+     *
+     * @param stateChangeEvent the state change event being reported
+     */
+    @Override
+    public void stateChange(final StateChangeEvent stateChangeEvent)
+    {
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + stateChangeEvent.getState());
+        }
+
+        if (_state.get() == State.CLOSING || _state.get() == State.CLOSED)
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Ignoring the state environment change event as the environment facade for node '" + _prettyGroupNodeName
+                        + "' is in state " + _state.get());
+            }
+        }
+        else
+        {
+            final Runnable eventHandler = new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    stateChanged(stateChangeEvent);
+                }
+            };
+            _groupChangeExecutor.submit(eventHandler);
+        }
+    }
+
+    /**
+     * Processes a replication state change: updates the facade state, reopens
+     * the databases on mastership, notifies the registered listener and
+     * attempts an environment restart when a former master becomes detached.
+     */
+    private void stateChanged(StateChangeEvent stateChangeEvent)
+    {
+        ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+        // Becoming REPLICA or MASTER completes an opening or a restart of the facade
+        if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
+        {
+            if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
+            {
+                LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
+                _joinTime = System.currentTimeMillis();
+            }
+        }
+
+        // Database handles are (re)opened before the listener is told about mastership
+        if (state == ReplicatedEnvironment.State.MASTER)
+        {
+            reopenDatabases();
+        }
+
+        StateChangeListener listener = _stateChangeListener.get();
+        if (listener != null)
+        {
+            listener.stateChange(stateChangeEvent);
+        }
+
+        // A MASTER to DETACHED transition while the facade is still OPEN triggers a restart attempt
+        if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
+        {
+            tryToRestartEnvironment(null);
+        }
+        _lastKnownEnvironmentState = state;
+    }
+
+    private void reopenDatabases()
+    {
+        DatabaseConfig pingDbConfig = new DatabaseConfig();
+        pingDbConfig.setTransactional(true);
+        pingDbConfig.setAllowCreate(true);
+
+        _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
+
+        for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+        {
+            openDatabaseInternally(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Returns the replication group name taken from the configuration.
+     */
+    public String getGroupName()
+    {
+        return _configuration.getGroupName();
+    }
+
+    /**
+     * Returns the node name taken from the configuration.
+     */
+    public String getNodeName()
+    {
+        return _configuration.getName();
+    }
+
+    /**
+     * Returns the host:port setting of this node taken from the configuration.
+     */
+    public String getHostPort()
+    {
+        return _configuration.getHostPort();
+    }
+
+    /**
+     * Returns the helper host:port setting taken from the configuration.
+     */
+    public String getHelperHostPort()
+    {
+        return _configuration.getHelperHostPort();
+    }
+
+    /**
+     * Returns the string form of the durability used by this facade.
+     */
+    public String getDurability()
+    {
+        return _durability.toString();
+    }
+
+    /**
+     * Returns the coalescing sync flag; when set, {@code createCommitter}
+     * hands out a coalescing committer.
+     */
+    public boolean isCoalescingSync()
+    {
+        return _coalescingSync;
+    }
+
+    /**
+     * Returns the replication state of this node as a string, or the name of
+     * the UNKNOWN state when the facade is not in the OPEN state.
+     */
+    public String getNodeState()
+    {
+        if (_state.get() == State.OPEN)
+        {
+            return _environment.getState().toString();
+        }
+        return ReplicatedEnvironment.State.UNKNOWN.name();
+    }
+
+    /**
+     * Returns the designated primary flag from the environment's mutable
+     * replication configuration.
+     *
+     * @throws IllegalStateException if the facade is not in the OPEN state
+     */
+    public boolean isDesignatedPrimary()
+    {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not opened");
+        }
+        final ReplicationMutableConfig mutableConfig = _environment.getRepMutableConfig();
+        return mutableConfig.getDesignatedPrimary();
+    }
+
+    /**
+     * Submits a job to the environment job executor which changes the
+     * designated primary flag.
+     *
+     * @param isPrimary the new value of the flag
+     * @return the future of the submitted job
+     */
+    public Future<Void> setDesignatedPrimary(final boolean isPrimary)
+    {
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary);
+        }
+
+        final Callable<Void> job = new Callable<Void>()
+        {
+            @Override
+            public Void call()
+            {
+                setDesignatedPrimaryInternal(isPrimary);
+                return null;
+            }
+        };
+        return _environmentJobExecutor.submit(job);
+    }
+
+    /**
+     * Applies the designated primary flag to the environment's mutable
+     * replication configuration. Any exception is logged and not rethrown.
+     */
+    void setDesignatedPrimaryInternal(final boolean isPrimary)
+    {
+        try
+        {
+            _environment.setRepMutableConfig(_environment.getRepMutableConfig().setDesignatedPrimary(isPrimary));
+
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary);
+            }
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
+        }
+    }
+
+    /**
+     * Returns the node priority from the environment's mutable replication
+     * configuration.
+     *
+     * @throws IllegalStateException if the facade is not in the OPEN state
+     */
+    int getPriority()
+    {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not opened");
+        }
+        return _environment.getRepMutableConfig().getNodePriority();
+    }
+
+    /**
+     * Submits a job to the environment job executor which changes the node
+     * priority.
+     *
+     * @param priority the new node priority
+     * @return the future of the submitted job
+     */
+    public Future<Void> setPriority(final int priority)
+    {
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority);
+        }
+
+        final Callable<Void> job = new Callable<Void>()
+        {
+            @Override
+            public Void call()
+            {
+                setPriorityInternal(priority);
+                return null;
+            }
+        };
+        return _environmentJobExecutor.submit(job);
+    }
+
+    /**
+     * Applies the node priority to the environment's mutable replication
+     * configuration. Any exception is logged and not rethrown.
+     */
+    void setPriorityInternal(int priority)
+    {
+        try
+        {
+            _environment.setRepMutableConfig(_environment.getRepMutableConfig().setNodePriority(priority));
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority);
+            }
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
+        }
+    }
+
+    /**
+     * Returns the electable group size override from the environment's
+     * mutable replication configuration.
+     *
+     * @throws IllegalStateException if the facade is not in the OPEN state
+     */
+    int getElectableGroupSizeOverride()
+    {
+        if (_state.get() != State.OPEN)
+        {
+            throw new IllegalStateException("Environment facade is not opened");
+        }
+        return _environment.getRepMutableConfig().getElectableGroupSizeOverride();
+    }
+
+    /**
+     * Submits a job to the environment job executor which changes the
+     * electable group size override.
+     *
+     * @param electableGroupOverride the new override value
+     * @return the future of the submitted job
+     */
+    public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride)
+    {
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride);
+        }
+
+        final Callable<Void> job = new Callable<Void>()
+        {
+            @Override
+            public Void call()
+            {
+                setElectableGroupSizeOverrideInternal(electableGroupOverride);
+                return null;
+            }
+        };
+        return _environmentJobExecutor.submit(job);
+    }
+
+    /**
+     * Applies the electable group size override to the environment's mutable
+     * replication configuration. Any exception is logged and not rethrown.
+     */
+    void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
+    {
+        try
+        {
+            _environment.setRepMutableConfig(
+                    _environment.getRepMutableConfig().setElectableGroupSizeOverride(electableGroupOverride));
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride);
+            }
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
+        }
+    }
+
+
+    /**
+     * Returns the time, in milliseconds as given by
+     * {@code System.currentTimeMillis()}, recorded when the facade last
+     * moved to the OPEN state.
+     */
+    public long getJoinTime()
+    {
+        return _joinTime;
+    }
+
+    public long getLastKnownReplicationTransactionId()
+    {
+        if (_state.get() == State.OPEN)
+        {
+            VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+            VLSN lastTxnEnd = range.getLastTxnEnd();
+            return lastTxnEnd.getSequence();
+        }
+        else
+        {
+            return -1L;
+        }
+    }
+
+    /**
+     * Describes every node of the replication group as a map holding the
+     * node name and its "host:port" address.
+     *
+     * @return one map per group node
+     */
+    public List<Map<String, String>> getGroupMembers()
+    {
+        final List<Map<String, String>> result = new ArrayList<Map<String, String>>();
+
+        for (ReplicationNode member : _environment.getGroup().getNodes())
+        {
+            final Map<String, String> description = new HashMap<String, String>();
+            description.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, member.getName());
+            description.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT,
+                    member.getHostName() + ":" + member.getPort());
+            result.add(description);
+        }
+
+        return result;
+    }
+
+    /**
+     * Removes the named member using a newly created replication group admin.
+     *
+     * @param nodeName name of the node to remove
+     */
+    public void removeNodeFromGroup(final String nodeName)
+    {
+        createReplicationGroupAdmin().removeMember(nodeName);
+    }
+
+    /**
+     * Updates the address of the named node using a newly created
+     * replication group admin.
+     *
+     * @param nodeName name of the node whose address changes
+     * @param newHostName the new host name
+     * @param newPort the new port
+     */
+    public void updateAddress(final String nodeName, final String newHostName, final int newPort)
+    {
+        createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+    }
+
+    private ReplicationGroupAdmin createReplicationGroupAdmin()
+    {
+        final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+        helpers.addAll(_environment.getRepConfig().getHelperSockets());
+
+        final ReplicationConfig repConfig = _environment.getRepConfig();
+        helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+
+        return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
+    }
+
+
+    /**
+     * Returns the current replicated environment reference; it is null after
+     * {@code closeEnvironment()} has run and is replaced on restart.
+     */
+    public ReplicatedEnvironment getEnvironment()
+    {
+        return _environment;
+    }
+
+    /**
+     * Returns the current lifecycle state of this facade.
+     */
+    public State getFacadeState()
+    {
+        return _state.get();
+    }
+
+    public void setStateChangeListener(StateChangeListener stateChangeListener)
+    {
+        if (_stateChangeListener.compareAndSet(null, stateChangeListener))
+        {
+            _environment.setStateChangeListener(this);
+        }
+        else
+        {
+            throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName);
+        }
+    }
+
+    /**
+     * Cleans the log of a valid environment, then closes the environment and
+     * clears the reference to it. The close happens even if cleaning fails.
+     */
+    private void closeEnvironment()
+    {
+        // Clean the log before closing. This makes sure it doesn't contain
+        // redundant data. Closing without doing this means the cleaner may not
+        // get a chance to finish.
+        try
+        {
+            if (_environment.isValid())
+            {
+                _environment.cleanLog();
+            }
+        }
+        finally
+        {
+            _environment.close();
+            _environment = null;
+        }
+    }
+
+    /**
+     * Closes the current environment (ignoring close failures), creates a new
+     * one in the calling thread and re-subscribes this facade to state change
+     * events if a listener has been registered.
+     */
+    private void restartEnvironment()
+    {
+        LOGGER.info("Restarting environment");
+
+        closeEnvironmentSafely();
+
+        _environment = createEnvironment(false);
+
+        // The new environment instance does not carry over the previous subscription
+        if (_stateChangeListener.get() != null)
+        {
+            _environment.setStateChangeListener(this);
+        }
+
+        LOGGER.info("Environment is restarted");
+    }
+
+    private void closeEnvironmentSafely()
+    {
+        Environment environment = _environment;
+        if (environment != null)
+        {
+            try
+            {
+                if (environment.isValid())
+                {
+                    try
+                    {
+                        closeDatabases();
+                    }
+                    catch(Exception e)
+                    {
+                        LOGGER.warn("Ignoring an exception whilst closing databases", e);
+                    }
+                }
+                environment.close();
+            }
+            catch (EnvironmentFailureException efe)
+            {
+                LOGGER.warn("Ignoring an exception whilst closing environment", efe);
+            }
+        }
+    }
+
+    private void closeDatabases()
+    {
+        RuntimeException firstThrownException = null;
+        for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+        {
+            DatabaseHolder databaseHolder = entry.getValue();
+            Database database = databaseHolder.getDatabase();
+            if (database != null)
+            {
+                try
+                {
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+                    }
+
+                    database.close();
+                }
+                catch(RuntimeException e)
+                {
+                    LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
+                    if (firstThrownException == null)
+                    {
+                        firstThrownException = e;
+                    }
+                }
+                finally
+                {
+                    databaseHolder.setDatabase(null);
+                }
+            }
+        }
+        if (firstThrownException != null)
+        {
+            throw firstThrownException;
+        }
+    }
+
+    /**
+     * Builds the replication and environment configurations from the facade
+     * configuration and creates the replicated environment.
+     *
+     * @param createEnvironmentInSeparateThread when true the environment is created through
+     *        {@code createEnvironmentInSeparateThread(...)}, otherwise in the calling thread
+     * @return the created environment
+     */
+    private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
+    {
+        String groupName = _configuration.getGroupName();
+        String helperHostPort = _configuration.getHelperHostPort();
+        String hostPort = _configuration.getHostPort();
+        Map<String, String> environmentParameters = _configuration.getParameters();
+        Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters();
+        boolean designatedPrimary = _configuration.isDesignatedPrimary();
+        int priority = _configuration.getPriority();
+        int quorumOverride = _configuration.getQuorumOverride();
+
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Creating environment");
+            LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
+            LOGGER.info("Group name " + groupName);
+            LOGGER.info("Node name " + _configuration.getName());
+            LOGGER.info("Node host port " + hostPort);
+            LOGGER.info("Helper host port " + helperHostPort);
+            LOGGER.info("Durability " + _durability);
+            LOGGER.info("Coalescing sync " + _coalescingSync);
+            LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+            LOGGER.info("Node priority " + priority);
+            LOGGER.info("Quorum override " + quorumOverride);
+        }
+
+        // Start from the defaults and let the configured parameters override them
+        Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
+        if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
+        {
+            replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
+        }
+        Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+        if (environmentParameters != null && !environmentParameters.isEmpty())
+        {
+            environmentSettings.putAll(environmentParameters);
+        }
+
+        ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort);
+        replicationConfig.setHelperHosts(helperHostPort);
+        replicationConfig.setDesignatedPrimary(designatedPrimary);
+        replicationConfig.setNodePriority(priority);
+        replicationConfig.setElectableGroupSizeOverride(quorumOverride);
+
+        // Generic parameters are applied after the dedicated setters above
+        for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
+        {
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+            }
+            replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+        }
+
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(true);
+        envConfig.setTransactional(true);
+        envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+        envConfig.setDurability(_durability);
+
+        for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
+        {
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+            }
+            envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+        }
+
+        if (createEnvironmentInSeparateThread)
+        {
+            return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
+        }
+        else
+        {
+            return createEnvironment(_environmentDirectory, envConfig, replicationConfig);
+        }
+    }
+
+    /**
+     * Creates the environment on the environment job executor and waits for
+     * the result for at most the configured environment set-up timeout.
+     *
+     * @param environmentPathFile the environment directory
+     * @param envConfig the environment configuration
+     * @param replicationConfig the replication configuration; also the source of the set-up timeout
+     * @return the created environment
+     * @throws RuntimeException if the wait is interrupted, the creation fails or the timeout elapses
+     */
+    private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
+            final ReplicationConfig replicationConfig)
+    {
+        Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>(){
+            @Override
+            public ReplicatedEnvironment call() throws Exception
+            {
+                // Restore the thread name afterwards in case it is changed while the environment is created
+                String originalThreadName = Thread.currentThread().getName();
+                try
+                {
+                    return createEnvironment(environmentPathFile, envConfig, replicationConfig);
+                }
+                finally
+                {
+                    Thread.currentThread().setName(originalThreadName);
+                }
+            }});
+
+        long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+        try
+        {
+            return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Environment creation was interrupted", e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
+        }
+        catch (TimeoutException e)
+        {
+            // Nobody will consume the result any more: do not leave the
+            // abandoned creation task running, and keep the cause for diagnosis.
+            environmentFuture.cancel(true);
+            throw new RuntimeException("JE environment has not been created in due time", e);
+        }
+    }
+
+    private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig,
+            final ReplicationConfig replicationConfig)
+    {
+        ReplicatedEnvironment environment = null;
+        try
+        {
+            environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+        }
+        catch (final InsufficientLogException ile)
+        {
+            LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+            NetworkRestore restore = new NetworkRestore();
+            NetworkRestoreConfig config = new NetworkRestoreConfig();
+            config.setRetainLogFiles(false);
+            restore.execute(ile, config);
+            environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+        }
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
+        }
+        return environment;
+    }
+
+    @Override
+    public Committer createCommitter(String name)
+    {
+        if (_coalescingSync)
+        {
+            return new CoalescingCommiter(name, this);
+        }
+        else
+        {
+            return Committer.IMMEDIATE_FUTURE_COMMITTER;
+        }
+    }
+
+    /**
+     * Pings the given replication node and returns the state it reports.
+     *
+     * @param repNode the node to query; must not be null
+     * @return the node state obtained from the ping
+     * @throws IllegalArgumentException if {@code repNode} is null
+     * @throws IOException propagated from the ping
+     * @throws ServiceConnectFailedException propagated from the ping
+     */
+    public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+    {
+        if (repNode == null)
+        {
+            throw new IllegalArgumentException("Node cannot be null");
+        }
+
+        final DbPing ping = new DbPing(repNode, _configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT);
+        return ping.getNodeState();
+    }
+
+    /**
+     * For testing only: returns the number of electable nodes in the group.
+     *
+     * @throws IllegalStateException if the facade is not in the OPEN state
+     */
+    int getNumberOfElectableGroupMembers()
+    {
+        if (_state.get() == State.OPEN)
+        {
+            return _environment.getGroup().getElectableNodes().size();
+        }
+        throw new IllegalStateException("Environment facade is not opened");
+    }
+
+    /**
+     * Periodic job which pings every electable node of the group, collects
+     * the reported states and, when this node is MASTER and the collected
+     * states differ from the previous run, pings the database. The job
+     * re-schedules itself on the group change executor after every run.
+     */
+    private class RemoteNodeStateLearner implements Callable<Void>
+    {
+        private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
+        @Override
+        public Void call()
+        {
+            // The map is filled by the ping tasks running on the executor and read
+            // here; a task which has timed out may still write to it later on, so
+            // a plain HashMap would be unsafe.
+            final Map<String, ReplicatedEnvironment.State> currentGroupState =
+                    Collections.synchronizedMap(new HashMap<String, ReplicatedEnvironment.State>());
+            try
+            {
+                Set<Future<Void>> futures = new HashSet<Future<Void>>();
+
+                for (final ReplicationNode node : _environment.getGroup().getElectableNodes())
+                {
+                    Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
+                    {
+                        @Override
+                        public Void call()
+                        {
+                            DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL);
+                            ReplicatedEnvironment.State nodeState;
+                            try
+                            {
+                                nodeState = ping.getNodeState().getNodeState();
+                            }
+                            catch (IOException e)
+                            {
+                                // An unreachable node is reported as UNKNOWN
+                                nodeState = ReplicatedEnvironment.State.UNKNOWN;
+                            }
+                            catch (ServiceConnectFailedException e)
+                            {
+                                nodeState = ReplicatedEnvironment.State.UNKNOWN;
+                            }
+
+                            currentGroupState.put(node.getName(), nodeState);
+                            return null;
+                        }
+                    });
+                    futures.add(future);
+                }
+
+                for (Future<Void> future : futures)
+                {
+                    try
+                    {
+                        future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                    catch (ExecutionException e)
+                    {
+                        LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
+                    }
+                    catch (TimeoutException e)
+                    {
+                        LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
+                        future.cancel(true);
+                    }
+                }
+
+                if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+                {
+                    boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+                    _previousGroupState = currentGroupState;
+                    if (stateChanged && State.OPEN == _state.get())
+                    {
+                        new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+                    }
+                }
+            }
+            finally
+            {
+                // Always schedule the next run, even if this one failed
+                _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+            }
+            return null;
+        }
+    }
+    /**
+     * Lifecycle states of the environment facade.
+     */
+    public enum State
+    {
+        OPENING,
+        OPEN,
+        RESTARTING,
+        CLOSING,
+        CLOSED
+    }
+
+    /**
+     * Pairs a database configuration with the database handle currently
+     * opened with it; the handle is null while the database is not open.
+     */
+    private static class DatabaseHolder
+    {
+        private final DatabaseConfig _config;
+        private Database _database;
+
+        public DatabaseHolder(DatabaseConfig config)
+        {
+            _config = config;
+        }
+
+        public DatabaseConfig getConfig()
+        {
+            return _config;
+        }
+
+        public Database getDatabase()
+        {
+            return _database;
+        }
+
+        public void setDatabase(Database database)
+        {
+            _database = database;
+        }
+
+        @Override
+        public String toString()
+        {
+            final StringBuilder text = new StringBuilder("DatabaseHolder [_config=");
+            text.append(_config).append(", _database=").append(_database).append("]");
+            return text.toString();
+        }
+
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+/**
+ * Factory creating {@link ReplicatedEnvironmentFacade} instances whose
+ * configuration is read from the attributes of a virtual host.
+ */
+public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+    private static final int DEFAULT_NODE_PRIORITY = 1;
+    private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+            ReplicaAckPolicy.SIMPLE_MAJORITY);
+    private static final boolean DEFAULT_COALESCING_SYNC = true;
+
+    /**
+     * Creates a replicated environment facade backed by a configuration which
+     * reads its values from the virtual host attributes on every call.
+     *
+     * @param virtualHost the virtual host providing the attributes
+     * @param isMessageStore not used by this implementation
+     * @return the new facade
+     */
+    @Override
+    public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+    {
+        ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
+        {
+            @Override
+            public boolean isDesignatedPrimary()
+            {
+                return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+            }
+
+            @Override
+            public boolean isCoalescingSync()
+            {
+                return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+            }
+
+            @Override
+            public String getStorePath()
+            {
+                return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+            }
+
+            // The attribute is untyped; the cast is unchecked (as it was before)
+            @SuppressWarnings("unchecked")
+            @Override
+            public Map<String, String> getParameters()
+            {
+                return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+            }
+
+            // The attribute is untyped; the cast is unchecked (as it was before)
+            @SuppressWarnings("unchecked")
+            @Override
+            public Map<String, String> getReplicationParameters()
+            {
+                return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+            }
+
+            @Override
+            public int getQuorumOverride()
+            {
+                return 0;
+            }
+
+            @Override
+            public int getPriority()
+            {
+                return DEFAULT_NODE_PRIORITY;
+            }
+
+            @Override
+            public String getName()
+            {
+                return (String)virtualHost.getAttribute("haNodeName");
+            }
+
+            @Override
+            public String getHostPort()
+            {
+                return (String)virtualHost.getAttribute("haNodeAddress");
+            }
+
+            @Override
+            public String getHelperHostPort()
+            {
+                return (String)virtualHost.getAttribute("haHelperAddress");
+            }
+
+            @Override
+            public String getGroupName()
+            {
+                return (String)virtualHost.getAttribute("haGroupName");
+            }
+
+            @Override
+            public String getDurability()
+            {
+                // Read the attribute once so that the null check and the value
+                // returned cannot come from two different lookups.
+                final Object durability = virtualHost.getAttribute("haDurability");
+                return durability == null ? DEFAULT_DURABILITY.toString() : (String) durability;
+            }
+        };
+        return new ReplicatedEnvironmentFacade(configuration);
+
+    }
+
+    /**
+     * Returns the type identifier of the replicated environment facade.
+     */
+    @Override
+    public String getType()
+    {
+        return ReplicatedEnvironmentFacade.TYPE;
+    }
+
+    /**
+     * Converts an attribute value to a boolean.
+     *
+     * @param value a Boolean, a String or null
+     * @param defaultValue the value returned when {@code value} is null
+     * @return the Boolean itself, the parsed String or the default
+     * @throws IllegalArgumentException if the value is of any other type
+     */
+    private boolean convertBoolean(final Object value, boolean defaultValue)
+    {
+        if(value instanceof Boolean)
+        {
+            return (Boolean) value;
+        }
+        else if(value instanceof String)
+        {
+            return Boolean.parseBoolean((String) value);
+        }
+        else if(value == null)
+        {
+            return defaultValue;
+        }
+        else
+        {
+            throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Boolean");
+        }
+    }
+
+}

Modified: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java?rev=1576697&r1=1576696&r2=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java Wed Mar 12 11:28:49 2014
@@ -21,11 +21,13 @@
 package org.apache.qpid.server.store.berkeleydb.upgrade;
 
 import com.sleepycat.je.Cursor;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 
 import com.sleepycat.bind.tuple.IntegerBinding;
 import com.sleepycat.bind.tuple.LongBinding;
@@ -38,6 +40,8 @@ import com.sleepycat.je.OperationStatus;
 
 public class Upgrader
 {
+    private static final Logger LOGGER = Logger.getLogger(Upgrader.class);
+
     static final String VERSION_DB_NAME = "DB_VERSION";
 
     private Environment _environment;
@@ -63,7 +67,8 @@ public class Upgrader
 
             if(versionDb.count() == 0L)
             {
-                int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
+
+                int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
                 DatabaseEntry key = new DatabaseEntry();
                 IntegerBinding.intToEntry(sourceVersion, key);
                 DatabaseEntry value = new DatabaseEntry();
@@ -73,11 +78,17 @@ public class Upgrader
             }
 
             int version = getSourceVersion(versionDb);
-            if(version > AbstractBDBMessageStore.VERSION)
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Source message store version is " + version);
+            }
+
+            if(version > BDBMessageStore.VERSION)
             {
                 throw new StoreException("Database version " + version
                                             + " is higher than the most recent known version: "
-                                            + AbstractBDBMessageStore.VERSION);
+                                            + BDBMessageStore.VERSION);
             }
             performUpgradeFromVersion(version, versionDb);
         }
@@ -124,8 +135,9 @@ public class Upgrader
     }
 
     void performUpgradeFromVersion(int sourceVersion, Database versionDb)
+            throws StoreException
     {
-        while(sourceVersion != AbstractBDBMessageStore.VERSION)
+        while(sourceVersion != BDBMessageStore.VERSION)
         {
             upgrade(sourceVersion, ++sourceVersion);
             DatabaseEntry key = new DatabaseEntry();
@@ -136,7 +148,7 @@ public class Upgrader
         }
     }
 
-    void upgrade(final int fromVersion, final int toVersion)
+    void upgrade(final int fromVersion, final int toVersion) throws StoreException
     {
         try
         {
@@ -177,7 +189,7 @@ public class Upgrader
 
     private int identifyOldStoreVersion() throws DatabaseException
     {
-        int version = 0;
+        int version = BDBMessageStore.VERSION;
         for (String databaseName : _environment.getDatabaseNames())
         {
             if (databaseName.contains("_v"))

Modified: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java?rev=1576697&r1=1576696&r2=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java Wed Mar 12 11:28:49 2014
@@ -22,20 +22,16 @@ package org.apache.qpid.server.store.ber
 
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class MessageStoreCreatorTest extends QpidTestCase
 {
-    private static final String[] STORE_TYPES = {BDBMessageStore.TYPE};
-
+    /** Verifies that the creator returns a store for the type reported by the BDB store factory. */
     public void testMessageStoreCreator()
     {
         MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
-        for (String type : STORE_TYPES)
-        {
-            MessageStore store = messageStoreCreator.createMessageStore(type);
-            assertNotNull("Store of type " + type + " is not created", store);
-        }
-    }
-}
+        String type = new BDBMessageStoreFactory().getType();
+        MessageStore store = messageStoreCreator.createMessageStore(type);
+        assertNotNull("Store of type " + type + " is not created", store);
+    }
+}

Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+/**
+ * Tests the basic life cycle and database handling of a {@link StandardEnvironmentFacade}
+ * created over a per-test directory below {@code TMP_FOLDER}.
+ */
+public class StandardEnvironmentFacadeTest extends QpidTestCase
+{
+    protected File _storePath;
+    protected EnvironmentFacade _environmentFacade;
+
+    /** Computes the per-test store location; the facade itself is created lazily by the tests. */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName());
+    }
+
+    /**
+     * Closes the facade (if a test created one), removes the store directory and only then
+     * delegates to the superclass, so that a failure in {@code super.tearDown()} can no longer
+     * leave the environment open.
+     */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if (_environmentFacade != null)
+            {
+                _environmentFacade.close();
+            }
+        }
+        finally
+        {
+            try
+            {
+                if (_storePath != null)
+                {
+                    FileUtils.delete(_storePath, true);
+                }
+            }
+            finally
+            {
+                super.tearDown();
+            }
+        }
+    }
+
+    /** Verifies that a freshly created facade exposes a valid JE environment. */
+    public void testEnvironmentFacade() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        assertNotNull("Environment should not be null", ef);
+        Environment e = ef.getEnvironment();
+        assertTrue("Environment is not valid", e.isValid());
+    }
+
+    /** Verifies that {@code getEnvironment()} returns null once the facade has been closed. */
+    public void testClose() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        ef.close();
+        Environment e = ef.getEnvironment();
+
+        assertNull("Environment should be null after facade close", e);
+    }
+
+    /** Verifies that every database passed to {@code openDatabases} can be looked up by name. */
+    public void testOpenDatabases() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(dbConfig, "test1", "test2");
+        Database test1 = ef.getOpenDatabase("test1");
+        Database test2 = ef.getOpenDatabase("test2");
+
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+    }
+
+    /**
+     * Verifies that looking up a database that was never opened fails with an
+     * {@link IllegalArgumentException} carrying the expected message.
+     */
+    public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+    {
+        EnvironmentFacade ef = getEnvironmentFacade();
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setTransactional(true);
+        dbConfig.setAllowCreate(true);
+        ef.openDatabases(dbConfig, "test1");
+        Database test1 = ef.getOpenDatabase("test1");
+        assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+        try
+        {
+            ef.getOpenDatabase("test2");
+            fail("An exception should be thrown for the non existing database");
+        }
+        catch(IllegalArgumentException e)
+        {
+            assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
+        }
+    }
+
+    /** Returns the facade under test, creating it on first use so that tearDown can close it. */
+    EnvironmentFacade getEnvironmentFacade() throws Exception
+    {
+        if (_environmentFacade == null)
+        {
+            _environmentFacade = createEnvironmentFacade();
+        }
+        return _environmentFacade;
+    }
+
+    /** Creates a StandardEnvironmentFacade over the test store path with an empty parameter map. */
+    EnvironmentFacade createEnvironmentFacade()
+    {
+        return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+    }
+
+}

Added: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java?rev=1576697&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java (added)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java Wed Mar 12 11:28:49 2014
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+/**
+ * Tests creation of BDB and BDB HA virtual hosts from an XML virtual host configuration file.
+ */
+public class VirtualHostTest extends QpidTestCase
+{
+
+    private Broker _broker;
+    private StatisticsGatherer _statisticsGatherer;
+    // Never assigned in this test, so createHost() passes null as the recoverer provider.
+    private RecovererProvider _recovererProvider;
+    private File _configFile;
+    private File _bdbStorePath;
+    private VirtualHost _host;
+    private ConfigurationEntryStore _store;
+
+    /** Prepares mocked broker collaborators and a unique store directory name for the test. */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        _store = mock(ConfigurationEntryStore.class);
+        _broker = BrokerTestHelper.createBrokerMock();
+        TaskExecutor taskExecutor = mock(TaskExecutor.class);
+        when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+        when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
+
+
+        _statisticsGatherer = mock(StatisticsGatherer.class);
+
+        _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
+        _bdbStorePath.deleteOnExit();
+    }
+
+    /** Stops the host if one was created, then always removes the config file and store directory. */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if (_host != null)
+            {
+                _host.setDesiredState(_host.getState(), State.STOPPED);
+            }
+        }
+        finally
+        {
+            if (_configFile != null)
+            {
+                _configFile.delete();
+            }
+            if (_bdbStorePath != null)
+            {
+                FileUtils.delete(_bdbStorePath, true);
+            }
+            super.tearDown();
+        }
+    }
+
+
+    /**
+     * Creates a host whose BDB store sets the JE log file maximum via an {@code envConfig}
+     * element and verifies that the value reaches the underlying JE environment.
+     */
+    public void testCreateBdbVirtualHostFromConfigurationFile()
+    {
+        String hostName = getName();
+        long logFileMax = 2000000;
+        _host = createHostFromConfiguration(hostName, logFileMax);
+        _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+        assertEquals("Unexpected host name", hostName, _host.getName());
+        assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
+        assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE));
+        assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+        BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+        EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
+        assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+    }
+
+    /**
+     * Creates a BDB HA host from configuration and verifies that the replication settings
+     * (group, node, host/port, helper hosts, stream timeout) reach the JE replication config.
+     */
+    public void testCreateBdbHaVirtualHostFromConfigurationFile()
+    {
+        String hostName = getName();
+
+        String repStreamTimeout = "2 h";
+        String nodeName = "node";
+        String groupName = "group";
+        String nodeHostPort = "localhost:" + findFreePort();
+        String helperHostPort = nodeHostPort;
+        String durability = "NO_SYNC,SYNC,NONE";
+        _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+        _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+        assertEquals("Unexpected host name", hostName, _host.getName());
+        assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
+        assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+        assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+        BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+        ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
+        ReplicationConfig repConfig = environment.getRepConfig();
+        assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME));
+        assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME));
+        assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT));
+        assertEquals("Unexpected JE replication helperHosts", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS));
+        assertEquals("Unexpected JE replication designatedPrimary", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY));
+        assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+    }
+
+    /** Wraps the attributes in a {@link ConfigurationEntry} and recovers a virtual host from it. */
+    private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children)
+    {
+        ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+                children, _store);
+
+        return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
+    }
+
+    /** Creates a virtual host from the attributes with no child entries. */
+    private VirtualHost createHost(Map<String, Object> attributes)
+    {
+        return createHost(attributes, Collections.<UUID> emptySet());
+    }
+
+    /**
+     * Writes a virtual host XML configuration with a BDB store at {@code _bdbStorePath} and the
+     * given JE log file maximum, then creates the host from it.
+     */
+    private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+    {
+        String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+                        + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+                        + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+                        + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
+                        + "</store>"
+                        + "</" + hostName + "></virtualhost></virtualhosts>";
+        Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+        return createHost(attributes);
+    }
+
+
+    /**
+     * Writes a BDB HA virtual host XML configuration with the given replication settings and
+     * creates the host from it. Commas in {@code durability} are backslash-escaped in the XML.
+     */
+    private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
+    {
+        String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+                        + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
+                        + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+                        + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+                        + "<highAvailability>"
+                        + "<groupName>" + groupName + "</groupName>"
+                        + "<nodeName>" + nodeName + "</nodeName>"
+                        + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
+                        + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
+                        + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
+                        + "</highAvailability>"
+                        + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
+                        + "</store>"
+                        + "</" + hostName + "></virtualhost></virtualhosts>";
+        Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+        return createHost(attributes);
+    }
+
+    /**
+     * Stores the XML in a temporary file (remembered in {@code _configFile} for clean-up) and
+     * returns host attributes naming the test and pointing at that file.
+     */
+    private Map<String, Object> writeConfigAndGenerateAttributes(String content)
+    {
+        _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
+        Map<String, Object> attributes = new HashMap<String, Object>();
+        attributes.put(VirtualHost.NAME, getName());
+        attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
+        return attributes;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message