qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject qpid-jms-amqp-0-x git commit: QPID-8222: [JMS AMQP 0-x][AMQP 0-8..0-91] Fix cleaning of dispatcher queue from failover thread
Date Fri, 10 Aug 2018 16:54:34 GMT
Repository: qpid-jms-amqp-0-x
Updated Branches:
  refs/heads/6.3.x a760e3231 -> 7644fff9e


QPID-8222: [JMS AMQP 0-x][AMQP 0-8..0-91] Fix cleaning of dispatcher queue from failover thread

(cherry picked from commit 2f020453ea0df0ff74dd51687beeaae93392d0bf)


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/7644fff9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/7644fff9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/7644fff9

Branch: refs/heads/6.3.x
Commit: 7644fff9ed3ce43d800d5af5c679b7d79127f98a
Parents: a760e32
Author: Alex Rudyy <orudyy@apache.org>
Authored: Thu Aug 9 18:55:20 2018 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Fri Aug 10 17:52:33 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/client/AMQSession.java     | 16 ++++++++--------
 .../java/org/apache/qpid/client/AMQSession_0_8.java | 11 ++++++++++-
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/7644fff9/client/src/main/java/org/apache/qpid/client/AMQSession.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ffb01d8..ffcbcac 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2417,17 +2417,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         // return the first <total number of msgs received on session>
         // messages sent by the brokers following the first rollback
         // after failover
-        _highestDeliveryTag.set(-1);
+        resetRollbackMarkers();
 
         _unacknowledgedMessageTags.clear();
         _prefetchedMessageTags.clear();
 
-        _rollbackMark.set(-1);
         clearResolvedDestinations();
         resubscribeProducers();
         resubscribeConsumers();
     }
 
+    void resetRollbackMarkers()
+    {
+        _highestDeliveryTag.set(-1);
+        _rollbackMark.set(-1);
+    }
+
     void setHasMessageListeners()
     {
         _hasMessageListeners = true;
@@ -3594,7 +3599,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
 
                 try
                 {
-                    while (connectionStopped())
+                    while (!getAMQConnection().isFailingOver() && connectionStopped())
                     {
                         _lock.wait();
                     }
@@ -3870,11 +3875,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         }
     }
 
-    protected void clearDispatchQueue()
-    {
-        _queue.clear();
-    }
-
     private void shutdownFlowControlNoAckTaskPool()
     {
         if (_flowControlNoAckTaskPool != null)

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/7644fff9/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 6c7738a..a689840 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -176,7 +176,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
     @Override
     void resubscribe() throws QpidException
     {
-        clearDispatchQueue();
+        try
+        {
+            setUsingDispatcherForCleanup(true);
+            resetRollbackMarkers();
+            syncDispatchQueue(true);
+        }
+        finally
+        {
+            setUsingDispatcherForCleanup(false);
+        }
 
         getDeliveredMessageTags().clear();
         super.resubscribe();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message