Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A50218BAD for ; Thu, 14 Jan 2016 23:28:34 +0000 (UTC) Received: (qmail 37839 invoked by uid 500); 14 Jan 2016 23:28:34 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 37796 invoked by uid 500); 14 Jan 2016 23:28:34 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 37774 invoked by uid 99); 14 Jan 2016 23:28:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jan 2016 23:28:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7DFFBE389A; Thu, 14 Jan 2016 23:28:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <80468aa6bbb04530afcaa346b813e4d4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6117 Date: Thu, 14 Jan 2016 23:28:33 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 5adbafef3 -> 5b73ffad6 https://issues.apache.org/jira/browse/AMQ-6117 Test to try and reproduce the issue. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5b73ffad Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5b73ffad Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5b73ffad Branch: refs/heads/master Commit: 5b73ffad6bd000fdad93bc473900b2374d36181a Parents: 5adbafe Author: Timothy Bish Authored: Thu Jan 14 18:28:08 2016 -0500 Committer: Timothy Bish Committed: Thu Jan 14 18:28:08 2016 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/bugs/AMQ6117Test.java | 148 +++++++++++++++++++ 1 file changed, 148 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5b73ffad/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6117Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6117Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6117Test.java new file mode 100644 index 0000000..e7d19fb --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6117Test.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class AMQ6117Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ6117Test.class); + + private BrokerService broker; + + @Test + public void testViewIsStale() throws Exception { + + final int MSG_COUNT = 10; + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("Test-Queue"); + Queue dlq = session.createQueue("ActiveMQ.DLQ"); + + MessageProducer producer = session.createProducer(queue); + + // Ensure there is a DLQ in existence to start. + session.createProducer(dlq); + + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createMessage(), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 1000); + } + + final QueueViewMBean queueView = getProxyToQueue(dlq.getQueueName()); + + assertTrue("Message should be DLQ'd", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == MSG_COUNT; + } + })); + + LOG.info("DLQ has captured all expired messages"); + + Deque browsed = new LinkedList(); + CompositeData[] elements = queueView.browse(); + assertEquals(MSG_COUNT, elements.length); + + for (CompositeData element : elements) { + String messageID = (String) element.get("JMSMessageID"); + LOG.debug("MessageID: {}", messageID); + browsed.add(messageID); + } + + String removedMsgId = browsed.removeFirst(); + assertTrue(queueView.removeMessage(removedMsgId)); + assertEquals(MSG_COUNT - 1, queueView.getQueueSize()); + elements = queueView.browse(); + assertEquals(MSG_COUNT - 1, elements.length); + + for (CompositeData element : elements) { + String messageID = (String) element.get("JMSMessageID"); + LOG.debug("MessageID: {}", messageID); + assertFalse(messageID.equals(removedMsgId)); + } + } + + @Before + public void setup() throws Exception { + + PolicyMap policyMap = new PolicyMap(); + List entries = new ArrayList(); + + PolicyEntry pe = new PolicyEntry(); + pe.setExpireMessagesPeriod(1500); + pe.setQueue(">"); + entries.add(pe); + + policyMap.setPolicyEntries(entries); + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setDestinationPolicy(policyMap); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +}