qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [1/2] qpid-jms git commit: QPIDJMS-110: ensure outstanding sync send requests are updated when the connection fails [and failover isnt in use]
Date Thu, 17 Sep 2015 11:46:44 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 49aad78c8 -> d142e87a8


QPIDJMS-110: ensure outstanding sync send requests are updated when the connection fails [and
failover isnt in use]


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/172c9102
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/172c9102
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/172c9102

Branch: refs/heads/master
Commit: 172c9102ffb94a0d246fcdf967e72a00ce40dc09
Parents: 49aad78
Author: Robert Gemmell <robbie@apache.org>
Authored: Thu Sep 17 12:26:27 2015 +0100
Committer: Robert Gemmell <robbie@apache.org>
Committed: Thu Sep 17 12:26:27 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 35 ++++++++++++--
 .../integration/ProducerIntegrationTest.java    | 48 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/172c9102/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 2f9ecfd..374bcc3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -64,6 +65,7 @@ import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
@@ -107,6 +109,8 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
     private final AtomicLong tempDestIdGenerator = new AtomicLong();
     private final AtomicLong transactionIdGenerator = new AtomicLong();
 
+    private ConcurrentMap<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult,
AsyncResult>();
+
     protected JmsConnection(final String connectionId, Provider provider, IdGenerator clientIdGenerator)
throws JMSException {
 
         // This executor can be used for dispatching asynchronous tasks that might block
or result
@@ -668,8 +672,13 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
         //        this level.
         try {
             ProviderFuture request = new ProviderFuture();
-            provider.send(envelope, request);
-            request.sync();
+            requests.put(request, request);
+            try {
+                provider.send(envelope, request);
+                request.sync();
+            } finally {
+                requests.remove(request);
+            }
         } catch (Exception ioe) {
             throw JmsExceptionSupport.create(ioe);
         }
@@ -1100,12 +1109,23 @@ public class JmsConnection implements Connection, TopicConnection,
QueueConnecti
 
     @Override
     public void onConnectionFailure(final IOException ex) {
+        providerFailed(ex);
+
         onProviderException(ex);
+
+        for(AsyncResult request : requests.keySet())
+        {
+            try {
+                request.onFailure(ex);
+            } catch (Exception e) {
+                LOG.debug("Exception during request cleanup", e);
+            }
+        }
+
         if (!closing.get() && !closed.get()) {
             executor.execute(new Runnable() {
                 @Override
                 public void run() {
-                    providerFailed(ex);
                     if (provider != null) {
                         try {
                             provider.close();
@@ -1114,6 +1134,15 @@ public class JmsConnection implements Connection, TopicConnection,
QueueConnecti
                         }
                     }
 
+                    for(AsyncResult request : requests.keySet())
+                    {
+                        try {
+                            request.onFailure(ex);
+                        } catch (Exception e) {
+                            LOG.debug("Exception during request cleanup", e);
+                        }
+                    }
+
                     try {
                         shutdown(ex);
                     } catch (JMSException e) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/172c9102/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 6cd71c0..2c003b5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -617,6 +617,54 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
                 fail("Expected exception to be thrown");
             } catch (JMSException jmse) {
                 // Expected
+                assertNotNull("Expected exception to have a message", jmse.getMessage());
+                assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
+            }
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testRemotelyCloseConnectionDuringSyncSend() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Expect producer creation, give it credit.
+            testPeer.expectSenderAttach();
+
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+
+            // Expect a message to be sent, but don't send a disposition in
+            // response, simply remotely close the connection instead.
+            testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
+            testPeer.remotelyCloseConnection(false, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            Message message = session.createTextMessage(text);
+
+            try {
+                producer.send(message);
+                fail("Expected exception to be thrown");
+            } catch (JMSException jmse) {
+                // Expected
+                assertNotNull("Expected exception to have a message", jmse.getMessage());
+                assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
             }
 
             testPeer.waitForAllHandlersToComplete(3000);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message