Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4E58A200B8C for ; Mon, 12 Sep 2016 22:12:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 46EF2160ADA; Mon, 12 Sep 2016 20:12:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 19AC7160AB2 for ; Mon, 12 Sep 2016 22:12:03 +0200 (CEST) Received: (qmail 16190 invoked by uid 500); 12 Sep 2016 20:12:03 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 16171 invoked by uid 99); 12 Sep 2016 20:12:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Sep 2016 20:12:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE71EE01BA; Mon, 12 Sep 2016 20:12:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@qpid.apache.org Date: Mon, 12 Sep 2016 20:12:02 -0000 Message-Id: <756862ac68dc40ff8a73f9a20eb631d2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/7] qpid-jms git commit: QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends. 
archived-at: Mon, 12 Sep 2016 20:12:05 -0000 Repository: qpid-jms Updated Branches: refs/heads/master 6553cfd5b -> 6e442f4c6 http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 8eaf707..d6dc443 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -43,11 +43,14 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.qpid.jms.JmsCompletionListener; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.JmsMessageProducer; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsSendTimedOutException; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; @@ -56,6 +59,7 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; @@ 
-1034,6 +1038,153 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 20000) + public void testFailoverPassthroughOfCompletedAsyncSend() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final Connection connection = establishAnonymousConnecton( + "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer); + + testPeer.expectSaslAnonymousConnect(); + testPeer.expectBegin(); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testFalioverPassthroughOfRejectedAsyncCompletionSend() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final JmsConnection connection = establishAnonymousConnecton( + "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer); + + testPeer.expectSaslAnonymousConnect(); + testPeer.expectBegin(); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = 
session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + Message message = session.createTextMessage("content"); + testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, new Rejected(), true); + + assertNull("Should not yet have a JMSDestination", message.getJMSDestination()); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNotNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + @Test(timeout = 20000) + public void testFailoverConnectionLossFailsWaitingAsyncCompletionSends() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final JmsConnection connection = establishAnonymousConnecton( + "failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", + testPeer); + + testPeer.expectSaslAnonymousConnect(); + testPeer.expectBegin(); + testPeer.expectBegin(); + 
testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + final int MSG_COUNT = 5; + + Message message = session.createTextMessage("content"); + for (int i = 0; i < MSG_COUNT; ++i) { + testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); + } + + // Accept one which shouldn't complete until after the others have failed. + testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, new Accepted(), true); + testPeer.dropAfterLastHandler(); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT + 1); + try { + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(message, listener); + } + + producer.send(message, listener); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertEquals(MSG_COUNT, listener.errorCount); + assertEquals(1, listener.successCount); + assertNotNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + connection.close(); + } + } + private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException { return establishAnonymousConnecton(null, null, peers); } @@ -1076,4 +1227,47 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { private String createPeerURI(TestAmqpPeer peer, String params) { return "amqp://localhost:" + peer.getServerPort() + (params != null ? "?" 
+ params : ""); } + + private class TestJmsCompletionListener implements JmsCompletionListener { + + private final CountDownLatch completed; + + public volatile int successCount; + public volatile int errorCount; + + public volatile Message message; + public volatile Exception exception; + + public TestJmsCompletionListener() { + this(1); + } + + public TestJmsCompletionListener(int expected) { + this.completed = new CountDownLatch(expected); + } + + public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException { + return completed.await(timeout, units); + } + + @Override + public void onCompletion(Message message) { + LOG.info("JmsCompletionListener onCompletion called with message: {}", message); + this.message = message; + this.successCount++; + + completed.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception); + + this.message = message; + this.exception = exception; + this.errorCount++; + + completed.countDown(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java index 210ce2c..35d305c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java @@ -274,7 +274,17 @@ public class MockProvider implements Provider { try { checkClosed(); stats.recordSendCall(); + request.onSuccess(); + if (envelope.isCompletionRequired()) { + if (configuration.isDelayCompletionCalls()) { + context.recordPendingCompletion(MockProvider.this, 
envelope); + } else { + if (listener != null) { + listener.onCompletedMessageSend(envelope); + } + } + } } catch (Exception error) { request.onFailure(error); } @@ -422,7 +432,6 @@ public class MockProvider implements Provider { }); } - /** * Switch state to closed without sending any notifications */ @@ -489,7 +498,6 @@ public class MockProvider implements Provider { //----- Implementation details -------------------------------------------// - private void checkClosed() throws ProviderClosedException { if (closed.get()) { throw new ProviderClosedException("This Provider is already closed"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java index 7c78fff..d8c9019 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java @@ -25,6 +25,8 @@ public class MockProviderConfiguration { private boolean failOnStart; private boolean failOnClose; + private boolean delayCompletionCalls; + public boolean isFailOnConnect() { return failOnConnect; } @@ -48,4 +50,12 @@ public class MockProviderConfiguration { public void setFailOnClose(boolean value) { this.failOnClose = value; } + + public boolean isDelayCompletionCalls() { + return delayCompletionCalls; + } + + public void setDelayCompletionCalls(boolean delayCompletionCalls) { + this.delayCompletionCalls = delayCompletionCalls; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java 
---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java index 99fbfbc..a964282 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java @@ -18,10 +18,17 @@ package org.apache.qpid.jms.provider.mock; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsResource; /** @@ -42,6 +49,9 @@ public class MockRemotePeer { private ResourceLifecycleFilter stopFilter; private ResourceLifecycleFilter destroyFilter; + private final Map<Destination, List<PendingCompletion>> pendingCompletions = + new ConcurrentHashMap<Destination, List<PendingCompletion>>(); + public void connect(MockProvider provider) throws IOException { if (offline) { throw new IOException(); @@ -146,4 +156,117 @@ public class MockRemotePeer { public void setResourceDestroyFilter(ResourceLifecycleFilter filter) { destroyFilter = filter; } + + //----- Controls handling of Message Send Completions --------------------// + + public void recordPendingCompletion(MockProvider provider, JmsOutboundMessageDispatch envelope) { + Destination destination = envelope.getDestination(); + if (!pendingCompletions.containsKey(destination)) { + pendingCompletions.put(destination, new ArrayList<PendingCompletion>()); + } + + pendingCompletions.get(destination).add(new PendingCompletion(provider, envelope)); + } + + public void completeAllPendingSends(Destination destination) { + if (pendingCompletions.containsKey(destination)) { + + for (List<PendingCompletion> pendingSends :
pendingCompletions.values()) { + for (PendingCompletion pending : pendingSends) { + pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope); + } + } + + pendingCompletions.remove(destination); + } + } + + public void failAllPendingSends(Destination destination, Exception error) { + if (pendingCompletions.containsKey(destination)) { + + for (List<PendingCompletion> pendingSends : pendingCompletions.values()) { + for (PendingCompletion pending : pendingSends) { + pending.provider.getProviderListener().onFailedMessageSend(pending.envelope, error); + } + } + + pendingCompletions.remove(destination); + } + } + + public void completePendingSend(Message message) throws JMSException { + List<PendingCompletion> pendingSends = pendingCompletions.get(message.getJMSDestination()); + Iterator<PendingCompletion> iterator = pendingSends.iterator(); + while (iterator.hasNext()) { + PendingCompletion pending = iterator.next(); + if (pending.envelope.getMessage().getJMSMessageID().equals(message.getJMSMessageID())) { + pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope); + iterator.remove(); + } + } + } + + public void completePendingSend(JmsOutboundMessageDispatch envelope) throws JMSException { + List<PendingCompletion> pendingSends = pendingCompletions.get(envelope.getDestination()); + Iterator<PendingCompletion> iterator = pendingSends.iterator(); + while (iterator.hasNext()) { + PendingCompletion pending = iterator.next(); + if (pending.envelope.getMessage().getJMSMessageID().equals(envelope.getMessage().getJMSMessageID())) { + pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope); + iterator.remove(); + } + } + } + + public void failPendingSend(Message message, Exception error) throws JMSException { + List<PendingCompletion> pendingSends = pendingCompletions.get(message.getJMSDestination()); + Iterator<PendingCompletion> iterator = pendingSends.iterator(); + while (iterator.hasNext()) { + PendingCompletion pending = iterator.next(); + if (pending.envelope.getMessage().getJMSMessageID().equals(message.getJMSMessageID())) { +
pending.provider.getProviderListener().onFailedMessageSend(pending.envelope, error); + iterator.remove(); + } + } + } + + public void failPendingSend(JmsOutboundMessageDispatch envelope, Exception error) throws JMSException { + List<PendingCompletion> pendingSends = pendingCompletions.get(envelope.getDestination()); + Iterator<PendingCompletion> iterator = pendingSends.iterator(); + while (iterator.hasNext()) { + PendingCompletion pending = iterator.next(); + if (pending.envelope.getMessage().getJMSMessageID().equals(envelope.getMessage().getJMSMessageID())) { + pending.provider.getProviderListener().onFailedMessageSend(pending.envelope, error); + iterator.remove(); + } + } + } + + public List<JmsOutboundMessageDispatch> getPendingCompletions(Destination destination) { + List<JmsOutboundMessageDispatch> result = null; + + if (pendingCompletions.containsKey(destination)) { + result = new ArrayList<JmsOutboundMessageDispatch>(); + List<PendingCompletion> pendingMessages = pendingCompletions.get(destination); + for (PendingCompletion pending : pendingMessages) { + result.add(pending.envelope); + } + } else { + result = Collections.emptyList(); + } + + return result; + } + + private class PendingCompletion { + + public final MockProvider provider; + public final JmsOutboundMessageDispatch envelope; + + public PendingCompletion(MockProvider provider, JmsOutboundMessageDispatch envelope) { + this.provider = provider; + this.envelope = envelope; + } + + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org