qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject svn commit: r1726645 - in /qpid/java/branches/6.0.x: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/
Date Mon, 25 Jan 2016 15:23:42 GMT
Author: orudyy
Date: Mon Jan 25 15:23:42 2016
New Revision: 1726645

URL: http://svn.apache.org/viewvc?rev=1726645&view=rev
Log:
QPID-6997: [Java Broker] BDB JE HA: Ensure that the VH is closed before closing the replicated environment
    
    Ensure that the VH is closed *before* starting to restart the environment. Closing the virtualhost will not
    return until connections are closed, and closing connections is guaranteed to roll back in-flight transactions.
    This will ensure that we comply with JE's business rule that transactions/cursors must be closed before
    closing the environment.

    merged from trunk
    svn merge -c 1726358 https://svn.apache.org/repos/asf/qpid/java/trunk
    svn merge -c 1726436 https://svn.apache.org/repos/asf/qpid/java/trunk

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 25 15:23:42 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk
+/qpid/java/trunk
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1726645&r1=1726644&r2=1726645&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++ qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Mon Jan 25 15:23:42 2016
@@ -47,12 +47,13 @@ import java.util.concurrent.atomic.Atomi
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.sleepycat.je.CacheMode;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Durability;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.sleepycat.je.Durability.ReplicaAckPolicy;
 import com.sleepycat.je.Durability.SyncPolicy;
 import com.sleepycat.je.EnvironmentConfig;
@@ -196,7 +197,7 @@ public class ReplicatedEnvironmentFacade
     private final File _environmentDirectory;
 
     private final ExecutorService _environmentJobExecutor;
-    private final ExecutorService _stateChangeExecutor;
+    private final ListeningExecutorService _stateChangeExecutor;
     private final ScheduledExecutorService _groupChangeExecutor;
     private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
     private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes =
new ConcurrentHashMap<String, ReplicationNode>();
@@ -247,7 +248,7 @@ public class ReplicatedEnvironmentFacade
 
         // we relay on this executor being single-threaded as we need to restart and mutate
the environment in one thread
         _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-"
+ _prettyGroupNodeName));
-        _stateChangeExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("StateChange-"
+ _prettyGroupNodeName));
+        _stateChangeExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
         _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()
+ 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
 
         // create environment in a separate thread to avoid renaming of the current thread
by JE
@@ -473,10 +474,34 @@ public class ReplicatedEnvironmentFacade
         {
             if (dbe != null && LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(),
dbe);
+                LOGGER.debug("Environment restarting due to exception {}", dbe.getMessage(),
dbe);
             }
 
-            _environmentJobExecutor.execute(new Runnable()
+            // Tell the virtualhostnode that we are no longer attached to the group.  It
will close the virtualhost,
+            // closing the connections, housekeeping etc meaning all transactions are finished
before we
+            // restart the environment.
+            _stateChangeExecutor.submit(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    StateChangeListener listener = _stateChangeListener.get();
+                    if (listener != null && _state.get() == State.RESTARTING)
+                    {
+                        try
+                        {
+                            StateChangeEvent detached = new StateChangeEvent(ReplicatedEnvironment.State.DETACHED,
NameIdPair.NULL);
+                            listener.stateChange(detached);
+                        }
+                        catch (Throwable t)
+                        {
+                            handleUncaughtExceptionInExecutorService(t);
+                        }
+                    }
+
+                    return null;
+                }
+            }).addListener(new Runnable()
             {
                 @Override
                 public void run()
@@ -495,7 +520,7 @@ public class ReplicatedEnvironmentFacade
                         catch(EnvironmentFailureException e)
                         {
                             LOGGER.warn("Failure whilst trying to restart environment (attempt
number "
-                                    + attemptNumber + " of " + _environmentRestartRetryLimit
+ ")", e);
+                                    + "{} of {})", attemptNumber, _environmentRestartRetryLimit,
e);
                             lastException = e;
                         }
                         catch (Exception e)
@@ -516,11 +541,15 @@ public class ReplicatedEnvironmentFacade
                         }
                     }
                 }
-            });
+            }, _environmentJobExecutor);
+        }
+        else if (_state.equals(State.RESTARTING))
+        {
+            LOGGER.debug("Environment restart already in progress, ignoring restart request.");
         }
         else
         {
-            LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+            LOGGER.debug("Ignoring restart because the environment because state is {}",
_state.get());
         }
     }
 
@@ -1238,28 +1267,6 @@ public class ReplicatedEnvironmentFacade
     {
         LOGGER.info("Restarting environment");
 
-        StateChangeListener stateChangeListener = _stateChangeListener.get();
-
-        if (stateChangeListener != null)
-        {
-            _stateChangeExecutor.submit(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        StateChangeEvent detached = new StateChangeEvent(ReplicatedEnvironment.State.DETACHED,
NameIdPair.NULL);
-                        stateChanged(detached);
-                    }
-                    catch (Throwable e)
-                    {
-                        handleUncaughtExceptionInExecutorService(e);
-                    }
-                }
-            });
-        }
-
         closeEnvironmentOnRestart();
 
         createEnvironment(false, null);

Modified: qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java?rev=1726645&r1=1726644&r2=1726645&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
(original)
+++ qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
Mon Jan 25 15:23:42 2016
@@ -49,6 +49,7 @@ import org.apache.qpid.jms.ConnectionLis
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 import org.apache.qpid.test.utils.BrokerHolder;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -139,8 +140,6 @@ public class MultiNodeTest extends QpidB
     {
         final Connection connection = getConnection(_negativeFailoverUrl);
 
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
-
         Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
@@ -176,6 +175,58 @@ public class MultiNodeTest extends QpidB
         }
     }
 
+    /**
+     * JE requires that transactions are ended before the ReplicatedEnvironment is closed.
 This
+     * test ensures that open messaging transactions are correctly rolled-back as quorum
is lost,
+     * and later the node rejoins the group in either master or replica role.
+     */
+    public void testQuorumLostAndRestored_OriginalMasterRejoinsTheGroup() throws Exception
+    {
+        final Connection connection = getConnection(_positiveFailoverUrl);
+
+        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+        Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+
+        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+        ports.remove(activeBrokerPort);
+
+        Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        Destination dest = session1.createQueue(getTestQueueName());
+        session1.createConsumer(dest).close();
+
+        MessageProducer producer1 = session1.createProducer(dest);
+        producer1.send(session1.createMessage());
+        MessageProducer producer2 = session2.createProducer(dest);
+        producer2.send(session2.createMessage());
+
+        // Leave transactions open, this will leave two store transactions open on the store
+
+        // Stop all other nodes
+        for (Integer p : ports)
+        {
+            _groupCreator.stopNode(p);
+        }
+
+        // Await the old master discovering that it is all alone
+        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "WAITING");
+
+        // Restart all other nodes
+        for (Integer p : ports)
+        {
+            _groupCreator.startNode(p);
+        }
+
+        _failoverListener.awaitFailoverCompletion(20000);
+
+        Map<String, Object> attrs = _groupCreator.getNodeAttributes(activeBrokerPort);
+        String roleInGroup = (String) attrs.get(BDBHARemoteReplicationNode.ROLE);
+        assertTrue("Original master should have rejoined the group as either MASTER or REPLICA,
but " + roleInGroup,
+                   "MASTER".equals(roleInGroup) || "REPLICA".equals(roleInGroup));
+    }
+
     public void testPersistentMessagesAvailableAfterFailover() throws Exception
     {
         final Connection connection = getConnection(_positiveFailoverUrl);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message