Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D72731858E for ; Tue, 9 Jun 2015 16:36:35 +0000 (UTC) Received: (qmail 28545 invoked by uid 500); 9 Jun 2015 16:36:35 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 28366 invoked by uid 500); 9 Jun 2015 16:36:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 25977 invoked by uid 99); 9 Jun 2015 16:36:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 16:36:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1F667E02EE; Tue, 9 Jun 2015 16:36:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 09 Jun 2015 16:37:03 -0000 Message-Id: <30d8307e5e4e45c09f7a1041db6dc101@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [32/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java new file mode 100644 index 0000000..6f55e3d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +import java.net.Socket; +import java.util.Set; + +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TransportConnectorMBeanTest { + private static final Logger LOG = LoggerFactory.getLogger(TransportConnectorMBeanTest.class); + + BrokerService broker; + + @Test + public void verifyRemoteAddressInMbeanName() throws Exception { + doVerifyRemoteAddressInMbeanName(true); + } + + @Test + public void verifyRemoteAddressNotInMbeanName() throws Exception { + doVerifyRemoteAddressInMbeanName(false); + } + + @Test + public void verifyClientIdNetwork() throws Exception { + doVerifyClientIdNetwork(false); + } + + @Test + public void verifyClientIdDuplexNetwork() throws Exception { + doVerifyClientIdNetwork(true); + } + + private void doVerifyClientIdNetwork(boolean duplex) throws Exception { + createBroker(true); + + BrokerService networked = new BrokerService(); + networked.setBrokerName("networked"); + networked.setPersistent(false); + NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString()); + nc.setDuplex(duplex); + networked.start(); + + try { + assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Set registeredMbeans = getRegisteredMbeans(); + return match("_outbound", registeredMbeans); + } + })); + + } finally { + networked.stop(); + } + } + + private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception { + createBroker(allowRemoteAddress); + ActiveMQConnection connection = createConnection(); + Set registeredMbeans = getRegisteredMbeans(); + assertEquals("presence of mbean with clientId", true, match(connection.getClientID(), registeredMbeans)); + assertEquals("presence of mbean with local port", allowRemoteAddress, match(extractLocalPort(connection), registeredMbeans)); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + private boolean match(String s, Set registeredMbeans) { + String encodedName = JMXSupport.encodeObjectNamePart(s); + for (ObjectName name : registeredMbeans) { + LOG.info("checking for match:" + encodedName + ", with: " + name.toString()); + if (name.toString().contains(encodedName)) { + return true; + } + } + return false; + } + + private String extractLocalPort(ActiveMQConnection connection) throws Exception { + Socket socket = connection.getTransport().narrow(Socket.class); + return String.valueOf(socket.getLocalPort()); + } + + private Set getRegisteredMbeans() throws Exception { + // need a little sleep to ensure JMX is up to date + Thread.sleep(200); + return broker.getManagementContext().queryNames(null, null); + } + + private ActiveMQConnection createConnection() throws Exception { + final String opts = "?jms.watchTopicAdvisories=false"; + ActiveMQConnection connection = (ActiveMQConnection) + new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + opts).createConnection(); + connection.start(); + return connection; + } + + private void createBroker(boolean allowRemoteAddressInMbeanNames) throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.addConnector("tcp://localhost:0"); + broker.getManagementContext().setAllowRemoteAddressInMBeanNames(allowRemoteAddressInMbeanNames); + broker.start(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java new file mode 100644 index 0000000..4cc57ba --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.LinkedList; +import java.util.List; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; + +public class mKahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest { + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + + MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter(); + List adapters = new LinkedList(); + FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter(); + defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + adapters.add(defaultEntry); + + FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter(); + special.setDestination(new ActiveMQQueue("special")); + special.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + adapters.add(special); + + mKahaDB.setFilteredPersistenceAdapters(adapters); + broker.setPersistenceAdapter(mKahaDB); + } + + public static Test suite() { + return suite(mKahaDBXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue("test,special"); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java new file mode 100644 index 0000000..147d89a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; + +import java.util.LinkedList; +import java.util.List; + +public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest { + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + + MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter(); + List adapters = new LinkedList(); + FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter(); + defaultEntry.setPersistenceAdapter(new LevelDBPersistenceAdapter()); + adapters.add(defaultEntry); + + FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter(); + special.setDestination(new ActiveMQQueue("special")); + special.setPersistenceAdapter(new LevelDBPersistenceAdapter()); + adapters.add(special); + + mKahaDB.setFilteredPersistenceAdapters(adapters); + broker.setPersistenceAdapter(mKahaDB); + } + + public static Test suite() { + return suite(mLevelDBXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue("test,special"); + } + + public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { + // super.testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback(); + } + public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception { + // super.testQueuePersistentUncommittedAcksLostOnRestart(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java new file mode 100644 index 0000000..092c554 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.message.security; + +import java.io.IOException; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; +import javax.jms.JMSException; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.security.MessageAuthorizationPolicy; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.spring.ConsumerBean; + +/** + * + */ +public class MessageAuthenticationTest extends EmbeddedBrokerTestSupport { + + private Connection connection; + + public void testSendInvalidMessage() throws Exception { + if (connection == null) { + connection = createConnection(); + } + connection.start(); + + ConsumerBean messageList = new ConsumerBean(); + messageList.setVerbose(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination = new ActiveMQQueue("MyQueue"); + + MessageConsumer c1 = session.createConsumer(destination); + + c1.setMessageListener(messageList); + + MessageProducer producer = session.createProducer(destination); + assertNotNull(producer); + + producer.send(createMessage(session, "invalidBody", "myHeader", "xyz")); + producer.send(createMessage(session, "validBody", "myHeader", "abc")); + + messageList.assertMessagesArrived(1); + assertEquals("validBody", ((TextMessage) messageList.flushMessages().get(0)).getText()); + } + + private javax.jms.Message createMessage(Session session, String body, String header, String value) throws JMSException { + TextMessage msg = session.createTextMessage(body); + msg.setStringProperty(header, value); + return msg; + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.setMessageAuthorizationPolicy(new MessageAuthorizationPolicy() { + public boolean isAllowedToConsume(ConnectionContext context, Message message) { + try { + Object value = message.getProperty("myHeader"); + return "abc".equals(value); + } + catch (IOException e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + return false; + } + } + }); + answer.addConnector(bindAddress); + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml new file mode 100644 index 0000000..245d946 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml @@ -0,0 +1,50 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java new file mode 100644 index 0000000..dcf4e69 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.partition; + +import junit.framework.TestCase; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.partition.PartitionBrokerPlugin; +import org.apache.activemq.partition.dto.Partitioning; + +/** + */ +public class SpringPartitionBrokerTest extends TestCase { + + public void testCreatePartitionBroker() throws Exception { + + BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml"); + assertEquals(1, broker.getPlugins().length); + PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0]; + Partitioning config = plugin.getConfig(); + assertEquals(2, config.getBrokers().size()); + + Object o; + String json = "{\n" + + " \"by_client_id\":{\n" + + " \"client1\":{\"ids\":[\"broker1\"]},\n" + + " \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" + + " },\n" + + " \"brokers\":{\n" + + " \"broker1\":\"tcp://localhost:61616\",\n" + + " \"broker2\":\"tcp://localhost:61616\"\n" + + " }\n" + + "}"; + Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class); + assertEquals(expected.toString(), config.toString()); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java new file mode 100644 index 0000000..3cfd595 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.util.concurrent.TimeUnit; + +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(value = Parameterized.class) +public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test { + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class); + protected long maxTimeSinceLastAck = 5 * 1000; + + AbortSlowAckConsumerStrategy strategy; + + public AbortSlowAckConsumer0Test(Boolean isTopic) { + super(isTopic); + } + + @Override + protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() { + AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); + strategy.setAbortConnection(abortConnection); + strategy.setCheckPeriod(checkPeriod); + strategy.setMaxSlowDuration(maxSlowDuration); + strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck); + + return strategy; + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PolicyEntry policy = new PolicyEntry(); + + strategy = createSlowConsumerStrategy(); + underTest = strategy; + + policy.setSlowConsumerStrategy(strategy); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.getPrefetchPolicy().setAll(1); + return factory; + } + + @Override + @Test + public void testSlowConsumerIsAbortedViaJmx() throws Exception { + strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort + super.testSlowConsumerIsAbortedViaJmx(); + } + + @Test + public void testZeroPrefetchConsumerIsAborted() throws Exception { + strategy.setMaxTimeSinceLastAck(2000); // Make it shorter + + ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + assertNotNull(consumer); + conn.start(); + startProducers(destination, 20); + + Message message = consumer.receive(5000); + assertNotNull(message); + + TimeUnit.SECONDS.sleep(15); + + try { + consumer.receive(5000); + fail("Slow consumer not aborted."); + } catch (Exception ex) { + } + } + + @Test + public void testIdleConsumerCanBeAbortedNoMessages() throws Exception { + strategy.setIgnoreIdleConsumers(false); + strategy.setMaxTimeSinceLastAck(2000); // Make it shorter + + ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + assertNotNull(consumer); + conn.start(); + + startProducers(destination, 1); + + Message message = consumer.receive(5000); + assertNotNull(message); + + // Consumer needs to be closed before the reeive call. + TimeUnit.SECONDS.sleep(15); + + try { + consumer.receive(5000); + fail("Idle consumer not aborted."); + } catch (Exception ex) { + } + } + + @Test + public void testIdleConsumerCanBeAborted() throws Exception { + strategy.setIgnoreIdleConsumers(false); + strategy.setMaxTimeSinceLastAck(2000); // Make it shorter + + ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + assertNotNull(consumer); + conn.start(); + startProducers(destination, 1); + + Message message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + + // Consumer needs to be closed before the reeive call. + TimeUnit.SECONDS.sleep(15); + + try { + consumer.receive(5000); + fail("Idle consumer not aborted."); + } catch (Exception ex) { + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java new file mode 100644 index 0000000..6d3e970 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.ConnectionFactory; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test { + + protected long maxTimeSinceLastAck = 5 * 1000; + + public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) { + super(abortConnection, topic); + } + + @Override + protected AbortSlowConsumerStrategy createSlowConsumerStrategy() { + return new AbortSlowConsumerStrategy(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PolicyEntry policy = new PolicyEntry(); + + AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); + strategy.setAbortConnection(abortConnection); + strategy.setCheckPeriod(checkPeriod); + strategy.setMaxSlowDuration(maxSlowDuration); + strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck); + + policy.setSlowConsumerStrategy(strategy); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.getPrefetchPolicy().setAll(1); + return factory; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java new file mode 100644 index 0000000..948613e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.ConnectionFactory; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test { + + protected long maxTimeSinceLastAck = 5 * 1000; + + public AbortSlowAckConsumer2Test(Boolean topic) { + super(topic); + } + + @Override + protected AbortSlowConsumerStrategy createSlowConsumerStrategy() { + return new AbortSlowConsumerStrategy(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PolicyEntry policy = new PolicyEntry(); + + AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); + strategy.setAbortConnection(abortConnection); + strategy.setCheckPeriod(checkPeriod); + strategy.setMaxSlowDuration(maxSlowDuration); + strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck); + + policy.setSlowConsumerStrategy(strategy); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.getPrefetchPolicy().setAll(1); + return factory; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java new file mode 100644 index 0000000..9f23443 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.*; + + +@RunWith(value = Parameterized.class) +public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class); + + @Parameterized.Parameters(name = "isTopic({0})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + + public AbortSlowConsumer0Test(Boolean isTopic) { + this.topic = isTopic; + } + + @Test + public void testRegularConsumerIsNotAborted() throws Exception { + startConsumers(destination); + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + allMessagesList.waitForMessagesToArrive(10); + allMessagesList.assertAtLeastMessagesReceived(10); + } + + @Test + public void testSlowConsumerIsAbortedViaJmx() throws Exception { + underTest.setMaxSlowDuration(60*1000); // so jmx does the abort + startConsumers(withPrefetch(2, destination)); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(8 * 1000); + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + + consumertoAbort.getValue().assertMessagesReceived(1); + + ActiveMQDestination amqDest = (ActiveMQDestination)destination; + ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + + (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName=" + + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost"); + + DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true); + ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy(); + + assertNotNull(slowConsumerPolicyMBeanName); + + AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean) + broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true); + + TimeUnit.SECONDS.sleep(3); + + TabularData slowOnes = abortPolicy.getSlowConsumers(); + assertEquals("one slow consumers", 1, slowOnes.size()); + + LOG.info("slow ones:" + slowOnes); + + CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next(); + LOG.info("Slow one: " + slowOne); + + assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName); + abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription")); + + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + + slowOnes = abortPolicy.getSlowConsumers(); + assertEquals("no slow consumers left", 0, slowOnes.size()); + + // verify mbean gone with destination + broker.getAdminView().removeTopic(amqDest.getPhysicalName()); + + try { + abortPolicy.getSlowConsumers(); + fail("expect not found post destination removal"); + } catch(UndeclaredThrowableException expected) { + assertTrue("correct exception: " + expected.getCause(), + expected.getCause() instanceof InstanceNotFoundException); + } + } + + private Destination withPrefetch(int i, Destination destination) { + String destWithPrefetch = + ((ActiveMQDestination) destination).getPhysicalName() + "?consumer.prefetchSize=" + i; + return topic ? new ActiveMQTopic(destWithPrefetch) : new ActiveMQQueue(destWithPrefetch); + } + + @Test + public void testOnlyOneSlowConsumerIsAborted() throws Exception { + consumerCount = 10; + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(8 * 1000); + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + + allMessagesList.waitForMessagesToArrive(99); + allMessagesList.assertAtLeastMessagesReceived(99); + + consumertoAbort.getValue().assertMessagesReceived(1); + TimeUnit.SECONDS.sleep(5); + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + } + + @Test + public void testAbortAlreadyClosingConsumers() throws Exception { + consumerCount = 1; + startConsumers(withPrefetch(2, destination)); + for (MessageIdList list : consumers.values()) { + list.setProcessingDelay(6 * 1000); + } + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + allMessagesList.waitForMessagesToArrive(consumerCount); + + for (MessageConsumer consumer : consumers.keySet()) { + LOG.info("closing consumer: " + consumer); + /// will block waiting for on message till 6secs expire + consumer.close(); + } + } + + @Test + public void testAbortConsumerOnDeadConnection() throws Exception { + TransportConnector transportConnector = broker.addConnector("tcp://0.0.0.0:0"); + transportConnector.setBrokerService(broker); + transportConnector.setTaskRunnerFactory(broker.getTaskRunnerFactory()); + transportConnector.start(); + SocketProxy socketProxy = new SocketProxy(transportConnector.getPublishableConnectURI()); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(socketProxy.getUrl()); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(4); + connectionFactory.setPrefetchPolicy(prefetchPolicy); + Connection c = connectionFactory.createConnection(); + connections.add(c); + c.start(); + Session session = c.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final ActiveMQMessageConsumer messageconsumer = (ActiveMQMessageConsumer) session.createConsumer(destination); + startProducers(destination, 10); + + messageconsumer.receive(4000).acknowledge(); + assertNotNull(messageconsumer.receive(4000)); + assertNotNull(messageconsumer.receive(4000)); + assertNotNull(messageconsumer.receive(4000)); + + // close control command won't get through + socketProxy.pause(); + + ActiveMQDestination amqDest = (ActiveMQDestination)destination; + ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + + (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName=" + + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost"); + + final DestinationViewMBean destView = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true); + + assertTrue("Consumer gone from broker view", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("DestView {} comsumerCount {}", destView, destView.getConsumerCount()); + return 0 == destView.getConsumerCount(); + } + })); + + socketProxy.goOn(); + + assertTrue("consumer was closed", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + boolean closed = false; + try { + messageconsumer.receive(400); + } catch (javax.jms.IllegalStateException expected) { + closed = expected.toString().contains("closed"); + } + return closed; + } + })); + } + + @Override + public void onException(JMSException exception) { + exceptions.add(exception); + exception.printStackTrace(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java new file mode 100644 index 0000000..e17b362 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.util.MessageIdList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +@RunWith(value = Parameterized.class) +public class AbortSlowConsumer1Test extends AbortSlowConsumerBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class); + + @Parameterized.Parameters(name = "abortConnection({0})-isTopic({1})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{ + {Boolean.TRUE, Boolean.TRUE}, + {Boolean.TRUE, Boolean.FALSE}, + {Boolean.FALSE, Boolean.TRUE}, + {Boolean.FALSE, Boolean.FALSE}}); + } + + public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) { + this.abortConnection = abortConnection; + this.topic = topic; + } + + @Test(timeout = 60 * 1000) + public void testSlowConsumerIsAborted() throws Exception { + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(8 * 1000); + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + + consumertoAbort.getValue().assertMessagesReceived(1); + TimeUnit.SECONDS.sleep(5); + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + } + + @Test(timeout = 60 * 1000) + public void testAbortAlreadyClosedConsumers() throws Exception { + Connection conn = createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + conn.start(); + startProducers(destination, 20); + TimeUnit.SECONDS.sleep(1); + LOG.info("closing consumer: " + consumer); + consumer.close(); + + TimeUnit.SECONDS.sleep(5); + assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testAbortAlreadyClosedConnection() throws Exception { + Connection conn = createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + sess.createConsumer(destination); + conn.start(); + startProducers(destination, 20); + TimeUnit.SECONDS.sleep(1); + LOG.info("closing connection: " + conn); + conn.close(); + + TimeUnit.SECONDS.sleep(5); + assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java new file mode 100644 index 0000000..7263027 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map.Entry; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import org.apache.activemq.util.MessageIdList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class AbortSlowConsumer2Test extends AbortSlowConsumerBase { + + @Parameterized.Parameters(name = "isTopic({0})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + + public AbortSlowConsumer2Test(Boolean isTopic) { + this.topic = isTopic; + } + + @Test(timeout = 60 * 1000) + public void testLittleSlowConsumerIsNotAborted() throws Exception { + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(500); + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 12); + allMessagesList.waitForMessagesToArrive(10); + allMessagesList.assertAtLeastMessagesReceived(10); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java new file mode 100644 index 0000000..ee28112 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import junit.framework.Test; +import org.apache.activemq.JmsMultipleClientsTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.MessageIdList; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + + +public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener { + + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class); + + protected AbortSlowConsumerStrategy underTest; + protected boolean abortConnection = false; + protected long checkPeriod = 2 * 1000; + protected long maxSlowDuration = 5 * 1000; + protected final List exceptions = new ArrayList(); + + @Override + @Before + public void setUp() throws Exception { + exceptions.clear(); + topic = true; + underTest = createSlowConsumerStrategy(); + super.setUp(); + createDestination(); + } + + protected AbortSlowConsumerStrategy createSlowConsumerStrategy() { + return new AbortSlowConsumerStrategy(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PolicyEntry policy = new PolicyEntry(); + underTest.setAbortConnection(abortConnection); + underTest.setCheckPeriod(checkPeriod); + underTest.setMaxSlowDuration(maxSlowDuration); + + policy.setSlowConsumerStrategy(underTest); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + @Override + public void onException(JMSException exception) { + exceptions.add(exception); + exception.printStackTrace(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java new file mode 100644 index 0000000..6c31237 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.Destination; +import javax.jms.Message; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + */ +public class DeadLetterTest extends DeadLetterTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(DeadLetterTest.class); + + protected int rollbackCount; + + protected void doTest() throws Exception { + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + LOG.info("Will redeliver messages: " + rollbackCount + " times"); + + makeConsumer(); + makeDlqConsumer(); + + sendMessages(); + + // now lets receive and rollback N times + for (int i = 0; i < messageCount; i++) { + consumeAndRollback(i); + } + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertMessage(msg, i); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + } + session.commit(); + } + + protected void consumeAndRollback(int messageCounter) throws Exception { + for (int i = 0; i < rollbackCount; i++) { + Message message = consumer.receive(5000); + assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message); + assertMessage(message, messageCounter); + + session.rollback(); + } + LOG.info("Rolled back: " + rollbackCount + " times"); + } + + protected void setUp() throws Exception { + transactedMode = true; + super.setUp(); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory answer = super.createConnectionFactory(); + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setMaximumRedeliveries(3); + policy.setBackOffMultiplier((short) 1); + policy.setInitialRedeliveryDelay(10); + policy.setUseExponentialBackOff(false); + answer.setRedeliveryPolicy(policy); + return answer; + } + + protected Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java new file mode 100644 index 0000000..b275f2e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public abstract class DeadLetterTestSupport extends TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(DeadLetterTestSupport.class); + + protected int messageCount = 10; + protected long timeToLive; + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected boolean durableSubscriber; + protected Destination dlqDestination; + protected MessageConsumer dlqConsumer; + protected QueueBrowser dlqBrowser; + protected BrokerService broker; + protected boolean transactedMode; + protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE; + private Destination destination; + + protected void setUp() throws Exception { + super.setUp(); + broker = createBroker(); + broker.start(); + connection = createConnection(); + connection.setClientID(createClientId()); + + session = connection.createSession(transactedMode, acknowledgeMode); + connection.start(); + } + + protected String createClientId() { + return toString(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + if (broker != null) { + broker.stop(); + } + } + + protected abstract void doTest() throws Exception; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + PolicyEntry policy = new PolicyEntry(); + DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy(); + if(defaultDeadLetterStrategy!=null) { + defaultDeadLetterStrategy.setProcessNonPersistent(true); + } + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + protected void makeConsumer() throws JMSException { + Destination destination = getDestination(); + LOG.info("Consuming from: " + destination); + if (durableSubscriber) { + consumer = session.createDurableSubscriber((Topic)destination, destination.toString()); + } else { + consumer = session.createConsumer(destination); + } + } + + protected void makeDlqConsumer() throws Exception { + dlqDestination = createDlqDestination(); + + LOG.info("Consuming from dead letter on: " + dlqDestination); + dlqConsumer = session.createConsumer(dlqDestination); + } + + protected void makeDlqBrowser() throws JMSException { + dlqDestination = createDlqDestination(); + + LOG.info("Browsing dead letter on: " + dlqDestination); + dlqBrowser = session.createBrowser((Queue)dlqDestination); + } + + protected void sendMessages() throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(getDestination()); + producer.setDeliveryMode(deliveryMode); + producer.setTimeToLive(timeToLive); + + LOG.info("Sending " + messageCount + " messages to: " + getDestination()); + for (int i = 0; i < messageCount; i++) { + Message message = createMessage(session, i); + producer.send(message); + } + } + + protected TextMessage createMessage(Session session, int i) throws JMSException { + return session.createTextMessage(getMessageText(i)); + } + + protected String getMessageText(int i) { + return "message: " + i; + } + + protected void assertMessage(Message message, int i) throws Exception { + LOG.info("Received message: " + message); + assertNotNull("No message received for index: " + i, message); + assertTrue("Should be a TextMessage not: " + message, message instanceof TextMessage); + TextMessage textMessage = (TextMessage)message; + assertEquals("text of message: " + i, getMessageText(i), textMessage.getText()); + } + + protected abstract Destination createDlqDestination(); + + public void testTransientTopicMessage() throws Exception { + super.topic = true; + deliveryMode = DeliveryMode.NON_PERSISTENT; + durableSubscriber = true; + doTest(); + } + + public void testDurableTopicMessage() throws Exception { + super.topic = true; + deliveryMode = DeliveryMode.PERSISTENT; + durableSubscriber = true; + doTest(); + } + + public void testTransientQueueMessage() throws Exception { + super.topic = false; + deliveryMode = DeliveryMode.NON_PERSISTENT; + durableSubscriber = false; + doTest(); + validateConsumerPrefetch(this.getDestinationString(), 0); + } + + public void testDurableQueueMessage() throws Exception { + super.topic = false; + deliveryMode = DeliveryMode.PERSISTENT; + durableSubscriber = false; + doTest(); + validateConsumerPrefetch(this.getDestinationString(), 0); + } + + public Destination getDestination() { + if (destination == null) { + destination = createDestination(); + } + return destination; + } + + private void validateConsumerPrefetch(String destination, long expectedCount) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { + if (dest.getName().equals(destination)) { + DestinationStatistics stats = dest.getDestinationStatistics(); + LOG.info(">>>> inflight for : " + dest.getName() + ": " + stats.getInflight().getCount()); + assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", + expectedCount, stats.getInflight().getCount()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java new file mode 100644 index 0000000..a5f7984 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.springframework.core.io.ClassPathResource; + +/** + * + */ +public class DestinationCursorConfigTest extends TestSupport { + protected BrokerService broker; + + @Override + protected void setUp() throws Exception { + broker = createBroker(); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/policy/cursor.xml")); + factory.afterPropertiesSet(); + BrokerService answer = factory.getBroker(); + return answer; + } + + public void testQueueConfiguration() throws Exception { + super.topic = false; + ActiveMQDestination destination = (ActiveMQDestination) createDestination("org.apache.foo"); + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + PendingQueueMessageStoragePolicy policy = entry.getPendingQueuePolicy(); + assertNotNull(policy); + assertTrue("Policy is: " + policy, policy instanceof VMPendingQueueMessageStoragePolicy); + } + + public void testTopicConfiguration() throws Exception { + super.topic = true; + ActiveMQDestination destination = (ActiveMQDestination) createDestination("org.apache.foo"); + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + PendingSubscriberMessageStoragePolicy policy = entry.getPendingSubscriberPolicy(); + assertNotNull(policy); + assertFalse(entry.isProducerFlowControl()); + assertTrue(entry.getMemoryLimit()==(1024*1024)); + assertTrue("subscriberPolicy is: " + policy, policy instanceof VMPendingSubscriberMessageStoragePolicy); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java new file mode 100644 index 0000000..f43f886 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.Message; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.DiscardingDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class DiscardingDeadLetterPolicyTest extends DeadLetterTest { + private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class); + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + DeadLetterStrategy strategy = new DiscardingDeadLetterStrategy(); + strategy.setProcessNonPersistent(true); + policy.setDeadLetterStrategy(strategy); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Override + protected void doTest() throws Exception { + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + LOG.info("Will redeliver messages: " + rollbackCount + " times"); + + makeConsumer(); + makeDlqConsumer(); + + sendMessages(); + + // now lets receive and rollback N times + for (int i = 0; i < messageCount; i++) { + consumeAndRollback(i); + } + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertNull("Should not be a DLQ message for loop: " + i, msg); + } + session.commit(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java new file mode 100644 index 0000000..a587be8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import java.util.Enumeration; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class IndividualDeadLetterTest extends DeadLetterTest { + private static final Logger LOG = LoggerFactory.getLogger(IndividualDeadLetterTest.class); + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + DeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); + strategy.setProcessNonPersistent(true); + policy.setDeadLetterStrategy(strategy); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Override + protected Destination createDlqDestination() { + String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue."; + return new ActiveMQQueue(prefix + getClass().getName() + "." + getName()); + } + + public void testDLQBrowsing() throws Exception { + super.topic = false; + deliveryMode = DeliveryMode.PERSISTENT; + durableSubscriber = false; + messageCount = 1; + + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + LOG.info("Will redeliver messages: " + rollbackCount + " times"); + + sendMessages(); + + // now lets receive and rollback N times + for (int i = 0; i < rollbackCount; i++) { + makeConsumer(); + Message message = consumer.receive(5000); + assertNotNull("No message received: ", message); + + session.rollback(); + LOG.info("Rolled back: " + rollbackCount + " times"); + consumer.close(); + } + + makeDlqBrowser(); + browseDlq(); + dlqBrowser.close(); + session.close(); + Thread.sleep(1000); + session = connection.createSession(transactedMode, acknowledgeMode); + Queue testQueue = new ActiveMQQueue("ActiveMQ.DLQ.Queue.ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName()); + MessageConsumer testConsumer = session.createConsumer(testQueue); + assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000)); + + } + + protected void browseDlq() throws Exception { + Enumeration messages = dlqBrowser.getEnumeration(); + while (messages.hasMoreElements()) { + LOG.info("Browsing: " + messages.nextElement()); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java new file mode 100644 index 0000000..6dd87c2 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.Destination; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; + +/** + * + * + */ +public class IndividualDeadLetterViaXmlTest extends DeadLetterTest { + private static final Logger LOG = LoggerFactory.getLogger(IndividualDeadLetterViaXmlTest.class); + + + protected BrokerService createBroker() throws Exception { + BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/policy/individual-dlq.xml")); + factory.afterPropertiesSet(); + BrokerService answer = factory.getBroker(); + return answer; + } + + protected Destination createDlqDestination() { + String queueName = "Test.DLQ." + getClass().getName() + "." + getName(); + LOG.info("Using queue name: " + queueName); + return new ActiveMQQueue(queueName); + } +}