qpid-commits mailing list archives

From kw...@apache.org
Subject svn commit: r1576697 [1/4] - in /qpid/branches/java-broker-bdb-ha2/qpid/java: bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ bdbstore/src/main/java/org/apa...
Date Wed, 12 Mar 2014 11:28:50 GMT
Author: kwall
Date: Wed Mar 12 11:28:49 2014
New Revision: 1576697

URL: http://svn.apache.org/r1576697
Log:
QPID-5410: [Java Broker/BDB]. Introduce a thin facade (EnvironmentFacade) between the BDBMessageStore
and BDB JE's Environment/ReplicatedEnvironment. The motivation behind this facade is principally
HA: there are a number of cases where JE requires the ReplicatedEnvironment to be recreated.
The facade layer allows this to be done transparently to the upper tiers (the BDBMessageStore).
The facade has two implementations: StandardEnvironmentFacade, used in the non-HA use case, and
ReplicatedEnvironmentFacade, used in the HA case.
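
The following is a minimal sketch of the facade idea described above, not the code in this commit. All names in it (FakeEnvironment, EnvFacade, SimpleFacade, RestartableFacade, SketchStore) are hypothetical stand-ins; FakeEnvironment is not the JE Environment API. It only illustrates how a store that always asks the facade for the current environment is unaffected when the HA facade recreates it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JE environment handle (NOT the real com.sleepycat.je API).
final class FakeEnvironment {
    private static final AtomicInteger IDS = new AtomicInteger();
    final int id = IDS.incrementAndGet();
    private volatile boolean open = true;

    boolean isOpen() { return open; }
    void close() { open = false; }
}

// The store depends only on this interface (illustrative name and shape).
interface EnvFacade {
    FakeEnvironment getEnvironment();
    void close();
}

// Non-HA case: a single environment for the lifetime of the facade.
final class SimpleFacade implements EnvFacade {
    private final FakeEnvironment env = new FakeEnvironment();

    public FakeEnvironment getEnvironment() { return env; }
    public void close() { env.close(); }
}

// HA case: the underlying environment may be closed and recreated.
final class RestartableFacade implements EnvFacade {
    private FakeEnvironment env = new FakeEnvironment();

    public synchronized FakeEnvironment getEnvironment() { return env; }

    // Replace the environment; callers holding only the facade are unaffected.
    synchronized void restart() {
        env.close();
        env = new FakeEnvironment();
    }

    public synchronized void close() { env.close(); }
}

// The store never caches the environment; it asks the facade on every use.
final class SketchStore {
    private final EnvFacade facade;

    SketchStore(EnvFacade facade) { this.facade = facade; }

    boolean canWrite() { return facade.getEnvironment().isOpen(); }
}
```

In this sketch, a SketchStore built on a RestartableFacade still reports canWrite() as true after restart() is called, because it never holds a reference to the closed environment.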

Key changes:

* BDBHAVirtualHost is now responsible for the creation of ReplicatedEnvironmentFacade
* BDBMessageStore reverts to a single implementation without knowledge of HA.
* BDBMessageStore now interacts with JE via the facade.
* BDBHAMessageStoreManagerMBean interrogates the facade
* ReplicatedEnvironmentFacade monitors the group for changes in state (nodes becoming uncontactable
etc.). If such a state change is detected, the DatabasePinger fires a single transaction to
determine whether quorum still exists. If quorum does not exist, the environment is restarted,
thus transitioning the environment into the UNKNOWN state.
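
The quorum check in the last bullet can be sketched roughly as follows. This is an illustration under stated assumptions, not the DatabasePinger or ReplicatedEnvironmentFacade from this commit: QuorumMonitor, NodeState and the BooleanSupplier "ping" are hypothetical, and the ping is assumed to return false when a single test transaction cannot be committed.

```java
import java.util.function.BooleanSupplier;

// Hypothetical subset of node states used by this sketch.
enum NodeState { MASTER, REPLICA, UNKNOWN }

final class QuorumMonitor {
    // Assumption: returns true if a single test transaction committed, false otherwise.
    private final BooleanSupplier pingTransaction;
    private NodeState state = NodeState.MASTER;
    private int restarts;

    QuorumMonitor(BooleanSupplier pingTransaction) {
        this.pingTransaction = pingTransaction;
    }

    // Called when the group reports a change, e.g. a node becoming uncontactable.
    synchronized void onGroupChange() {
        if (!pingTransaction.getAsBoolean()) {
            // Quorum is gone: restart the environment, which leaves the node in UNKNOWN
            // until the group re-elects a master.
            restarts++;
            state = NodeState.UNKNOWN;
        }
    }

    synchronized NodeState state() { return state; }
    synchronized int restarts() { return restarts; }
}
```

Firing one transaction only when the group changes, rather than polling continuously, keeps the check cheap in the common case where the group is stable.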

Added:
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
      - copied, changed from r1576683, qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
      - copied, changed from r1576683, qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
      - copied, changed from r1576683, qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
      - copied, changed from r1576683, qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java
      - copied, changed from r1576683, qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
Removed:
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
Modified:
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
    qpid/branches/java-broker-bdb-ha2/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java

Modified: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1576697&r1=1576696&r2=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Wed Mar 12 11:28:49 2014
@@ -36,19 +36,12 @@ import javax.management.openmbean.Tabula
 import javax.management.openmbean.TabularType;
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.jmx.AMQManagedObject;
 import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 
 /**
  * Management mbean for BDB HA.
- * <p>
- * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is
- * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the
- * same classloader as broker-plugins-management-jmx.  See the <b>Fragment-Host:</b> header within the MANIFEST.MF
- * of this bundle.
- * </p>
  */
 public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
 {
@@ -63,7 +56,7 @@ public class BDBHAMessageStoreManagerMBe
         try
         {
             GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
-            final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+            final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
             final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
             GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
                                                 itemNames,
@@ -71,7 +64,7 @@ public class BDBHAMessageStoreManagerMBe
                                                 GROUP_MEMBER_ATTRIBUTE_TYPES );
             GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
                                                 GROUP_MEMBER_ROW,
-                                                new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+                                                new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
         }
         catch (final OpenDataException ode)
         {
@@ -79,44 +72,46 @@ public class BDBHAMessageStoreManagerMBe
         }
     }
 
-    private final BDBHAMessageStore _store;
+    private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+    private final String _objectName;
 
-    protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException
+    protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
     {
         super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
-        LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
-        _store = store;
+        LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
+        _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+        _objectName = ObjectName.quote(virtualHostName);
         register();
     }
 
     @Override
     public String getObjectInstanceName()
     {
-        return ObjectName.quote(_store.getName());
+        return _objectName;
     }
 
     @Override
     public String getGroupName()
     {
-        return _store.getGroupName();
+        return _replicatedEnvironmentFacade.getGroupName();
     }
 
     @Override
     public String getNodeName()
     {
-        return _store.getNodeName();
+        return _replicatedEnvironmentFacade.getNodeName();
     }
 
     @Override
     public String getNodeHostPort()
     {
-        return _store.getNodeHostPort();
+        return _replicatedEnvironmentFacade.getHostPort();
     }
 
     @Override
     public String getHelperHostPort()
     {
-        return _store.getHelperHostPort();
+        return _replicatedEnvironmentFacade.getHelperHostPort();
     }
 
     @Override
@@ -124,7 +119,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            return _store.getDurability();
+            return _replicatedEnvironmentFacade.getDurability();
         }
         catch (RuntimeException e)
         {
@@ -137,7 +132,7 @@ public class BDBHAMessageStoreManagerMBe
     @Override
     public boolean getCoalescingSync() throws IOException, JMException
     {
-        return _store.isCoalescingSync();
+        return _replicatedEnvironmentFacade.isCoalescingSync();
     }
 
     @Override
@@ -145,7 +140,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            return _store.getNodeState();
+            return _replicatedEnvironmentFacade.getNodeState();
         }
         catch (RuntimeException e)
         {
@@ -159,7 +154,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            return _store.isDesignatedPrimary();
+            return _replicatedEnvironmentFacade.isDesignatedPrimary();
         }
         catch (RuntimeException e)
         {
@@ -172,7 +167,7 @@ public class BDBHAMessageStoreManagerMBe
     public TabularData getAllNodesInGroup() throws IOException, JMException
     {
         final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
-        final List<Map<String, String>> members = _store.getGroupMembers();
+        final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
 
         for (Map<String, String> map : members)
         {
@@ -187,9 +182,9 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            _store.removeNodeFromGroup(nodeName);
+            _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
         }
-        catch (StoreException e)
+        catch (RuntimeException e)
         {
             LOGGER.error("Failed to remove node " + nodeName + " from group", e);
             throw new JMException(e.getMessage());
@@ -201,11 +196,11 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            _store.setDesignatedPrimary(primary);
+            _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
         }
-        catch (StoreException e)
+        catch (RuntimeException e)
         {
-            LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+            LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
             throw new JMException(e.getMessage());
         }
     }
@@ -215,9 +210,9 @@ public class BDBHAMessageStoreManagerMBe
     {
         try
         {
-            _store.updateAddress(nodeName, newHostName, newPort);
+            _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
         }
-        catch(StoreException e)
+        catch(RuntimeException e)
         {
             LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
             throw new JMException(e.getMessage());

Modified: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java?rev=1576697&r1=1576696&r2=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java Wed Mar 12 11:28:49 2014
@@ -28,11 +28,12 @@ import org.apache.qpid.server.jmx.MBeanP
 import org.apache.qpid.server.jmx.ManagedObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 
 /**
  * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
- * host and of type {@link BDBHAMessageStore#TYPE}.
+ * host and of type {@link ReplicatedEnvironmentFacade#TYPE}.
  *
  */
 public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@@ -48,7 +49,7 @@ public class BDBHAMessageStoreManagerMBe
     public boolean isChildManageableByMBean(ConfiguredObject child)
     {
         return (child instanceof VirtualHost
-            && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+            && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
     }
 
     @Override
@@ -56,14 +57,15 @@ public class BDBHAMessageStoreManagerMBe
     {
         VirtualHost virtualHostChild = (VirtualHost) child;
 
-        BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore();
+        BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore();
 
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Creating mBean for child " + child);
         }
 
-        return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent);
+        ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
+        return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent);
     }
 
     @Override

Modified: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1576697&r1=1576696&r2=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Wed Mar 12 11:28:49 2014
@@ -37,10 +37,9 @@ import javax.management.openmbean.Tabula
 
 import junit.framework.TestCase;
 
-import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.jmx.AMQManagedObject;
 import org.apache.qpid.server.jmx.ManagedObjectRegistry;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 
 public class BDBHAMessageStoreManagerMBeanTest extends TestCase
 {
@@ -53,7 +52,7 @@ public class BDBHAMessageStoreManagerMBe
     private static final String TEST_STORE_NAME = "testStoreName";
     private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
 
-    private BDBHAMessageStore _store;
+    private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
     private BDBHAMessageStoreManagerMBean _mBean;
     private AMQManagedObject _mBeanParent;
 
@@ -62,10 +61,10 @@ public class BDBHAMessageStoreManagerMBe
     {
         super.setUp();
 
-        _store = mock(BDBHAMessageStore.class);
+        _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class);
         _mBeanParent = mock(AMQManagedObject.class);
         when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
-        _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent);
+        _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacade, _mBeanParent);
     }
 
     @Override
@@ -76,64 +75,62 @@ public class BDBHAMessageStoreManagerMBe
 
     public void testObjectName() throws Exception
     {
-        when(_store.getName()).thenReturn(TEST_STORE_NAME);
-
         String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
         assertEquals(expectedObjectName, _mBean.getObjectName().toString());
     }
 
     public void testGroupName() throws Exception
     {
-        when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+        when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(TEST_GROUP_NAME);
 
         assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
     }
 
     public void testNodeName() throws Exception
     {
-        when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+        when(_replicatedEnvironmentFacade.getNodeName()).thenReturn(TEST_NODE_NAME);
 
         assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
     }
 
     public void testNodeHostPort() throws Exception
     {
-        when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+        when(_replicatedEnvironmentFacade.getHostPort()).thenReturn(TEST_NODE_HOST_PORT);
 
         assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
     }
 
     public void testHelperHostPort() throws Exception
     {
-        when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+        when(_replicatedEnvironmentFacade.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
 
         assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
     }
 
     public void testDurability() throws Exception
     {
-        when(_store.getDurability()).thenReturn(TEST_DURABILITY);
+        when(_replicatedEnvironmentFacade.getDurability()).thenReturn(TEST_DURABILITY);
 
         assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
     }
 
     public void testCoalescingSync() throws Exception
     {
-        when(_store.isCoalescingSync()).thenReturn(true);
+        when(_replicatedEnvironmentFacade.isCoalescingSync()).thenReturn(true);
 
         assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
     }
 
     public void testNodeState() throws Exception
     {
-        when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+        when(_replicatedEnvironmentFacade.getNodeState()).thenReturn(TEST_NODE_STATE);
 
         assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
     }
 
     public void testDesignatedPrimaryFlag() throws Exception
     {
-        when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+        when(_replicatedEnvironmentFacade.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
 
         assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
     }
@@ -141,29 +138,29 @@ public class BDBHAMessageStoreManagerMBe
     public void testGroupMembersForGroupWithOneNode() throws Exception
     {
         List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
-        when(_store.getGroupMembers()).thenReturn(members);
+        when(_replicatedEnvironmentFacade.getGroupMembers()).thenReturn(members);
 
         final TabularData resultsTable = _mBean.getAllNodesInGroup();
 
-        assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+        assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
 
         final int numberOfDataRows = resultsTable.size();
         assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
         final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
-        assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
-        assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+        assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
+        assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
     }
 
     public void testRemoveNodeFromReplicationGroup() throws Exception
     {
         _mBean.removeNodeFromGroup(TEST_NODE_NAME);
 
-        verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+        verify(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
     }
 
     public void testRemoveNodeFromReplicationGroupWithError() throws Exception
     {
-        doThrow(new StoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+        doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
 
         try
         {
@@ -180,12 +177,12 @@ public class BDBHAMessageStoreManagerMBe
     {
         _mBean.setDesignatedPrimary(true);
 
-        verify(_store).setDesignatedPrimary(true);
+        verify(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
     }
 
     public void testSetAsDesignatedPrimaryWithError() throws Exception
     {
-        doThrow(new StoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+        doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
 
         try
         {
@@ -205,7 +202,7 @@ public class BDBHAMessageStoreManagerMBe
 
         _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
 
-        verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+        verify(_replicatedEnvironmentFacade).updateAddress(TEST_NODE_NAME, newHostName, newPort);
     }
 
     private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -220,8 +217,8 @@ public class BDBHAMessageStoreManagerMBe
     private Map<String, String> createTestNodeResult()
     {
         Map<String, String> items = new HashMap<String, String>();
-        items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
-        items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+        items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+        items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
         return items;
     }
 }

Modified: qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1576697&r1=1576696&r2=1576697&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha2/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Wed Mar 12 11:28:49 2014
@@ -20,6 +20,7 @@ package org.apache.qpid.server.store.ber
  *
  */
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
@@ -31,15 +32,22 @@ import org.apache.qpid.server.store.Even
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.OperationalLoggingListener;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
 import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
 public class BDBHAVirtualHost extends AbstractVirtualHost
 {
-    private BDBHAMessageStore _messageStore;
+    private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
+
+    private BDBMessageStore _messageStore;
 
     private boolean _inVhostInitiatedClose;
 
@@ -52,11 +60,9 @@ public class BDBHAVirtualHost extends Ab
         super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
     }
 
-
-
     protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
     {
-        _messageStore = new BDBHAMessageStore();
+        _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
 
         final MessageStoreLogSubject storeLogSubject =
                 new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
@@ -84,6 +90,11 @@ public class BDBHAVirtualHost extends Ab
                 virtualHost, recoveryHandler,
                 recoveryHandler
         );
+
+        // Make the virtualhost model object a replication group listener
+        ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade();
+        environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+
     }
 
 
@@ -194,4 +205,70 @@ public class BDBHAVirtualHost extends Ab
         }
     }
 
+    private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+    {
+
+        @Override
+        public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+        {
+            com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Received BDB event indicating transition to state " + state
+                        + " when current message store state is " + _messageStore._stateManager.getState());
+            }
+
+            switch (state)
+            {
+            case MASTER:
+                activate();
+                break;
+            case REPLICA:
+                passivate();
+                break;
+            case DETACHED:
+                LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+                passivate();
+                break;
+            case UNKNOWN:
+                LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+                break;
+            default:
+                LOGGER.error("Unexpected state change: " + state);
+                throw new IllegalStateException("Unexpected state change: " + state);
+            }
+        }
+
+        private void activate()
+        {
+            try
+            {
+                _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+                _messageStore.activate();
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to activate on hearing MASTER change event", e);
+            }
+        }
+
+        private void passivate()
+        {
+            try
+            {
+                //TODO: move this this into the store method passivate()
+                if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
+                {
+                    _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
+                }
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+            }
+        }
+
+    }
+
 }
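
The state handling in BDBHAMessageStoreStateChangeListener above can be summarised as a mapping from replication state to store action. The sketch below restates that mapping as a pure function so that it can be checked in isolation; RepState, StoreAction and StateTransitions are hypothetical names introduced here, not part of the commit or of JE.

```java
// Hypothetical mirror of the four replication states handled in the listener.
enum RepState { MASTER, REPLICA, DETACHED, UNKNOWN }

// Hypothetical description of what the virtual host does with its store.
enum StoreAction { ACTIVATE, PASSIVATE, NONE }

final class StateTransitions {
    private StateTransitions() { }

    // Same decisions as the switch in the listener: MASTER activates,
    // REPLICA and DETACHED passivate, UNKNOWN is only logged.
    static StoreAction actionFor(RepState state) {
        switch (state) {
            case MASTER:
                return StoreAction.ACTIVATE;
            case REPLICA:
            case DETACHED:
                return StoreAction.PASSIVATE;
            case UNKNOWN:
                return StoreAction.NONE;
            default:
                throw new IllegalStateException("Unexpected state change: " + state);
        }
    }
}
```

Separating the decision from the side effects in this way is a design choice of the sketch only; in the commit the listener calls activate() and passivate() directly.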


