qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1342169 - /qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
Date Thu, 24 May 2012 08:21:29 GMT
Author: kwall
Date: Thu May 24 08:21:29 2012
New Revision: 1342169

URL: http://svn.apache.org/viewvc?rev=1342169&view=rev
Log:
QPID-4006: Remove BDB StateChangeListener during Environment#close() to avoid processing an
unnecessary DETACHED event (which was causing a stack trace).

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java?rev=1342169&r1=1342168&r2=1342169&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java Thu May 24 08:21:29 2012
@@ -66,11 +66,14 @@ import com.sleepycat.je.rep.util.Replica
 
 public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
 {
+    private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+
     private static final String MUTLI_SYNC = "MUTLI_SYNC";
     private static final String DEFAULT_REPLICATION_POLICY =
         MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
 
-    private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+    public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+    public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
 
     private String _groupName;
     private String _nodeName;
@@ -83,10 +86,6 @@ public class BDBHAMessageStore extends A
 
     private BDBHAMessageStoreManagerMBean _managedObject;
 
-    public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
-
-    public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
-
     private CommitThreadWrapper _commitThreadWrapper;
     private boolean _localMultiSyncCommits;
     private boolean _autoDesignatedPrimary;
@@ -137,16 +136,6 @@ public class BDBHAMessageStore extends A
         }
     }
 
-    private String getValidatedPropertyFromConfig(String key, Configuration config) throws
ConfigurationException
-    {
-        if (!config.containsKey(key))
-        {
-            throw new ConfigurationException("BDB HA configuration key not found. Please
specify configuration key with XPath: "
-                                                + key.replace('.', '/'));
-        }
-        return config.getString(key);
-    }
-
     @Override
     protected Environment createEnvironment(File environmentPath) throws DatabaseException
     {
@@ -199,50 +188,29 @@ public class BDBHAMessageStore extends A
     }
 
     @Override
-    public synchronized void passivate()
+    public synchronized void activate() throws Exception
     {
-        if (_stateManager.isNotInState(State.INITIALISED))
-        {
-            LOGGER.debug("Store becoming passive");
-            _stateManager.attainState(State.INITIALISED);
-        }
-    }
+        // Before proceeding, perform a log flush with an fsync
+        getEnvironment().flushLog(true);
 
-    @Override
-    protected void closeInternal() throws Exception
-    {
-        try
-        {
-            if(_localMultiSyncCommits)
-            {
-                _commitThreadWrapper.stopCommitThread();
-            }
-            super.closeInternal();
-        }
-        finally
+        super.activate();
+
+        //For replica groups with 2 electable nodes, set the new master to be the
+        //designated primary, such that it can continue working if the replica goes
+        //down and leaves it without a 'majority of 2'.
+        if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 &&
_autoDesignatedPrimary)
         {
-            if (_managedObject != null)
-            {
-                _managedObject.unregister();
-            }
+            setDesignatedPrimary(true);
         }
     }
 
     @Override
-    protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+    public synchronized void passivate()
     {
-        // Using commit() instead of commitNoSync() for the HA store to allow
-        // the HA durability configuration to influence resulting behaviour.
-        tx.commit();
-
-
-        if(_localMultiSyncCommits)
-        {
-            return _commitThreadWrapper.commit(tx, syncCommit);
-        }
-        else
+        if (_stateManager.isNotInState(State.INITIALISED))
         {
-            return StoreFuture.IMMEDIATE_FUTURE;
+            LOGGER.debug("Store becoming passive");
+            _stateManager.attainState(State.INITIALISED);
         }
     }
 
@@ -330,7 +298,10 @@ public class BDBHAMessageStore extends A
                 replicatedEnvironment.setRepMutableConfig(newConfig);
             }
 
-            LOGGER.info("Node " + _nodeName + " successfully set as designated primary for
group");
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Node " + _nodeName + " successfully set as designated primary
for group");
+            }
         }
         catch (DatabaseException e)
         {
@@ -361,6 +332,61 @@ public class BDBHAMessageStore extends A
         }
     }
 
+    @Override
+    protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+    {
+        // Using commit() instead of commitNoSync() for the HA store to allow
+        // the HA durability configuration to influence resulting behaviour.
+        tx.commit();
+
+        if(_localMultiSyncCommits)
+        {
+            return _commitThreadWrapper.commit(tx, syncCommit);
+        }
+        else
+        {
+            return StoreFuture.IMMEDIATE_FUTURE;
+        }
+    }
+
+    @Override
+    protected void closeInternal() throws Exception
+    {
+        try
+        {
+            substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
+
+            try
+            {
+                if(_localMultiSyncCommits)
+                {
+                    _commitThreadWrapper.stopCommitThread();
+                }
+            }
+            finally
+            {
+                super.closeInternal();
+            }
+        }
+        finally
+        {
+            if (_managedObject != null)
+            {
+                _managedObject.unregister();
+            }
+        }
+    }
+
+    /**
+     * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED}
during
+     * {@link Environment#close()}.  We replace the StateChangeListener so we silently ignore
this state change.
+     */
+    private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment)
+    {
+        LOGGER.debug("Substituting no-op state change listener for environment close");
+        replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener());
+    }
+
     private ReplicationGroupAdmin createReplicationGroupAdmin()
     {
         final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
@@ -372,6 +398,29 @@ public class BDBHAMessageStore extends A
         return new ReplicationGroupAdmin(_groupName, helpers);
     }
 
+
+    private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+    {
+        for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to
'" + configItem.getValue() + "'");
+            }
+            replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+        }
+    }
+
+    private String getValidatedPropertyFromConfig(String key, Configuration config) throws
ConfigurationException
+    {
+        if (!config.containsKey(key))
+        {
+            throw new ConfigurationException("BDB HA configuration key not found. Please
specify configuration key with XPath: "
+                                                + key.replace('.', '/'));
+        }
+        return config.getString(key);
+    }
+
     private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
     {
         private final Executor _executor = Executors.newSingleThreadExecutor();
@@ -381,7 +430,10 @@ public class BDBHAMessageStore extends A
         {
             com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
 
-            LOGGER.info("Received BDB event indicating transition to state " + state);
+            if (LOGGER.isInfoEnabled())
+            {
+                LOGGER.info("Received BDB event indicating transition to state " + state);
+            }
 
             switch (state)
             {
@@ -508,29 +560,12 @@ public class BDBHAMessageStore extends A
         }
     }
 
-    @Override
-    public synchronized void activate() throws Exception
-    {
-        // Before proceeding, perform a log flush with an fsync
-        getEnvironment().flushLog(true);
-
-        super.activate();
-
-        //For replica groups with 2 electable nodes, set the new master to be the
-        //designated primary, such that it can continue working if the replica goes
-        //down and leaves it without a 'majority of 2'.
-        if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 &&
_autoDesignatedPrimary)
-        {
-            setDesignatedPrimary(true);
-        }
-    }
-
-    private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+    private class NoOpStateChangeListener implements StateChangeListener
     {
-        for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+        @Override
+        public void stateChange(StateChangeEvent stateChangeEvent)
+                throws RuntimeException
         {
-            LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '"
+ configItem.getValue() + "'");
-            replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
         }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message