Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F3027186DE for ; Tue, 12 Jan 2016 17:00:48 +0000 (UTC) Received: (qmail 87510 invoked by uid 500); 12 Jan 2016 17:00:48 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 87425 invoked by uid 500); 12 Jan 2016 17:00:48 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 87398 invoked by uid 99); 12 Jan 2016 17:00:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jan 2016 17:00:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A2401E03D0; Tue, 12 Jan 2016 17:00:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Date: Tue, 12 Jan 2016 17:00:49 -0000 Message-Id: <30232b9017184f70a66edb16a4013ab6@git.apache.org> In-Reply-To: <719237daaed547c9b26a822e312393b0@git.apache.org> References: <719237daaed547c9b26a822e312393b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6059 https://issues.apache.org/jira/browse/AMQ-6059 Ensure that a message sent to the store for the DLQ is rewritten so that its updated values are written to prevent expiration loops and loss of rollback cause etc. 
(cherry picked from commit 505a76a8bb7180debbd36637dce1b9101150d0b4) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b04cfeb8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b04cfeb8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b04cfeb8 Branch: refs/heads/activemq-5.13.x Commit: b04cfeb8af2cdf7d9ccbcd65a309478d4e01db9f Parents: b4405be Author: Timothy Bish Authored: Tue Jan 12 11:51:35 2016 -0500 Committer: Timothy Bish Committed: Tue Jan 12 12:00:41 2016 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/util/BrokerSupport.java | 13 +- .../org/apache/activemq/bugs/AMQ6059Test.java | 199 +++++++++++++++++++ 2 files changed, 206 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b04cfeb8/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java index f3f3b78..df3b658 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java @@ -31,24 +31,24 @@ import org.apache.activemq.state.ProducerState; */ public final class BrokerSupport { - private BrokerSupport() { + private BrokerSupport() { } - + public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception { doResend(context, originalMessage, deadLetterDestination, false); } - + /** * @param context - * @param originalMessage + * @param originalMessage * @param deadLetterDestination * @throws Exception */ public static void 
resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception { doResend(context, originalMessage, deadLetterDestination, true); } - - public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception { + + public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception { Message message = copy ? originalMessage.copy() : originalMessage; message.setOriginalDestination(message.getDestination()); message.setOriginalTransactionId(message.getTransactionId()); @@ -56,6 +56,7 @@ public final class BrokerSupport { message.setTransactionId(null); message.setMemoryUsage(null); message.setRedeliveryCounter(0); + message.getMessageId().setDataLocator(null); boolean originalFlowControl = context.isProducerFlowControl(); try { context.setProducerFlowControl(false); http://git-wip-us.apache.org/repos/asf/activemq/blob/b04cfeb8/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java new file mode 100644 index 0000000..049d683 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Enumeration; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Once the wire format is completed we can test against real persistence storage. 
+ */ +public class AMQ6059Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ6059Test.class); + + private BrokerService broker; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testDLQRecovery() throws Exception { + + sendMessage(new ActiveMQQueue("leveldbQueue")); + TimeUnit.SECONDS.sleep(3); + + LOG.info("### Check for expired message moving to DLQ."); + + Queue dlqQueue = (Queue) createDlqDestination(); + verifyIsDlq(dlqQueue); + + final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQueue.getQueueName()); + + assertTrue("The message expired", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, Inflight {}", + new Object[] { queueViewMBean.getEnqueueCount(), + queueViewMBean.getDispatchCount(), + queueViewMBean.getExpiredCount(), + queueViewMBean.getInFlightCount()}); + return queueViewMBean.getEnqueueCount() == 1; + } + })); + + verifyMessageIsRecovered(dlqQueue); + restartBroker(broker); + verifyMessageIsRecovered(dlqQueue); + } + + protected BrokerService createBroker() throws Exception { + return createBrokerWithDLQ(true); + } + + private BrokerService createBrokerWithDLQ(boolean purge) throws Exception { + BrokerService broker = new BrokerService(); + + File directory = new File("target/activemq-data/leveldb"); + if (purge) { + IOHelper.deleteChildren(directory); + } + + LevelDBStore levelDBStore = new LevelDBStore(); + levelDBStore.setDirectory(directory); + if (purge) { + levelDBStore.deleteAllMessages(); + } + + PolicyMap pMap = new PolicyMap(); + + SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy(); + sharedDLQStrategy.setProcessNonPersistent(true); + 
sharedDLQStrategy.setProcessExpired(true); + sharedDLQStrategy.setDeadLetterQueue(new ActiveMQQueue("ActiveMQ.DLQ")); + sharedDLQStrategy.setExpiration(10000); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setDeadLetterStrategy(sharedDLQStrategy); + defaultPolicy.setExpireMessagesPeriod(2000); + defaultPolicy.setUseCache(false); + + pMap.put(new ActiveMQQueue(">"), defaultPolicy); + broker.setDestinationPolicy(pMap); + broker.setPersistenceAdapter(levelDBStore); + if (purge) { + broker.setDeleteAllMessagesOnStartup(true); + } + + return broker; + } + + private void restartBroker(BrokerService broker) throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createBrokerWithDLQ(false); + broker.start(); + broker.waitUntilStarted(); + } + + private void verifyMessageIsRecovered(final Queue dlqQueue) throws Exception, JMSException { + Connection connection = createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = session.createBrowser(dlqQueue); + Enumeration elements = browser.getEnumeration(); + assertTrue(elements.hasMoreElements()); + Message browsed = (Message) elements.nextElement(); + assertNotNull("Recover message after broker restarts", browsed); + } + + private void sendMessage(Destination destination) throws Exception { + Connection connection = createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.send(destination, session.createTextMessage("DLQ message"), DeliveryMode.PERSISTENT, 4, 1000); + connection.stop(); + LOG.info("### Send message that will expire."); + } + + private Connection createConnection() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + return factory.createConnection(); + } + + private Destination 
createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + + private void verifyIsDlq(Queue dlqQ) throws Exception { + final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName()); + assertTrue("is dlq", queueViewMBean.isDLQ()); + } + + private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +}