qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-403 Trigger whenOffline event on queued requests on drop
Date Tue, 17 Jul 2018 21:17:40 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master a21542cca -> 3244adc3a


QPIDJMS-403 Trigger whenOffline event on queued requests on drop

When the handle connection failure method runs it should trigger the
when off line behavior of any currently queued tasks in order to allow
timely completion of requests that do not need to wait for a new
connection to happen such as session close, message acknowledge etc.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3244adc3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3244adc3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3244adc3

Branch: refs/heads/master
Commit: 3244adc3adf14e90f0367df683f672a0b4e87d80
Parents: a21542c
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jul 17 17:17:20 2018 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jul 17 17:17:20 2018 -0400

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java |  6 ++
 .../failover/FailoverIntegrationTest.java       | 64 ++++++++++++++++++++
 2 files changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3244adc3/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 2f85ccd..b48f4c9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -554,6 +554,12 @@ public class FailoverProvider extends DefaultProviderListener implements
Provide
                 if (listener != null) {
                     listener.onConnectionInterrupted(failedURI);
                 }
+                
+                if (!requests.isEmpty()) {
+                	for (FailoverRequest request : requests.values()) {
+                		request.whenOffline(cause);
+                	}
+                }
 
                 // Start watching for request timeouts while we are offline, unless we already
are.
                 if (requestTimeoutTask == null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3244adc3/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index d9f5713..4e3128e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1141,6 +1141,70 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testFailoverHandlesDropAfterSessionCloseRequested() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer()) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+
+            final String originalURI = createPeerURI(originalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            originalPeer.expectBegin();
+            originalPeer.expectEnd(false);
+            originalPeer.dropAfterLastHandler();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            
+            final CountDownLatch sessionCloseCompleted = new CountDownLatch(1);
+            final AtomicBoolean sessionClosedThrew = new AtomicBoolean();
+            Thread sessionCloseThread = new Thread(new Runnable() {
+				
+				@Override
+				public void run() {
+		            try {
+		            	session.close();
+		            	LOG.debug("Close of session returned ok");
+		            } catch (JMSException jmsEx) {
+		            	LOG.warn("Should not throw on session close when connection drops.", jmsEx);
+		            	sessionClosedThrew.set(true);
+		            } finally {
+		            	sessionCloseCompleted.countDown();
+		            }
+				}
+			}, "Session close thread");
+
+            sessionCloseThread.start();
+
+            originalPeer.waitForAllHandlersToComplete(2000);
+
+            assertTrue("Session close should have completed by now", sessionCloseCompleted.await(3,
TimeUnit.SECONDS));
+            assertFalse("Session close should have completed normally", sessionClosedThrew.get());
+            
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateConsumerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred()
throws Exception {
         doCreateConsumerFailsWhenLinkRefusedTestImpl(false);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message