Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5D13A183AF for ; Tue, 9 Feb 2016 20:20:23 +0000 (UTC) Received: (qmail 61364 invoked by uid 500); 9 Feb 2016 20:20:22 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 61281 invoked by uid 500); 9 Feb 2016 20:20:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 60354 invoked by uid 99); 9 Feb 2016 20:20:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Feb 2016 20:20:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48D4EE6992; Tue, 9 Feb 2016 20:20:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 09 Feb 2016 20:20:47 -0000 Message-Id: <616bc6109fca43318ef77411f6f46122@git.apache.org> In-Reply-To: <0c0407b4113146a39378b87533b98c15@git.apache.org> References: <0c0407b4113146a39378b87533b98c15@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/43] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java deleted file mode 100644 index e80b05c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.transport.tcp.TcpTransportServer; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.springframework.jms.support.JmsUtils; - -public class AMQ4469Test { - - private static final int maxConnections = 100; - - private final ExecutorService executor = Executors.newCachedThreadPool(); - private String connectionUri; - private BrokerService service; - private TransportConnector connector; - - @Before - public void setUp() throws Exception { - service = new BrokerService(); - service.setPersistent(false); - service.setUseJmx(false); - connector = service.addConnector("tcp://0.0.0.0:0?maximumConnections=" + maxConnections); - connectionUri = connector.getPublishableConnectString(); - service.start(); - service.waitUntilStarted(); - } - - protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(connectionUri); - } - - @Test - public void testMaxConnectionControl() throws Exception { - final ConnectionFactory cf = createConnectionFactory(); - final CountDownLatch startupLatch = new CountDownLatch(1); - for (int i = 0; i < maxConnections + 20; i++) { - executor.submit(new Runnable() { - @Override - public void run() { - Connection conn = null; - try { - startupLatch.await(); - conn = cf.createConnection(); - conn.start(); - } - catch (Exception e) { - e.printStackTrace(); - JmsUtils.closeConnection(conn); - } - } - }); - } - - TcpTransportServer transportServer = (TcpTransportServer) connector.getServer(); - // ensure the max connections is in effect - assertEquals(maxConnections, transportServer.getMaximumConnections()); - // No connections at first - assertEquals(0, connector.getConnections().size()); - // Release the latch to set up connections in parallel - startupLatch.countDown(); - TimeUnit.SECONDS.sleep(5); - - final TransportConnector connector = this.connector; - - // Expect the max connections is created - assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return connector.getConnections().size() == maxConnections; - } - })); - } - - @After - public void tearDown() throws Exception { - executor.shutdown(); - - service.stop(); - service.waitUntilStopped(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java deleted file mode 100644 index b7ae444..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -public class AMQ4472Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4472Test.class); - - @Test - public void testLostMessage() { - Connection connection = null; - try { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false"); - connection = connectionFactory.createConnection(); - connection.start(); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination test_data_destination = session.createQueue("test" + System.currentTimeMillis()); - - MessageConsumer consumer = session.createConsumer(test_data_destination); - LOG.info("Consumer 1 connected"); - - MessageProducer producer = session.createProducer(test_data_destination); - producer.send(session.createTextMessage("Message 1")); - - // committing the session prior to the close - session.commit(); - - // starting a new transaction - producer.send(session.createTextMessage("Message 2")); - - // in a new transaction, with prefetch>0, the message - // 1 will be pending till second commit - LOG.info("Closing consumer 1..."); - consumer.close(); - - // create a consumer - consumer = session.createConsumer(test_data_destination); - LOG.info("Consumer 2 connected"); - - // retrieve message previously committed to tmp queue - Message message = consumer.receive(10000); - if (message != null) { - LOG.info("Got message 1:", message); - assertEquals("expected message", "Message 1", ((TextMessage) message).getText()); - session.commit(); - } - else { - LOG.error("Expected message but it never arrived"); - } - assertNotNull(message); - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - try { - connection.close(); - } - catch (JMSException e) { - } - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java deleted file mode 100644 index 558bc08..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.DeadLetterStrategy; -import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.util.TimeStampingBrokerPlugin; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4475Test { - - private final Log LOG = LogFactory.getLog(AMQ4475Test.class); - - private final int NUM_MSGS = 1000; - private final int MAX_THREADS = 20; - - private BrokerService broker; - private String connectionUri; - - private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); - private final ActiveMQQueue original = new ActiveMQQueue("jms/AQueue"); - private final ActiveMQQueue rerouted = new ActiveMQQueue("jms/AQueue_proxy"); - - @Before - public void setUp() throws Exception { - TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin(); - tsbp.setZeroExpirationOverride(432000000); - tsbp.setTtlCeiling(432000000); - tsbp.setFutureOnly(true); - - broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(true); - broker.setPlugins(new BrokerPlugin[]{tsbp}); - connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); - - // Configure Dead Letter Strategy - DeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); - strategy.setProcessExpired(true); - ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true); - ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ."); - strategy.setProcessNonPersistent(true); - - // Add policy and individual DLQ strategy - PolicyEntry policy = new PolicyEntry(); - policy.setTimeBeforeDispatchStarts(3000); - policy.setDeadLetterStrategy(strategy); - - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - - broker.setDestinationPolicy(pMap); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void after() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test - public void testIndividualDeadLetterAndTimeStampPlugin() { - LOG.info("Starting test .."); - - long startTime = System.nanoTime(); - - // Produce to network - List> tasks = new ArrayList<>(); - - for (int index = 0; index < 1; index++) { - ProducerTask p = new ProducerTask(connectionUri, original, NUM_MSGS); - Future future = executor.submit(p, p); - tasks.add(future); - } - - ForwardingConsumerThread f1 = new ForwardingConsumerThread(original, rerouted, NUM_MSGS); - f1.start(); - ConsumerThread c1 = new ConsumerThread(connectionUri, rerouted, NUM_MSGS); - c1.start(); - - LOG.info("Waiting on consumers and producers to exit"); - - try { - for (Future future : tasks) { - ProducerTask e = future.get(); - LOG.info("[Completed] " + e.dest.getPhysicalName()); - } - executor.shutdown(); - LOG.info("Producing threads complete, waiting on ACKs"); - f1.join(TimeUnit.MINUTES.toMillis(2)); - c1.join(TimeUnit.MINUTES.toMillis(2)); - } - catch (ExecutionException e) { - LOG.warn("Caught unexpected exception: {}", e); - throw new RuntimeException(e); - } - catch (InterruptedException ie) { - LOG.warn("Caught unexpected exception: {}", ie); - throw new RuntimeException(ie); - } - - assertFalse(f1.isFailed()); - assertFalse(c1.isFailed()); - - long estimatedTime = System.nanoTime() - startTime; - - LOG.info("Testcase duration (seconds): " + estimatedTime / 1000000000.0); - LOG.info("Consumers and producers exited, all msgs received as expected"); - } - - public class ProducerTask implements Runnable { - - private final String uri; - private final ActiveMQQueue dest; - private final int count; - - public ProducerTask(String uri, ActiveMQQueue dest, int count) { - this.uri = uri; - this.dest = dest; - this.count = count; - } - - @Override - public void run() { - - Connection connection = null; - try { - String destName = ""; - - try { - destName = dest.getQueueName(); - } - catch (JMSException e) { - LOG.warn("Caught unexpected exception: {}", e); - } - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri); - - connection = connectionFactory.createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(dest); - connection.start(); - - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - String msg = "Test Message"; - - for (int i = 0; i < count; i++) { - producer.send(session.createTextMessage(msg + dest.getQueueName() + " " + i)); - } - - LOG.info("[" + destName + "] Sent " + count + " msgs"); - } - catch (Exception e) { - LOG.warn("Caught unexpected exception: {}", e); - } - finally { - try { - connection.close(); - } - catch (Throwable e) { - LOG.warn("Caught unexpected exception: {}", e); - } - } - } - } - - public class ForwardingConsumerThread extends Thread { - - private final ActiveMQQueue original; - private final ActiveMQQueue forward; - private int blockSize = 0; - private final int PARALLEL = 1; - private boolean failed; - - public ForwardingConsumerThread(ActiveMQQueue original, ActiveMQQueue forward, int total) { - this.original = original; - this.forward = forward; - this.blockSize = total / PARALLEL; - } - - public boolean isFailed() { - return failed; - } - - @Override - public void run() { - Connection connection = null; - try { - - for (int index = 0; index < PARALLEL; index++) { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - - connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(original); - MessageProducer producer = session.createProducer(forward); - connection.start(); - int count = 0; - - while (count < blockSize) { - - Message msg1 = consumer.receive(10000); - if (msg1 != null) { - if (msg1 instanceof ActiveMQTextMessage) { - if (count % 100 == 0) { - LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count); - } - - producer.send(msg1); - - count++; - } - else { - LOG.info("Skipping unknown msg type " + msg1); - } - } - else { - break; - } - } - - LOG.info("[" + original.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")"); - connection.close(); - } - } - catch (Exception e) { - LOG.warn("Caught unexpected exception: {}", e); - } - finally { - LOG.debug(getName() + ": is stopping"); - try { - connection.close(); - } - catch (Throwable e) { - } - } - } - } - - public class ConsumerThread extends Thread { - - private final String uri; - private final ActiveMQQueue dest; - private int blockSize = 0; - private final int PARALLEL = 1; - private boolean failed; - - public ConsumerThread(String uri, ActiveMQQueue dest, int total) { - this.uri = uri; - this.dest = dest; - this.blockSize = total / PARALLEL; - } - - public boolean isFailed() { - return failed; - } - - @Override - public void run() { - Connection connection = null; - try { - - for (int index = 0; index < PARALLEL; index++) { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); - - connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(dest); - connection.start(); - int count = 0; - - while (count < blockSize) { - - Object msg1 = consumer.receive(10000); - if (msg1 != null) { - if (msg1 instanceof ActiveMQTextMessage) { - if (count % 100 == 0) { - LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count); - } - - count++; - } - else { - LOG.info("Skipping unknown msg type " + msg1); - } - } - else { - failed = true; - break; - } - } - - LOG.info("[" + dest.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")"); - connection.close(); - } - } - catch (Exception e) { - LOG.warn("Caught unexpected exception: {}", e); - } - finally { - LOG.debug(getName() + ": is stopping"); - try { - connection.close(); - } - catch (Throwable e) { - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java deleted file mode 100644 index efaf484..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.leveldb.LevelDBStore; - -public class AMQ4485LowLimitLevelDBTest extends AMQ4485LowLimitTest { - - public AMQ4485LowLimitLevelDBTest() { - super(); - numBrokers = 2; - } - - @Override - protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception { - BrokerService broker = super.createBroker(brokerid, addToNetwork); - - LevelDBStore levelDBStore = new LevelDBStore(); - levelDBStore.setDirectory(new File(broker.getBrokerDataDirectory(), "levelDB")); - broker.setPersistenceAdapter(levelDBStore); - return broker; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java deleted file mode 100644 index 4c48c2c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java +++ /dev/null @@ -1,473 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.QueueConnection; -import javax.jms.QueueReceiver; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.TimeUtils; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport { - - static final String payload = new String(new byte[10 * 1024]); - private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class); - final int portBase = 61600; - int numBrokers = 8; - final int numProducers = 30; - final int numMessages = 1000; - final int consumerSleepTime = 40; - StringBuilder brokersUrl = new StringBuilder(); - HashMap accumulators = new HashMap<>(); - private ArrayList exceptions = new ArrayList<>(); - - protected void buildUrlList() throws Exception { - for (int i = 0; i < numBrokers; i++) { - brokersUrl.append("tcp://localhost:" + (portBase + i)); - if (i != numBrokers - 1) { - brokersUrl.append(','); - } - } - } - - protected BrokerService createBroker(int brokerid) throws Exception { - return createBroker(brokerid, true); - } - - protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception { - - BrokerService broker = new BrokerService(); - broker.setPersistent(true); - broker.setDeleteAllMessagesOnStartup(true); - broker.getManagementContext().setCreateConnector(false); - - broker.setUseJmx(true); - broker.setBrokerName("B" + brokerid); - broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid))); - - if (addToNetwork) { - addNetworkConnector(broker); - } - broker.setSchedulePeriodForDestinationPurge(0); - broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024L); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policyEntry = new PolicyEntry(); - policyEntry.setExpireMessagesPeriod(0); - policyEntry.setQueuePrefetch(1000); - policyEntry.setMemoryLimit(2 * 1024 * 1024L); - policyEntry.setProducerFlowControl(false); - policyEntry.setEnableAudit(true); - policyEntry.setUseCache(true); - policyMap.put(new ActiveMQQueue("GW.>"), policyEntry); - - PolicyEntry inPolicyEntry = new PolicyEntry(); - inPolicyEntry.setExpireMessagesPeriod(0); - inPolicyEntry.setQueuePrefetch(1000); - inPolicyEntry.setMemoryLimit(5 * 1024 * 1024L); - inPolicyEntry.setProducerFlowControl(true); - inPolicyEntry.setEnableAudit(true); - inPolicyEntry.setUseCache(true); - policyMap.put(new ActiveMQQueue("IN"), inPolicyEntry); - - broker.setDestinationPolicy(policyMap); - - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); - - brokers.put(broker.getBrokerName(), new BrokerItem(broker)); - return broker; - } - - private void addNetworkConnector(BrokerService broker) throws Exception { - StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString()); - networkConnectorUrl.append(')'); - - for (int i = 0; i < 2; i++) { - NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString())); - nc.setName("Bridge-" + i); - nc.setNetworkTTL(1); - nc.setDecreaseNetworkConsumerPriority(true); - nc.setDynamicOnly(true); - nc.setPrefetchSize(100); - nc.setDynamicallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")})); - broker.addNetworkConnector(nc); - } - } - - // used to explore contention with concurrentStoreandDispatch - sync commit and task queue reversing - // order of cursor add and sequence assignment - public void x_testInterleavedSend() throws Exception { - - BrokerService b = createBroker(0, false); - b.start(); - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + 0)); - connectionFactory.setWatchTopicAdvisories(false); - - QueueConnection c1 = connectionFactory.createQueueConnection(); - QueueConnection c2 = connectionFactory.createQueueConnection(); - QueueConnection c3 = connectionFactory.createQueueConnection(); - - c1.start(); - c2.start(); - c3.start(); - - ActiveMQQueue dest = new ActiveMQQueue("IN"); - final Session s1 = c1.createQueueSession(true, Session.SESSION_TRANSACTED); - final TextMessage txMessage = s1.createTextMessage("TX"); - final TextMessage noTxMessage = s1.createTextMessage("NO_TX"); - - final MessageProducer txProducer = s1.createProducer(dest); - final MessageProducer nonTxProducer = c2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(dest); - - txProducer.send(txMessage); - - ExecutorService executorService = Executors.newFixedThreadPool(2); - executorService.execute(new Runnable() { - @Override - public void run() { - try { - s1.commit(); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }); - - executorService.execute(new Runnable() { - @Override - public void run() { - try { - nonTxProducer.send(noTxMessage); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }); - - executorService.shutdown(); - executorService.awaitTermination(10, TimeUnit.MINUTES); - - } - - public void testBrokers() throws Exception { - - buildUrlList(); - - for (int i = 0; i < numBrokers; i++) { - createBroker(i); - } - - startAllBrokers(); - waitForBridgeFormation(numBrokers - 1); - - verifyPeerBrokerInfos(numBrokers - 1); - - final List consumerStates = startAllGWConsumers(numBrokers); - - startAllGWFanoutConsumers(numBrokers); - - LOG.info("Waiting for percolation of consumers.."); - TimeUnit.SECONDS.sleep(5); - - LOG.info("Produce mesages.."); - long startTime = System.currentTimeMillis(); - - // produce - produce(numMessages); - - assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - for (ConsumerState tally : consumerStates) { - final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1); - LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get()); - if (tally.accumulator.get() != expected) { - LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected); - if (tally.accumulator.get() > expected - 50) { - dumpQueueStat(null); - } - if (tally.expected.size() == 1) { - startConsumer(tally.brokerName, tally.destination); - } - return false; - } - LOG.info("got tally on " + tally.brokerName); - } - return true; - } - }, 1000 * 60 * 1000L, 20 * 1000)); - - assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); - - LOG.info("done"); - long duration = System.currentTimeMillis() - startTime; - LOG.info("Duration:" + TimeUtils.printDuration(duration)); - - assertEquals("nothing in the dlq's", 0, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ"))); - - } - - private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception { - int id = Integer.parseInt(brokerName.substring(1)); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + id)); - connectionFactory.setWatchTopicAdvisories(false); - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - - queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination); - queueConnection.close(); - } - - private long dumpQueueStat(ActiveMQDestination destination) throws Exception { - long sumTotal = 0; - Collection brokerList = brokers.values(); - for (Iterator i = brokerList.iterator(); i.hasNext(); ) { - BrokerService brokerService = i.next().broker; - for (ObjectName objectName : brokerService.getAdminView().getQueues()) { - if (destination != null && objectName.toString().contains(destination.getPhysicalName())) { - QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false); - LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:" + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize()); - sumTotal += qViewMBean.getQueueSize(); - } - } - } - return sumTotal; - } - - private void startAllGWFanoutConsumers(int nBrokers) throws Exception { - - StringBuffer compositeDest = new StringBuffer(); - for (int k = 0; k < nBrokers; k++) { - compositeDest.append("GW." + k); - if (k + 1 != nBrokers) { - compositeDest.append(','); - } - } - ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString()); - - for (int id = 0; id < nBrokers; id++) { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); - connectionFactory.setWatchTopicAdvisories(false); - - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - - final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED); - - final MessageProducer producer = queueSession.createProducer(compositeQ); - queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - producer.send(message); - queueSession.commit(); - } - catch (Exception e) { - LOG.error("Failed to fanout to GW: " + message, e); - } - - } - }); - } - } - - private List startAllGWConsumers(int nBrokers) throws Exception { - List consumerStates = new LinkedList<>(); - for (int id = 0; id < nBrokers; id++) { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); - connectionFactory.setWatchTopicAdvisories(false); - - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - - final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQQueue destination = new ActiveMQQueue("GW." + id); - QueueReceiver queueReceiver = queueSession.createReceiver(destination); - - final ConsumerState consumerState = new ConsumerState(); - consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName(); - consumerState.receiver = queueReceiver; - consumerState.destination = destination; - for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) { - consumerState.expected.add(j); - } - - if (!accumulators.containsKey(destination)) { - accumulators.put(destination, new AtomicInteger(0)); - } - consumerState.accumulator = accumulators.get(destination); - - queueReceiver.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - if (consumerSleepTime > 0) { - TimeUnit.MILLISECONDS.sleep(consumerSleepTime); - } - } - catch (InterruptedException e) { - e.printStackTrace(); - } - try { - consumerState.accumulator.incrementAndGet(); - try { - consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM")); - } - catch (IOException e) { - e.printStackTrace(); - } - //queueSession.commit(); - } - catch (Exception e) { - LOG.error("Failed to commit slow receipt of " + message, e); - } - } - }); - - consumerStates.add(consumerState); - - } - return consumerStates; - } - - private void produce(final int numMessages) throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(numProducers); - final AtomicInteger toSend = new AtomicInteger(numMessages); - for (int i = 1; i <= numProducers; i++) { - final int id = i % numBrokers; - executorService.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); - connectionFactory.setWatchTopicAdvisories(false); - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = queueSession.createProducer(null); - int val = 0; - while ((val = toSend.decrementAndGet()) >= 0) { - - int id = numMessages - val - 1; - - ActiveMQQueue compositeQ = new ActiveMQQueue("IN"); - Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload); - textMessage.setIntProperty("NUM", id); - producer.send(compositeQ, textMessage); - } - queueConnection.close(); - - } - catch (Throwable throwable) { - throwable.printStackTrace(); - exceptions.add(throwable); - } - } - }); - } - } - - private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception { - final BrokerService broker = brokerItem.broker; - final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); - return max == regionBroker.getPeerBrokerInfos().length; - } - }); - LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); - List missing = new ArrayList<>(); - for (int i = 0; i < max; i++) { - missing.add("B" + i); - } - if (max != regionBroker.getPeerBrokerInfos().length) { - for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { - LOG.info(info.getBrokerName()); - missing.remove(info.getBrokerName()); - } - LOG.info("Broker infos off.." + missing); - } - assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); - } - - private void verifyPeerBrokerInfos(final int max) throws Exception { - Collection brokerList = brokers.values(); - for (Iterator i = brokerList.iterator(); i.hasNext(); ) { - verifyPeerBrokerInfo(i.next(), max); - } - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - class ConsumerState { - - AtomicInteger accumulator; - String brokerName; - QueueReceiver receiver; - ActiveMQDestination destination; - ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java deleted file mode 100644 index 5ddb14f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.QueueConnection; -import javax.jms.QueueReceiver; -import javax.jms.QueueSession; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.TimeUtils; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport { - - static final String payload = new String(new byte[10 * 1024]); - private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class); - final int portBase = 61600; - final int numBrokers = 4; - final int numProducers = 10; - final int numMessages = 800; - final int consumerSleepTime = 20; - StringBuilder brokersUrl = new StringBuilder(); - HashMap accumulators = new HashMap<>(); - private ArrayList exceptions = new ArrayList<>(); - - protected void buildUrlList() throws Exception { - for (int i = 0; i < numBrokers; i++) { - brokersUrl.append("tcp://localhost:" + (portBase + i)); - if (i != numBrokers - 1) { - brokersUrl.append(','); - } - } - } - - protected BrokerService createBroker(int brokerid) throws Exception { - BrokerService broker = new BrokerService(); - broker.setPersistent(true); - broker.setDeleteAllMessagesOnStartup(true); - broker.getManagementContext().setCreateConnector(false); - - broker.setUseJmx(true); - broker.setBrokerName("B" + brokerid); - broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid))); - - addNetworkConnector(broker); - broker.setSchedulePeriodForDestinationPurge(0); - broker.getSystemUsage().setSendFailIfNoSpace(true); - broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policyEntry = new PolicyEntry(); - policyEntry.setExpireMessagesPeriod(0); - policyEntry.setQueuePrefetch(1000); - policyEntry.setMemoryLimit(1024 * 1024L); - policyEntry.setOptimizedDispatch(false); - policyEntry.setProducerFlowControl(false); - policyEntry.setEnableAudit(true); - policyEntry.setUseCache(true); - policyMap.put(new ActiveMQQueue("GW.>"), policyEntry); - broker.setDestinationPolicy(policyMap); - - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false); - - brokers.put(broker.getBrokerName(), new BrokerItem(broker)); - return broker; - } - - private void addNetworkConnector(BrokerService broker) throws Exception { - StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString()); - networkConnectorUrl.append(')'); - - for (int i = 0; i < 2; i++) { - NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString())); - nc.setName("Bridge-" + i); - nc.setNetworkTTL(1); - nc.setDecreaseNetworkConsumerPriority(true); - nc.setDynamicOnly(true); - nc.setPrefetchSize(100); - nc.setDynamicallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")})); - broker.addNetworkConnector(nc); - } - } - - public void testBrokers() throws Exception { - - buildUrlList(); - - for (int i = 0; i < numBrokers; i++) { - createBroker(i); - } - - startAllBrokers(); - waitForBridgeFormation(numBrokers - 1); - - verifyPeerBrokerInfos(numBrokers - 1); - - final List consumerStates = startAllGWConsumers(numBrokers); - - startAllGWFanoutConsumers(numBrokers); - - LOG.info("Waiting for percolation of consumers.."); - TimeUnit.SECONDS.sleep(5); - - LOG.info("Produce mesages.."); - long startTime = System.currentTimeMillis(); - - // produce - produce(numMessages); - - assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - for (ConsumerState tally : consumerStates) { - final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1); - LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get()); - if (tally.accumulator.get() != expected) { - LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected); - return false; - } - LOG.info("got tally on " + tally.brokerName); - } - return true; - } - }, 1000 * 60 * 1000L)); - - assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); - - LOG.info("done"); - long duration = System.currentTimeMillis() - startTime; - LOG.info("Duration:" + TimeUtils.printDuration(duration)); - } - - private void startAllGWFanoutConsumers(int nBrokers) throws Exception { - - StringBuffer compositeDest = new StringBuffer(); - for (int k = 0; k < nBrokers; k++) { - compositeDest.append("GW." + k); - if (k + 1 != nBrokers) { - compositeDest.append(','); - } - } - ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString()); - - for (int id = 0; id < nBrokers; id++) { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); - connectionFactory.setWatchTopicAdvisories(false); - - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - - final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED); - - final MessageProducer producer = queueSession.createProducer(compositeQ); - queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - producer.send(message); - queueSession.commit(); - } - catch (Exception e) { - LOG.error("Failed to fanout to GW: " + message, e); - } - - } - }); - } - } - - private List startAllGWConsumers(int nBrokers) throws Exception { - List consumerStates = new LinkedList<>(); - for (int id = 0; id < nBrokers; id++) { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); - connectionFactory.setWatchTopicAdvisories(false); - - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - - final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQQueue destination = new ActiveMQQueue("GW." + id); - QueueReceiver queueReceiver = queueSession.createReceiver(destination); - - final ConsumerState consumerState = new ConsumerState(); - consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName(); - consumerState.receiver = queueReceiver; - consumerState.destination = destination; - for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) { - consumerState.expected.add(j); - } - - if (!accumulators.containsKey(destination)) { - accumulators.put(destination, new AtomicInteger(0)); - } - consumerState.accumulator = accumulators.get(destination); - - queueReceiver.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - if (consumerSleepTime > 0) { - TimeUnit.MILLISECONDS.sleep(consumerSleepTime); - } - } - catch (InterruptedException e) { - e.printStackTrace(); - } - try { - consumerState.accumulator.incrementAndGet(); - try { - consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM")); - } - catch (IOException e) { - e.printStackTrace(); - } - } - catch (Exception e) { - LOG.error("Failed to commit slow receipt of " + message, e); - } - } - }); - - consumerStates.add(consumerState); - - } - return consumerStates; - } - - private void produce(int numMessages) throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(numProducers); - final AtomicInteger toSend = new AtomicInteger(numMessages); - for (int i = 1; i <= numProducers; i++) { - final int id = i % numBrokers; - executorService.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); - connectionFactory.setWatchTopicAdvisories(false); - QueueConnection queueConnection = connectionFactory.createQueueConnection(); - queueConnection.start(); - QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = queueSession.createProducer(null); - int val = 0; - while ((val = toSend.decrementAndGet()) >= 0) { - - ActiveMQQueue compositeQ = new ActiveMQQueue("IN"); - LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ); - Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + val + " payload:" + payload); - textMessage.setIntProperty("NUM", val); - producer.send(compositeQ, textMessage); - } - queueConnection.close(); - - } - catch (Throwable throwable) { - throwable.printStackTrace(); - exceptions.add(throwable); - } - } - }); - } - } - - private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception { - final BrokerService broker = brokerItem.broker; - final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); - return max == regionBroker.getPeerBrokerInfos().length; - } - }); - LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); - List missing = new ArrayList<>(); - for (int i = 0; i < max; i++) { - missing.add("B" + i); - } - if (max != regionBroker.getPeerBrokerInfos().length) { - for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { - LOG.info(info.getBrokerName()); - missing.remove(info.getBrokerName()); - } - LOG.info("Broker infos off.." + missing); - } - assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); - } - - private void verifyPeerBrokerInfos(final int max) throws Exception { - Collection brokerList = brokers.values(); - for (Iterator i = brokerList.iterator(); i.hasNext(); ) { - verifyPeerBrokerInfo(i.next(), max); - } - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - class ConsumerState { - - AtomicInteger accumulator; - String brokerName; - QueueReceiver receiver; - ActiveMQDestination destination; - Vector expected = new Vector<>(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java deleted file mode 100644 index 777d582..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.HashSet; -import java.util.Set; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.TransactionBroker; -import org.apache.activemq.broker.jmx.DestinationViewMBean; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4485Test extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class); - BrokerService broker; - ActiveMQConnectionFactory factory; - final int messageCount = 20; - int memoryLimit = 40 * 1024; - final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + this.getClass().getName()); - final Vector exceptions = new Vector<>(); - final CountDownLatch slowSendResume = new CountDownLatch(1); - - protected void configureBroker(long memoryLimit) throws Exception { - broker.setDeleteAllMessagesOnStartup(true); - broker.setAdvisorySupport(false); - - PolicyEntry policy = new PolicyEntry(); - policy.setExpireMessagesPeriod(0); - policy.setMemoryLimit(memoryLimit); - policy.setProducerFlowControl(false); - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - broker.setDestinationPolicy(pMap); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception { - if (messageSend.isInTransaction() && messageSend.getProperty("NUM") != null) { - final Integer num = (Integer) messageSend.getProperty("NUM"); - if (true) { - TransactionBroker transactionBroker = (TransactionBroker) broker.getBroker().getAdaptor(TransactionBroker.class); - transactionBroker.getTransaction(producerExchange.getConnectionContext(), messageSend.getTransactionId(), false).addSynchronization(new Synchronization() { - @Override - public void afterCommit() throws Exception { - LOG.error("AfterCommit, NUM:" + num + ", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId()); - if (num == 5) { - // we want to add to cursor after usage is exhausted by message 20 and when - // all other messages have been processed - LOG.error("Pausing on latch in afterCommit for: " + num + ", " + messageSend.getMessageId()); - slowSendResume.await(20, TimeUnit.SECONDS); - LOG.error("resuming on latch afterCommit for: " + num + ", " + messageSend.getMessageId()); - } - else if (messageCount + 1 == num) { - LOG.error("releasing latch. " + num + ", " + messageSend.getMessageId()); - slowSendResume.countDown(); - // for message X, we need to delay so message 5 can setBatch - TimeUnit.SECONDS.sleep(5); - LOG.error("resuming afterCommit for: " + num + ", " + messageSend.getMessageId()); - } - } - }); - } - } - super.send(producerExchange, messageSend); - } - }}); - - } - - public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception { - - Set expected = new HashSet<>(); - final Vector sessionVector = new Vector<>(); - ExecutorService executorService = Executors.newCachedThreadPool(); - for (int i = 1; i <= messageCount; i++) { - sessionVector.add(send(i, 1, true)); - expected.add(i); - } - - // get parallel commit so that the sync writes are batched - for (int i = 0; i < messageCount; i++) { - final int id = i; - executorService.submit(new Runnable() { - @Override - public void run() { - try { - sessionVector.get(id).commit(); - } - catch (Exception fail) { - exceptions.add(fail); - } - } - }); - } - - final DestinationViewMBean queueViewMBean = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], DestinationViewMBean.class, false); - - // not sure how many messages will get enqueued - TimeUnit.SECONDS.sleep(3); - if (false) - assertTrue("all " + messageCount + " on the q", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount()); - return messageCount == queueViewMBean.getEnqueueCount(); - } - })); - - LOG.info("Big send to blow available destination usage before slow send resumes"); - send(messageCount + 1, 35 * 1024, true).commit(); - - // consume and verify all received - Connection cosumerConnection = factory.createConnection(); - cosumerConnection.start(); - MessageConsumer consumer = cosumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination); - for (int i = 1; i <= messageCount + 1; i++) { - BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000); - assertNotNull("Got message: " + i + ", " + expected, bytesMessage); - MessageId mqMessageId = ((ActiveMQBytesMessage) bytesMessage).getMessageId(); - LOG.info("got: " + expected + ", " + mqMessageId + ", NUM=" + ((ActiveMQBytesMessage) bytesMessage).getProperty("NUM")); - expected.remove(((ActiveMQBytesMessage) bytesMessage).getProperty("NUM")); - } - } - - private Session send(int id, int messageSize, boolean transacted) throws Exception { - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes(new byte[messageSize]); - bytesMessage.setIntProperty("NUM", id); - producer.send(bytesMessage); - LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + ((ActiveMQBytesMessage) bytesMessage).getTransactionId()); - return session; - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - broker = new BrokerService(); - broker.setBrokerName("thisOne"); - configureBroker(memoryLimit); - broker.start(); - factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true"); - factory.setWatchTopicAdvisories(false); - - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - broker = null; - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java deleted file mode 100644 index 7d3ee41..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.Enumeration; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4487Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4487Test.class); - - private final String destinationName = "TEST.QUEUE"; - private BrokerService broker; - private ActiveMQConnectionFactory factory; - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.deleteAllMessages(); - broker.setUseJmx(false); - broker.setAdvisorySupport(false); - - PolicyEntry policy = new PolicyEntry(); - policy.setQueue(">"); - policy.setMaxProducersToAudit(75); - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - broker.setDestinationPolicy(pMap); - - broker.start(); - broker.waitUntilStarted(); - factory = new ActiveMQConnectionFactory("vm://localhost"); - } - - @After - public void stopBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - private void sendMessages(int messageToSend) throws Exception { - String data = ""; - for (int i = 0; i < 1024 * 2; i++) { - data += "x"; - } - - Connection connection = factory.createConnection(); - connection.start(); - - for (int i = 0; i < messageToSend; i++) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(destinationName); - MessageProducer producer = session.createProducer(queue); - producer.send(session.createTextMessage(data)); - session.close(); - } - - connection.close(); - } - - @Test - public void testBrowsingWithLessThanMaxAuditDepth() throws Exception { - doTestBrowsing(75); - } - - @Test - public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception { - doTestBrowsing(300); - } - - @SuppressWarnings("rawtypes") - private void doTestBrowsing(int messagesToSend) throws Exception { - - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(destinationName); - - sendMessages(messagesToSend); - - QueueBrowser browser = session.createBrowser(queue); - Enumeration enumeration = browser.getEnumeration(); - int received = 0; - while (enumeration.hasMoreElements()) { - Message m = (Message) enumeration.nextElement(); - assertNotNull(m); - - if (LOG.isDebugEnabled()) { - LOG.debug("Browsed Message: {}", m.getJMSMessageID()); - } - - received++; - if (received > messagesToSend) { - break; - } - } - - browser.close(); - - assertEquals(messagesToSend, received); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java deleted file mode 100644 index a89aca2..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; - -public class AMQ4504Test { - - BrokerService brokerService; - - @Before - public void setup() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.start(); - } - - @After - public void stop() throws Exception { - brokerService.stop(); - } - - @Test - public void testCompositeDestConsumer() throws Exception { - - final int numDests = 20; - final int numMessages = 200; - StringBuffer stringBuffer = new StringBuffer(); - for (int i = 0; i < numDests; i++) { - if (stringBuffer.length() != 0) { - stringBuffer.append(','); - } - stringBuffer.append("ST." + i); - } - stringBuffer.append("?consumer.prefetchSize=100"); - ActiveMQQueue activeMQQueue = new ActiveMQQueue(stringBuffer.toString()); - ConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); - Connection connection = factory.createConnection(); - connection.start(); - MessageProducer producer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(activeMQQueue); - for (int i = 0; i < numMessages; i++) { - producer.send(new ActiveMQTextMessage()); - } - - MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(activeMQQueue); - try { - for (int i = 0; i < numMessages * numDests; i++) { - assertNotNull("received:" + i, consumer.receive(4000)); - } - } - finally { - connection.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6a3a994e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java deleted file mode 100644 index ceac82f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertTrue; - -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.DeadLetterStrategy; -import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4513Test { - - private BrokerService brokerService; - private String connectionUri; - - @Before - public void setup() throws Exception { - brokerService = new BrokerService(); - - connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); - - // Configure Dead Letter Strategy - DeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); - ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true); - ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ."); - strategy.setProcessNonPersistent(false); - strategy.setProcessExpired(false); - - // Add policy and individual DLQ strategy - PolicyEntry policy = new PolicyEntry(); - policy.setTimeBeforeDispatchStarts(3000); - policy.setDeadLetterStrategy(strategy); - - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - - brokerService.setDestinationPolicy(pMap); - - brokerService.setPersistent(false); - brokerService.start(); - } - - @After - public void stop() throws Exception { - brokerService.stop(); - } - - @Test(timeout = 360000) - public void test() throws Exception { - - final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri); - - ExecutorService service = Executors.newFixedThreadPool(25); - - final Random ripple = new Random(System.currentTimeMillis()); - - for (int i = 0; i < 1000; ++i) { - service.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTemporaryQueue(); - session.createProducer(destination); - connection.close(); - TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20)); - } - catch (Exception e) { - } - } - }); - - service.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTemporaryQueue(); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.setTimeToLive(400); - producer.send(session.createTextMessage()); - producer.send(session.createTextMessage()); - TimeUnit.MILLISECONDS.sleep(500); - connection.close(); - } - catch (Exception e) { - } - } - }); - - service.execute(new Runnable() { - @Override - public void run() { - try { - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTemporaryQueue(); - session.createProducer(destination); - connection.close(); - TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20)); - } - catch (Exception e) { - } - } - }); - } - - service.shutdown(); - assertTrue(service.awaitTermination(5, TimeUnit.MINUTES)); - } -}