activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6336
Date Fri, 15 Jul 2016 20:45:47 GMT
Repository: activemq
Updated Branches:
  refs/heads/master cd5ea6c27 -> a3a5a1aff


https://issues.apache.org/jira/browse/AMQ-6336

Ensure that when expired messages are handled client side that pull
consumers get a chance to see it and send a new pull request complete an
outstanding timed pull.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a3a5a1af
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a3a5a1af
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a3a5a1af

Branch: refs/heads/master
Commit: a3a5a1affa7aa229202efc51625531ccce1414bb
Parents: cd5ea6c
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Jul 15 16:45:35 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Jul 15 16:45:35 2016 -0400

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  30 ++-
 .../bugs/MessageExpiryClientSideTest.java       | 265 +++++++++++++++++++
 2 files changed, 284 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a3a5a1af/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a743a8d..83ce137 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -525,11 +525,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
     }
 
     private boolean consumeExpiredMessage(MessageDispatch dispatch) {
-        if (dispatch.getMessage().isExpired()) {
-            return !isBrowser() && isConsumerExpiryCheckEnabled();
-        }
-
-        return false;
+        return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
     }
 
     private void posionAck(MessageDispatch md, String cause) throws JMSException {
@@ -1421,14 +1417,26 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                                 // delayed redelivery, ensure it can be re delivered
                                 session.connection.rollbackDuplicate(this, md.getMessage());
                             }
-                            if (!(md.getMessage() != null && md.getMessage().isExpired()))
{
+
+                            if (md.getMessage() == null) {
+                                // End of browse or pull request timeout.
                                 unconsumedMessages.enqueue(md);
-                                if (availableListener != null) {
-                                    availableListener.onMessageAvailable(this);
-                                }
                             } else {
-                                beforeMessageIsConsumed(md);
-                                afterMessageIsConsumed(md, false);
+                                if (!consumeExpiredMessage(md)) {
+                                    unconsumedMessages.enqueue(md);
+                                    if (availableListener != null) {
+                                        availableListener.onMessageAvailable(this);
+                                    }
+                                } else {
+                                    beforeMessageIsConsumed(md);
+                                    afterMessageIsConsumed(md, true);
+
+                                    // Pull consumer needs to check if pull timed out and
send
+                                    // a new pull command if not.
+                                    if (info.getCurrentPrefetchSize() == 0) {
+                                        unconsumedMessages.enqueue(null);
+                                    }
+                                }
                             }
                         }
                     } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a3a5a1af/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpiryClientSideTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpiryClientSideTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpiryClientSideTest.java
new file mode 100644
index 0000000..290656a
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpiryClientSideTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNull;
+
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.MessageDispatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageExpiryClientSideTest {
+
+    private ActiveMQConnection connection;
+    private BrokerService broker;
+    private volatile Exception connectionError;
+
+    @Before
+    public void setUp() throws Exception {
+        createBroker();
+
+        connection = createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                try {
+                    connectionError = exception;
+                    connection.close();
+                } catch (JMSException e) {
+                }
+            }
+        });
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+            }
+        }
+
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * check if the pull request (prefetch=1) times out when the expiry occurs
+     * on the client side.
+     */
+    @Test(timeout = 30000)
+    public void testConsumerReceivePrefetchOneRedeliveryZero() throws Exception {
+
+        connection.getPrefetchPolicy().setQueuePrefetch(1);
+        connection.start();
+
+        // push message to queue
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("timeout.test");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage textMessage = session.createTextMessage("test Message");
+
+        producer.send(textMessage);
+        session.close();
+
+        // try to consume message
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        Message message = consumer.receive(1000);
+
+        // message should be null as it should have expired and the
+        // consumer.receive(timeout) should return null.
+        assertNull(message);
+        session.close();
+
+        assertNull(connectionError);
+    }
+
+    /**
+     * check if the pull request (prefetch=0) times out when the expiry occurs
+     * on the client side.
+     */
+    @Test(timeout = 30000)
+    public void testConsumerReceivePrefetchZeroRedeliveryZero() throws Exception {
+
+        connection.getPrefetchPolicy().setQueuePrefetch(0);
+        connection.start();
+
+        // push message to queue
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("timeout.test");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage textMessage = session.createTextMessage("test Message");
+
+        producer.send(textMessage);
+        session.close();
+
+        // try to consume message
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        Message message = consumer.receive(1000);
+
+        // message should be null as it should have expired and the
+        // consumer.receive(timeout) should return null.
+        assertNull(message);
+        session.close();
+
+        assertNull(connectionError);
+    }
+
+    /**
+     * check if the pull request (prefetch=0) times out when the expiry occurs
+     * on the client side.
+     */
+    @Test(timeout = 30000)
+    public void testQueueBrowserPrefetchZeroRedeliveryZero() throws Exception {
+
+        connection.getPrefetchPolicy().setQueuePrefetch(0);
+        connection.start();
+
+        // push message to queue
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("timeout.test");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage textMessage = session.createTextMessage("test Message");
+
+        producer.send(textMessage);
+        session.close();
+
+        // try to consume message
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueBrowser browser = session.createBrowser(queue);
+
+        Message message = null;
+        Enumeration<?> enumeration = browser.getEnumeration();
+        while (enumeration.hasMoreElements()) {
+            message = (Message) enumeration.nextElement();
+        }
+
+        // message should be null as it should have expired and the
+        // consumer.receive(timeout) should return null.
+        assertNull(message);
+        session.close();
+
+        assertNull(connectionError);
+    }
+
+    /**
+     * check if the browse with (prefetch=1) times out when the expiry occurs
+     * on the client side.
+     */
+    @Test(timeout = 30000)
+    public void testQueueBrowserPrefetchOneRedeliveryZero() throws Exception {
+
+        connection.getPrefetchPolicy().setQueuePrefetch(1);
+        connection.start();
+
+        // push message to queue
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("timeout.test");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage textMessage = session.createTextMessage("test Message");
+
+        producer.send(textMessage);
+        session.close();
+
+        // try to consume message
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueBrowser browser = session.createBrowser(queue);
+
+        Message message = null;
+        Enumeration<?> enumeration = browser.getEnumeration();
+        while (enumeration.hasMoreElements()) {
+            message = (Message) enumeration.nextElement();
+        }
+
+        // message should be null as it should have expired and the
+        // consumer.receive(timeout) should return null.
+        assertNull(message);
+        session.close();
+
+        assertNull(connectionError);
+    }
+
+    private void createBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        broker.setAdvisorySupport(false);
+
+        // add a plugin to ensure the expiration happens on the client side rather
+        // than broker side.
+        broker.setPlugins(new BrokerPlugin[] { new BrokerPlugin() {
+
+            @Override
+            public Broker installPlugin(Broker broker) throws Exception {
+                return new BrokerFilter(broker) {
+
+                    private AtomicInteger counter = new AtomicInteger();
+
+                    @Override
+                    public void preProcessDispatch(MessageDispatch messageDispatch) {
+                        if (counter.get() == 0 && messageDispatch.getDestination().getPhysicalName().contains("timeout.test"))
{
+                            // Set the expiration to now
+                            messageDispatch.getMessage().setExpiration(System.currentTimeMillis()
- 1000);
+                            counter.incrementAndGet();
+                        }
+
+                        super.preProcessDispatch(messageDispatch);
+                    }
+                };
+            }
+        } });
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    protected ActiveMQConnection createConnection() throws Exception {
+        return (ActiveMQConnection) createConnectionFactory().createConnection();
+    }
+}


Mime
View raw message