activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5851 resolves #123
Date Thu, 09 Jul 2015 15:48:24 GMT
Repository: activemq
Updated Branches:
  refs/heads/master c9c32e5dc -> f10aab642


https://issues.apache.org/jira/browse/AMQ-5851
resolves #123

This commit resolves an issue where unmatched acknowledgement
messages could be received when running a MDB consumer and
sending messages with a short TTL.  The expiration logic when
receiveing an expired Message Ack will now only expire messages
in dispatch relating to the received ack, not all expired messages
in the dispatch list.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f10aab64
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f10aab64
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f10aab64

Branch: refs/heads/master
Commit: f10aab64282310dd97205f8fa1dafb39d44f6559
Parents: c9c32e5
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Thu Jun 25 18:22:32 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Jul 9 16:46:21 2015 +0100

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |  54 ++++-
 .../activemq/ActiveMQMessageConsumer.java       |   3 +
 .../org/apache/activemq/ActiveMQSession.java    |   1 +
 .../activemq/ExpiredAckAsyncConsumerTest.java   | 240 +++++++++++++++++++
 4 files changed, 289 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f10aab64/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index d148e80..ebd8a7d 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -281,21 +281,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                         break;
                     }
                 }
-            }else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
+            }else if (ack.isDeliveredAck()) {
                 // Message was delivered but not acknowledged: update pre-fetch
                 // counters.
                 int index = 0;
                 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();
index++) {
                     final MessageReference node = iter.next();
                     Destination nodeDest = (Destination) node.getRegionDestination();
-                    if (node.isExpired()) {
-                        if (broker.isExpired(node)) {
-                            Destination regionDestination = nodeDest;
-                            regionDestination.messageExpired(context, this, node);
-                        }
-                        iter.remove();
-                        nodeDest.getDestinationStatistics().getInflight().decrement();
-                    }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
                         if (usePrefetchExtension && getPrefetchSize() != 0) {
                             // allow  batch to exceed prefetch
@@ -317,6 +309,50 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                             "Could not correlate acknowledgment with dispatched message:
"
                                     + ack);
                 }
+            } else if (ack.isExpiredAck()) {
+                // Message was expired
+                int index = 0;
+                boolean inAckRange = false;
+                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();
index++) {
+                    final MessageReference node = iter.next();
+                    Destination nodeDest = (Destination) node.getRegionDestination();
+                    MessageId messageId = node.getMessageId();
+                    if (ack.getFirstMessageId() == null
+                            || ack.getFirstMessageId().equals(messageId)) {
+                        inAckRange = true;
+                    }
+                    if (inAckRange) {
+                        if (node.isExpired()) {
+                            if (broker.isExpired(node)) {
+                                Destination regionDestination = nodeDest;
+                                regionDestination.messageExpired(context, this, node);
+                            }
+                            iter.remove();
+                            nodeDest.getDestinationStatistics().getInflight().decrement();
+                        }
+                        if (ack.getLastMessageId().equals(messageId)) {
+                            if (usePrefetchExtension && getPrefetchSize() != 0) {
+                                // allow  batch to exceed prefetch
+                                while (true) {
+                                    int currentExtension = prefetchExtension.get();
+                                    int newExtension = Math.max(currentExtension, index +
1);
+                                    if (prefetchExtension.compareAndSet(currentExtension,
newExtension)) {
+                                        break;
+                                    }
+                                }
+                            }
+
+                            destination = (Destination) node.getRegionDestination();
+                            callDispatchMatched = true;
+                            break;
+                        }
+                    }
+                }
+                if (!callDispatchMatched) {
+                    throw new JMSException(
+                            "Could not correlate expiration acknowledgment with dispatched
message: "
+                                    + ack);
+                }
             } else if (ack.isRedeliveredAck()) {
                 // Message was re-delivered but it was not yet considered to be
                 // a DLQ message.

http://git-wip-us.apache.org/repos/asf/activemq/blob/f10aab64/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 82a1bb4..c255f06 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -1174,6 +1174,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
 
     void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
         MessageAck ack = new MessageAck(md, ackType, 1);
+        if (ack.isExpiredAck()) {
+            ack.setFirstMessageId(ack.getLastMessageId());
+        }
         session.sendAck(ack);
         synchronized(deliveredMessages){
             deliveredMessages.remove(md);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f10aab64/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 2daef8c..129ed51 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -890,6 +890,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
             MessageAck earlyAck = null;
             if (message.isExpired()) {
                 earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
+                earlyAck.setFirstMessageId(message.getMessageId());
             } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                 LOG.debug("{} got duplicate: {}", this, message.getMessageId());
                 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f10aab64/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiredAckAsyncConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiredAckAsyncConsumerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiredAckAsyncConsumerTest.java
new file mode 100644
index 0000000..c3aeef6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiredAckAsyncConsumerTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The purpose of this test is to show that AMQ-5851 is fixed.  When running in an application
+ * container, if multiple messages were consumed asynchronously and the messages had a short
TTL,
+ * it was possible to get an exception on the broker when a message acknowledgement was received.
+ * This is because the original expiration strategy was excessive and when an expired Ack
was received,
+ * all dispatched messages were checked for expiration instead of only the messages tied
to that Ack.
+ * This caused an issue because sometimes a thread would finish and send back a standard
ack,
+ * but another expired ack would have already cleared the message from the dispach list.
+ * Now only messages tied to the MessageAck are expired which fixes this problem.
+ *
+ */
+public class ExpiredAckAsyncConsumerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(ExpiredAckAsyncConsumerTest.class);
+
+    private BrokerService broker;
+    private Connection connection;
+    private ConnectionConsumer connectionConsumer;
+    private Queue queue;
+    private AtomicBoolean finished = new AtomicBoolean();
+    private AtomicBoolean failed = new AtomicBoolean();
+
+
+    @Before
+    public void setUp() throws Exception {
+
+        broker = new BrokerService();
+        broker.addConnector("tcp://localhost:0");
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        broker.waitUntilStarted();
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
+        factory.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                failed.set(true);
+            }
+        });
+        connection = factory.createConnection();
+        queue = createQueue();
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        connectionConsumer.close();
+        connection.close();
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    public void testAsyncMessageExpiration() throws Exception {
+        ExecutorService executors = Executors.newFixedThreadPool(1);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queue);
+        producer.setTimeToLive(10L);
+
+        //Send 30 messages and make sure we can consume with multiple threads without failing
+        //even when messages are expired
+        executors.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(100);
+                    int count = 0;
+                    while (!failed.get() && count < 30) {
+                        producer.send(session.createTextMessage("Hello World: " + count));
+                        LOG.info("sending: " + count);
+                        count++;
+                        Thread.sleep(100L);
+                    }
+                    finished.set(true);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+            }
+        });
+
+        connectionConsumer = connection.createConnectionConsumer(
+                queue, null, new TestServerSessionPool(connection), 1000);
+
+        assertTrue("received messages", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return finished.get();
+            }
+        }));
+
+        assertFalse("An exception was received on receive", failed.get());
+    }
+
+
+    protected Queue createQueue() {
+        return new ActiveMQQueue("TEST");
+    }
+
+
+    /**
+     * Simulate a ServerSessionPool in an application server with 15 threads
+     *
+     */
+    private class TestServerSessionPool implements ServerSessionPool {
+        Connection connection;
+        LinkedBlockingQueue<TestServerSession> serverSessions = new LinkedBlockingQueue<>(10);
+
+        public TestServerSessionPool(Connection connection) throws JMSException {
+            this.connection = connection;
+            for (int i = 0; i < 15; i++) {
+                addSession();
+            }
+        }
+
+        @Override
+        public ServerSession getServerSession() throws JMSException {
+            try {
+                return serverSessions.take();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("could not get session");
+            }
+        }
+
+        public void addSession() {
+            try {
+                serverSessions.add(new TestServerSession(this));
+            } catch (Exception e) {
+            }
+        }
+    }
+
+    /**
+     * Simulate a ServerSession
+     *
+     */
+    private class TestServerSession implements ServerSession {
+        TestServerSessionPool pool;
+        Session session;
+
+        public TestServerSession(TestServerSessionPool pool) throws JMSException {
+            this.pool = pool;
+            session = pool.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.setMessageListener(new TestMessageListener());
+        }
+
+        @Override
+        public Session getSession() throws JMSException {
+            return session;
+        }
+
+        @Override
+        public void start() throws JMSException {
+            new Thread() {
+                @Override
+                public void run() {
+                    //execute run on the session
+                    if (!finished.get()) {
+                        try {
+                            session.run();
+                            pool.addSession();
+                        } catch (Exception e) {
+                        }
+                    }
+                }
+            }.start();
+        }
+    }
+
+
+    private class TestMessageListener implements MessageListener {
+        @Override
+        public void onMessage(Message message) {
+            try {
+                Thread.sleep(1000L);
+                String text = ((TextMessage) message).getText();
+                LOG.info("got message: " + text);
+            } catch (Exception e) {
+                LOG.error("in onMessage", e);
+            }
+        }
+    }
+
+}


Mime
View raw message