qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [3/3] qpid-jms git commit: add test for and then fix calling session.recover() in onMessage() when using a DupsOk session
Date Mon, 19 Jan 2015 17:06:53 GMT
add test for and then fix calling session.recover() in onMessage() when using a DupsOk session


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/13fad9b3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/13fad9b3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/13fad9b3

Branch: refs/heads/master
Commit: 13fad9b325e750a2bbdbc45a7869ca33572602ce
Parents: 49b1fd6
Author: Robert Gemmell <robbie@apache.org>
Authored: Mon Jan 19 14:26:25 2015 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Mon Jan 19 14:26:25 2015 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   6 +-
 .../apache/qpid/jms/consumer/JmsDupsOkTest.java | 164 +++++++++++++++++++
 2 files changed, 168 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/13fad9b3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index fcad8db..eac2e11 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -554,7 +554,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
             while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) {
                 try {
                     JmsMessage copy = null;
-                    if (acknowledgementMode == Session.AUTO_ACKNOWLEDGE) {
+                    boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
+                                                acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
+                    if (autoAckOrDupsOk) {
                         copy = copy(doAckDelivered(envelope));
                     } else {
                         copy = copy(ackFromReceive(envelope));
@@ -563,7 +565,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
 
                     messageListener.onMessage(copy);
 
-                    if (acknowledgementMode == Session.AUTO_ACKNOWLEDGE && !session.isSessionRecovered()) {
+                    if (autoAckOrDupsOk && !session.isSessionRecovered()) {
                         doAckConsumed(envelope);
                     }
                 } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/13fad9b3/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDupsOkTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDupsOkTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDupsOkTest.java
new file mode 100644
index 0000000..6c38a20
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDupsOkTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsDupsOkTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsDupsOkTest.class);
+
+    /**
+     * Test use of session recovery while using an dups-ok session and
+     * a message listener. Calling recover should result in delivery of the
+     * current message again, followed by those that would have been received
+     * afterwards.
+     *
+     * Send three messages. Consume the first message, then recover on the second
+     * message and expect to see it again, ensure the third message is not seen
+     * until after this.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testRecoverInOnMessage() throws Exception {
+        connection = createAmqpConnection();
+
+        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendMessages(connection, queue, 3);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        DupsOkRecoverMsgListener listener = new DupsOkRecoverMsgListener(latch, session);
+        consumer.setMessageListener(listener);
+
+        connection.start();
+
+        assertTrue("Timed out waiting for async listener", latch.await(10, TimeUnit.SECONDS));
+        assertFalse("Test failed in listener, consult logs", listener.getFailed());
+    }
+
+    private static class DupsOkRecoverMsgListener implements MessageListener {
+        final Session session;
+        final CountDownLatch latch;
+        private boolean seenFirstMessage = false;
+        private boolean seenSecondMessage = false;
+        private boolean seenSecondMessageTwice = false;
+        private boolean complete = false;
+        private boolean failed = false;
+
+        public DupsOkRecoverMsgListener(CountDownLatch latch, Session session) {
+            this.latch = latch;
+            this.session = session;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                int msgNumProperty = message.getIntProperty(MESSAGE_NUMBER);
+
+                if(complete ){
+                    LOG.info("Test already finished, ignoring delivered message: " + msgNumProperty);
+                    return;
+                }
+
+                if (msgNumProperty == 1) {
+                    if (!seenFirstMessage) {
+                        LOG.info("Received first message.");
+                        seenFirstMessage = true;
+                    } else {
+                        LOG.error("Received first message again.");
+                        complete(true);
+                    }
+                } else if (msgNumProperty == 2) {
+                    if(!seenSecondMessage){
+                        seenSecondMessage = true;
+                        LOG.info("Received second message. Now calling recover()");
+                        session.recover();
+                    } else {
+                        LOG.info("Received second message again as expected.");
+                        seenSecondMessageTwice = true;
+                        if(message.getJMSRedelivered()) {
+                            LOG.info("Message was marked redelivered as expected.");
+                        } else {
+                            LOG.error("Message was not marked redelivered.");
+                            complete(true);
+                        }
+                    }
+                } else {
+                    if (msgNumProperty != 3) {
+                        LOG.error("Received unexpected message: " + msgNumProperty);
+                        complete(true);
+                        return;
+                    }
+
+                    if (!(seenFirstMessage && seenSecondMessageTwice)) {
+                        LOG.error("Third message was not received in expected sequence.");
+                        complete(true);
+                        return;
+                    }
+
+                    LOG.info("Received third message.");
+
+                    if(message.getJMSRedelivered()) {
+                        LOG.error("Message was marked redelivered against expectation.");
+                        complete(true);
+                    } else {
+                        LOG.info("Message was not marked redelivered, as expected.");
+                        complete(false);
+                    }
+                }
+            } catch (JMSException e) {
+                LOG.error("Exception caught in listener", e);
+                complete(true);
+            }
+        }
+
+        public boolean getFailed() {
+            return failed;
+        }
+
+        private void complete(boolean fail) {
+            failed = fail;
+            complete = true;
+            latch.countDown();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message