qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/7] qpid-jms git commit: QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends.
Date Mon, 12 Sep 2016 20:12:02 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 6553cfd5b -> 6e442f4c6


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 8eaf707..d6dc443 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -43,11 +43,14 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.qpid.jms.JmsCompletionListener;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@@ -56,6 +59,7 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
@@ -1034,6 +1038,153 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testFailoverPassthroughOfCompletedAsyncSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = establishAnonymousConnecton(
+                "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);
+
+            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectBegin();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFalioverPassthroughOfRejectedAsyncCompletionSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final JmsConnection connection = establishAnonymousConnecton(
+                "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);
+
+            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectBegin();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            Message message = session.createTextMessage("content");
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false,
new Rejected(), true);
+
+            assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFailoverConnectionLossFailsWaitingAsyncCompletionSends() throws Exception
{
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final JmsConnection connection = establishAnonymousConnecton(
+                "failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+                testPeer);
+
+            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectBegin();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            final int MSG_COUNT = 5;
+
+            Message message = session.createTextMessage("content");
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            // Accept one which shouldn't complete until after the others have failed.
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false,
new Accepted(), true);
+            testPeer.dropAfterLastHandler();
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT
+ 1);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    producer.send(message, listener);
+                }
+
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount);
+            assertEquals(1, listener.successCount);
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException
{
         return establishAnonymousConnecton(null, null, peers);
     }
@@ -1076,4 +1227,47 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
     private String createPeerURI(TestAmqpPeer peer, String params) {
         return "amqp://localhost:" + peer.getServerPort() + (params != null ? "?" + params
: "");
     }
+
+    private class TestJmsCompletionListener implements JmsCompletionListener {
+
+        private final CountDownLatch completed;
+
+        public volatile int successCount;
+        public volatile int errorCount;
+
+        public volatile Message message;
+        public volatile Exception exception;
+
+        public TestJmsCompletionListener() {
+            this(1);
+        }
+
+        public TestJmsCompletionListener(int expected) {
+            this.completed = new CountDownLatch(expected);
+        }
+
+        public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException
{
+            return completed.await(timeout, units);
+        }
+
+        @Override
+        public void onCompletion(Message message) {
+            LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+            this.message = message;
+            this.successCount++;
+
+            completed.countDown();
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+            LOG.info("JmsCompletionListener onException called with message: {} error {}",
message, exception);
+
+            this.message = message;
+            this.exception = exception;
+            this.errorCount++;
+
+            completed.countDown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index 210ce2c..35d305c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -274,7 +274,17 @@ public class MockProvider implements Provider {
                 try {
                     checkClosed();
                     stats.recordSendCall();
+
                     request.onSuccess();
+                    if (envelope.isCompletionRequired()) {
+                        if (configuration.isDelayCompletionCalls()) {
+                            context.recordPendingCompletion(MockProvider.this, envelope);
+                        } else {
+                            if (listener != null) {
+                                listener.onCompletedMessageSend(envelope);
+                            }
+                        }
+                    }
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -422,7 +432,6 @@ public class MockProvider implements Provider {
         });
     }
 
-
     /**
      * Switch state to closed without sending any notifications
      */
@@ -489,7 +498,6 @@ public class MockProvider implements Provider {
 
     //----- Implementation details -------------------------------------------//
 
-
     private void checkClosed() throws ProviderClosedException {
         if (closed.get()) {
             throw new ProviderClosedException("This Provider is already closed");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
index 7c78fff..d8c9019 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
@@ -25,6 +25,8 @@ public class MockProviderConfiguration {
     private boolean failOnStart;
     private boolean failOnClose;
 
+    private boolean delayCompletionCalls;
+
     public boolean isFailOnConnect() {
         return failOnConnect;
     }
@@ -48,4 +50,12 @@ public class MockProviderConfiguration {
     public void setFailOnClose(boolean value) {
         this.failOnClose = value;
     }
+
+    public boolean isDelayCompletionCalls() {
+        return delayCompletionCalls;
+    }
+
+    public void setDelayCompletionCalls(boolean delayCompletionCalls) {
+        this.delayCompletionCalls = delayCompletionCalls;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
index 99fbfbc..a964282 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
@@ -18,10 +18,17 @@ package org.apache.qpid.jms.provider.mock;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsResource;
 
 /**
@@ -42,6 +49,9 @@ public class MockRemotePeer {
     private ResourceLifecycleFilter stopFilter;
     private ResourceLifecycleFilter destroyFilter;
 
+    private final Map<Destination, List<PendingCompletion>> pendingCompletions
=
+        new ConcurrentHashMap<Destination, List<PendingCompletion>>();
+
     public void connect(MockProvider provider) throws IOException {
         if (offline) {
             throw new IOException();
@@ -146,4 +156,117 @@ public class MockRemotePeer {
     public void setResourceDestroyFilter(ResourceLifecycleFilter filter) {
         destroyFilter = filter;
     }
+
+    //----- Controls handling of Message Send Completions --------------------//
+
+    public void recordPendingCompletion(MockProvider provider, JmsOutboundMessageDispatch
envelope) {
+        Destination destination = envelope.getDestination();
+        if (!pendingCompletions.containsKey(destination)) {
+            pendingCompletions.put(destination, new ArrayList<PendingCompletion>());
+        }
+
+        pendingCompletions.get(destination).add(new PendingCompletion(provider, envelope));
+    }
+
+    public void completeAllPendingSends(Destination destination) {
+        if (pendingCompletions.containsKey(destination)) {
+
+            for (List<PendingCompletion> pendingSends : pendingCompletions.values())
{
+                for (PendingCompletion pending : pendingSends) {
+                    pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope);
+                }
+            }
+
+            pendingCompletions.remove(destination);
+        }
+    }
+
+    public void failAllPendingSends(Destination destination, Exception error) {
+        if (pendingCompletions.containsKey(destination)) {
+
+            for (List<PendingCompletion> pendingSends : pendingCompletions.values())
{
+                for (PendingCompletion pending : pendingSends) {
+                    pending.provider.getProviderListener().onFailedMessageSend(pending.envelope,
error);
+                }
+            }
+
+            pendingCompletions.remove(destination);
+        }
+    }
+
+    public void completePendingSend(Message message) throws JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(message.getJMSDestination());
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(message.getJMSMessageID()))
{
+                pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope);
+                iterator.remove();
+            }
+        }
+    }
+
+    public void completePendingSend(JmsOutboundMessageDispatch envelope) throws JMSException
{
+        List<PendingCompletion> pendingSends = pendingCompletions.get(envelope.getDestination());
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(envelope.getMessage().getJMSMessageID()))
{
+                pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope);
+                iterator.remove();
+            }
+        }
+    }
+
+    public void failPendingSend(Message message, Exception error) throws JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(message.getJMSDestination());
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(message.getJMSMessageID()))
{
+                pending.provider.getProviderListener().onFailedMessageSend(pending.envelope,
error);
+                iterator.remove();
+            }
+        }
+    }
+
+    public void failPendingSend(JmsOutboundMessageDispatch envelope, Exception error) throws
JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(envelope.getDestination());
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(envelope.getMessage().getJMSMessageID()))
{
+                pending.provider.getProviderListener().onFailedMessageSend(pending.envelope,
error);
+                iterator.remove();
+            }
+        }
+    }
+
+    public List<JmsOutboundMessageDispatch> getPendingCompletions(Destination destination)
{
+        List<JmsOutboundMessageDispatch> result = null;
+
+        if (pendingCompletions.containsKey(destination)) {
+            result = new ArrayList<JmsOutboundMessageDispatch>();
+            List<PendingCompletion> pendingMessages = pendingCompletions.get(destination);
+            for (PendingCompletion pending : pendingMessages) {
+                result.add(pending.envelope);
+            }
+        } else {
+            result = Collections.emptyList();
+        }
+
+        return result;
+    }
+
+    private class PendingCompletion {
+
+        public final MockProvider provider;
+        public final JmsOutboundMessageDispatch envelope;
+
+        public PendingCompletion(MockProvider provider, JmsOutboundMessageDispatch envelope)
{
+            this.provider = provider;
+            this.envelope = envelope;
+        }
+
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message