qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject qpid-jms-amqp-0-x git commit: QPID-8222: [JMS AMQP 0-x][AMQP 0-8..0-91] Fix cleaning of dispatcher queue from failover thread
Date Fri, 10 Aug 2018 16:50:14 GMT
Repository: qpid-jms-amqp-0-x
Updated Branches:
  refs/heads/master a33ed8491 -> 2f020453e


QPID-8222: [JMS AMQP 0-x][AMQP 0-8..0-91] Fix cleaning of dispatcher queue from failover thread


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/2f020453
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/2f020453
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/2f020453

Branch: refs/heads/master
Commit: 2f020453ea0df0ff74dd51687beeaae93392d0bf
Parents: a33ed84
Author: Alex Rudyy <orudyy@apache.org>
Authored: Thu Aug 9 18:55:20 2018 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Fri Aug 10 17:49:27 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/client/AMQSession.java | 16 +++++------
 .../org/apache/qpid/client/AMQSession_0_8.java  | 11 ++++++-
 .../connection/FailoverBehaviourTest.java       | 30 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/2f020453/client/src/main/java/org/apache/qpid/client/AMQSession.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ffb01d8..ffcbcac 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2417,17 +2417,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         // return the first <total number of msgs received on session>
         // messages sent by the brokers following the first rollback
         // after failover
-        _highestDeliveryTag.set(-1);
+        resetRollbackMarkers();
 
         _unacknowledgedMessageTags.clear();
         _prefetchedMessageTags.clear();
 
-        _rollbackMark.set(-1);
         clearResolvedDestinations();
         resubscribeProducers();
         resubscribeConsumers();
     }
 
+    void resetRollbackMarkers()
+    {
+        _highestDeliveryTag.set(-1);
+        _rollbackMark.set(-1);
+    }
+
     void setHasMessageListeners()
     {
         _hasMessageListeners = true;
@@ -3594,7 +3599,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
 
                 try
                 {
-                    while (connectionStopped())
+                    while (!getAMQConnection().isFailingOver() && connectionStopped())
                     {
                         _lock.wait();
                     }
@@ -3870,11 +3875,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         }
     }
 
-    protected void clearDispatchQueue()
-    {
-        _queue.clear();
-    }
-
     private void shutdownFlowControlNoAckTaskPool()
     {
         if (_flowControlNoAckTaskPool != null)

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/2f020453/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 6c7738a..a689840 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -176,7 +176,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
     @Override
     void resubscribe() throws QpidException
     {
-        clearDispatchQueue();
+        try
+        {
+            setUsingDispatcherForCleanup(true);
+            resetRollbackMarkers();
+            syncDispatchQueue(true);
+        }
+        finally
+        {
+            setUsingDispatcherForCleanup(false);
+        }
 
         getDeliveredMessageTags().clear();
         super.resubscribe();

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/2f020453/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java
index 9601de6..39d8b2e 100644
--- a/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java
@@ -560,6 +560,36 @@ public class FailoverBehaviourTest extends JmsTestBase implements ExceptionListe
         doFailoverWhilstPublishingInFlight();
     }
 
+    @Test
+    public void testFailoverWithDirtyStoppedTransactionSessionHavingPrefetchedMessages() throws Exception
+    {
+        assumeTrue(getBrokerAdmin().supportsPersistence());
+        init(Session.SESSION_TRANSACTED, true);
+
+        produceMessages();
+        _producerSession.commit();
+
+        Message message = _consumer.receive(getReceiveTimeout());
+        assertNotNull("Message is not received", message);
+
+        _connection.stop();
+
+        MessageProducer messageProducer = _consumerSession.createProducer(_destination);
+        messageProducer.send(_consumerSession.createTextMessage("Test"));
+
+        getBrokerAdmin().restart();
+
+        try
+        {
+            _consumerSession.commit();
+            fail("Exception is expected");
+        }
+        catch (JMSException e)
+        {
+            // pass
+        }
+    }
+
     private void doFailoverWhilstPublishingInFlight() throws Exception
     {
         init(Session.SESSION_TRANSACTED, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message