Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5009186D5 for ; Thu, 4 Feb 2016 22:05:08 +0000 (UTC) Received: (qmail 52014 invoked by uid 500); 4 Feb 2016 22:05:04 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 51930 invoked by uid 500); 4 Feb 2016 22:05:04 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 50993 invoked by uid 99); 4 Feb 2016 22:05:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Feb 2016 22:05:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 30200E38C8; Thu, 4 Feb 2016 22:05:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 04 Feb 2016 22:05:39 -0000 Message-Id: <73e836e2872a4756a73edf9c5cbabedb@git.apache.org> In-Reply-To: <8ce01202e2ee4c9eb15eafd06d63a1bb@git.apache.org> References: <8ce01202e2ee4c9eb15eafd06d63a1bb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [37/44] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java deleted file mode 100644 index ea794ff..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import javax.jms.*; -import javax.jms.Queue; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class AMQ2171Test implements Thread.UncaughtExceptionHandler { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2171Test.class); - private static final String BROKER_URL = "tcp://localhost:0"; - private static final int QUEUE_SIZE = 100; - - private static BrokerService brokerService; - private static Queue destination; - - private String brokerUri; - private String brokerUriNoPrefetch; - private Collection exceptions = new CopyOnWriteArrayList<>(); - - @Before - public void setUp() throws Exception { - // Start an embedded broker up. - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.addConnector(BROKER_URL); - brokerService.start(); - - brokerUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString().toString(); - brokerUriNoPrefetch = brokerUri + "?jms.prefetchPolicy.all=0"; - - destination = new ActiveMQQueue("Test"); - produce(brokerUri, QUEUE_SIZE); - } - - @Before - public void addHandler() { - Thread.setDefaultUncaughtExceptionHandler(this); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - } - - @Test(timeout = 10000) - public void testBrowsePrefetch() throws Exception { - runTest(brokerUri); - } - - @Test(timeout = 10000) - public void testBrowseNoPrefetch() throws Exception { - runTest(brokerUriNoPrefetch); - } - - private void runTest(String brokerURL) throws Exception { - - Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); - - try { - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Enumeration unread = session.createBrowser(destination).getEnumeration(); - - int count = 0; - while (unread.hasMoreElements()) { - unread.nextElement(); - count++; - } - - assertEquals(QUEUE_SIZE, count); - assertTrue(exceptions.isEmpty()); - } - finally { - try { - connection.close(); - } - catch (JMSException e) { - exceptions.add(e); - } - } - } - - private static void produce(String brokerURL, int count) throws Exception { - Connection connection = null; - - try { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); - connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setTimeToLive(0); - connection.start(); - - for (int i = 0; i < count; i++) { - int id = i + 1; - TextMessage message = session.createTextMessage("Message " + id); - message.setIntProperty("MsgNumber", id); - producer.send(message); - - if (id % 500 == 0) { - LOG.info("sent " + id + ", ith " + message); - } - } - } - finally { - try { - if (connection != null) { - connection.close(); - } - } - catch (Throwable e) { - } - } - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - exceptions.add(e); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java deleted file mode 100644 index d6b4aaa..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import java.io.File; -import java.util.concurrent.TimeUnit; - -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.TopicSubscriptionViewMBean; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ2200Test { - - private static final String bindAddress = "tcp://0.0.0.0:0"; - private BrokerService broker; - private ActiveMQConnectionFactory cf; - - @Before - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector(bindAddress); - String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); - broker.start(); - broker.waitUntilStarted(); - - cf = new ActiveMQConnectionFactory(address); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test - public void testTopicSubscriptionView() throws Exception { - TopicConnection connection = cf.createTopicConnection(); - TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - - Topic destination = session.createTopic("TopicViewTestTopic"); - MessageConsumer consumer = session.createConsumer(destination); - assertNotNull(consumer); - TimeUnit.SECONDS.sleep(1); - - ObjectName subscriptionNames[] = broker.getAdminView().getTopicSubscribers(); - assertTrue(subscriptionNames.length > 0); - - boolean fail = true; - for (ObjectName name : subscriptionNames) { - if (name.toString().contains("TopicViewTestTopic")) { - TopicSubscriptionViewMBean sub = (TopicSubscriptionViewMBean) broker.getManagementContext().newProxyInstance(name, TopicSubscriptionViewMBean.class, true); - assertNotNull(sub); - assertTrue(sub.getSessionId() != -1); - // Check that its the default value then configure something new. - assertTrue(sub.getMaximumPendingQueueSize() == -1); - sub.setMaximumPendingQueueSize(1000); - assertTrue(sub.getMaximumPendingQueueSize() != -1); - fail = false; - } - } - - if (fail) { - fail("Didn't find the TopicSubscriptionView"); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java deleted file mode 100644 index 2152e12..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ2213Test { - - BrokerService broker; - ConnectionFactory factory; - Connection connection; - Session session; - Queue queue; - MessageConsumer consumer; - - public void createBroker(boolean deleteAll) throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(deleteAll); - broker.setDataDirectory("target/AMQ3145Test"); - broker.setUseJmx(true); - broker.getManagementContext().setCreateConnector(false); - broker.addConnector("tcp://localhost:0"); - broker.start(); - broker.waitUntilStarted(); - factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString()); - connection = factory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - } - - @Before - public void createBroker() throws Exception { - createBroker(true); - } - - @After - public void tearDown() throws Exception { - if (consumer != null) { - consumer.close(); - } - session.close(); - connection.stop(); - connection.close(); - broker.stop(); - } - - @Test - public void testEqualsGenericSession() throws JMSException { - assertNotNull(this.connection); - Session sess = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - assertTrue(sess.equals(sess)); - } - - @Test - public void testEqualsTopicSession() throws JMSException { - assertNotNull(this.connection); - assertTrue(this.connection instanceof TopicConnection); - TopicSession sess = ((TopicConnection) this.connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - assertTrue(sess.equals(sess)); - } - - @Test - public void testEqualsQueueSession() throws JMSException { - assertNotNull(this.connection); - assertTrue(this.connection instanceof QueueConnection); - QueueSession sess = ((QueueConnection) this.connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - assertTrue(sess.equals(sess)); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java deleted file mode 100644 index fde821f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.concurrent.CountDownLatch; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2314Test extends CombinationTestSupport { - - public boolean consumeAll = false; - public int deliveryMode = DeliveryMode.NON_PERSISTENT; - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2314Test.class); - private static final int MESSAGES_COUNT = 30000; - private static byte[] buf = new byte[1024]; - private BrokerService broker; - private String connectionUri; - - private static final long messageReceiveTimeout = 500L; - - Destination destination = new ActiveMQTopic("FooTwo"); - - public void testRemoveSlowSubscriberWhacksTempStore() throws Exception { - runProducerWithHungConsumer(); - } - - public void testMemoryUsageReleasedOnAllConsumed() throws Exception { - consumeAll = true; - runProducerWithHungConsumer(); - // do it again to ensure memory limits are decreased - runProducerWithHungConsumer(); - } - - public void runProducerWithHungConsumer() throws Exception { - - final CountDownLatch consumerContinue = new CountDownLatch(1); - final CountDownLatch consumerReady = new CountDownLatch(1); - - final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage(); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - factory.setAlwaysSyncSend(true); - - // ensure messages are spooled to disk for this consumer - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setTopicPrefetch(500); - factory.setPrefetchPolicy(prefetch); - final Connection connection = factory.createConnection(); - connection.start(); - - Thread producingThread = new Thread("Producing thread") { - @Override - public void run() { - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { - Message message = session.createTextMessage(new String(buf) + idx); - producer.send(message); - } - producer.close(); - session.close(); - } - catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - - Thread consumingThread = new Thread("Consuming thread") { - @Override - public void run() { - try { - int count = 0; - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - - while (consumer.receive(messageReceiveTimeout) == null) { - consumerReady.countDown(); - } - count++; - LOG.info("Received one... waiting"); - consumerContinue.await(); - if (consumeAll) { - LOG.info("Consuming the rest of the messages..."); - while (consumer.receive(messageReceiveTimeout) != null) { - count++; - } - } - LOG.info("consumer session closing: consumed count: " + count); - session.close(); - } - catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - consumingThread.start(); - consumerReady.await(); - - producingThread.start(); - producingThread.join(); - - final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage(); - LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription); - assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage); - consumerContinue.countDown(); - consumingThread.join(); - connection.close(); - - LOG.info("Subscription Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage()); - - assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker.getSystemUsage().getTempUsage().getUsage() < tempUsageBySubscription; - } - })); - } - - @Override - public void setUp() throws Exception { - super.setAutoFail(true); - super.setUp(); - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.getSystemUsage().getMemoryUsage().setLimit(1024L * 1024 * 64); - - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - @Override - public void tearDown() throws Exception { - broker.stop(); - } - - public static Test suite() { - return suite(AMQ2314Test.class); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java deleted file mode 100644 index 2f9bb84..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBStore; - -/* - AMQ2356Test - We have an environment where we have a very large number of destinations. - In an effort to reduce the number of threads I have set the options - -Dorg.apache.activemq.UseDedicatedTaskRunner=false - - and - - - - Unfortunately this very quickly leads to deadlocked queues. - - My environment is: - - ActiveMQ 5.2 Ubunty Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system) - TCP transportConnector - - To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues. - Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages. - I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect. - The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have - deadlocked at less than 30 messages each. - */ -public class AMQ2356Test extends TestCase { - - protected static final int MESSAGE_COUNT = 1000; - protected static final int NUMBER_OF_PAIRS = 10; - protected BrokerService broker; - protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; - protected int destinationCount; - - public void testScenario() throws Exception { - for (int i = 0; i < NUMBER_OF_PAIRS; i++) { - ActiveMQQueue queue = new ActiveMQQueue(getClass().getName() + ":" + i); - ProducerConsumerPair cp = new ProducerConsumerPair(); - cp.start(this.brokerURL, queue, MESSAGE_COUNT); - cp.testRun(); - cp.stop(); - } - } - - protected Destination getDestination(Session session) throws JMSException { - String destinationName = getClass().getName() + "." + destinationCount++; - return session.createQueue(destinationName); - } - - @Override - protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); - } - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - } - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer); - answer.start(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception { - File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb"); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(dataFileDir); - answer.setUseJmx(false); - // Setup a destination policy where it takes only 1 message at a time. - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); - policy.setOptimizedDispatch(true); - policyMap.setDefaultEntry(policy); - answer.setDestinationPolicy(policyMap); - - answer.setAdvisorySupport(false); - answer.setEnableStatistics(false); - answer.setDeleteAllMessagesOnStartup(true); - answer.addConnector(brokerURL); - - } - - static class ProducerConsumerPair { - - private Destination destination; - private MessageProducer producer; - private MessageConsumer consumer; - private Connection producerConnection; - private Connection consumerConnection; - private int numberOfMessages; - - ProducerConsumerPair() { - - } - - void start(String brokerURL, final Destination dest, int msgNum) throws Exception { - this.destination = dest; - this.numberOfMessages = msgNum; - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL); - this.producerConnection = cf.createConnection(); - this.producerConnection.start(); - this.consumerConnection = cf.createConnection(); - this.consumerConnection.start(); - this.producer = createProducer(this.producerConnection); - this.consumer = createConsumer(this.consumerConnection); - } - - void testRun() throws Exception { - - Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - for (int i = 0; i < this.numberOfMessages; i++) { - BytesMessage msg = s.createBytesMessage(); - msg.writeBytes(new byte[1024]); - this.producer.send(msg); - } - int received = 0; - for (int i = 0; i < this.numberOfMessages; i++) { - Message msg = this.consumer.receive(); - assertNotNull(msg); - received++; - } - assertEquals("Messages received on " + this.destination, this.numberOfMessages, received); - - } - - void stop() throws Exception { - if (this.producerConnection != null) { - this.producerConnection.close(); - } - if (this.consumerConnection != null) { - this.consumerConnection.close(); - } - } - - private MessageProducer createProducer(Connection connection) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer result = session.createProducer(this.destination); - return result; - } - - private MessageConsumer createConsumer(Connection connection) throws Exception { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer result = session.createConsumer(this.destination); - return result; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java deleted file mode 100644 index 5f79b6c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; -//package org.apache.activemq.transport.failover; - -import static org.junit.Assert.assertEquals; - -import java.lang.reflect.Field; -import java.net.URI; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.state.ConnectionState; -import org.apache.activemq.state.ConnectionStateTracker; -import org.apache.activemq.state.TransactionState; -import org.apache.activemq.transport.MutexTransport; -import org.apache.activemq.transport.ResponseCorrelator; -import org.apache.activemq.transport.failover.FailoverTransport; -import org.junit.Test; - -public class AMQ2364Test { - - @SuppressWarnings("unchecked") - @Test - public void testRollbackLeak() throws Exception { - - int messageCount = 1000; - URI failoverUri = new URI("failover:(vm://localhost)?jms.redeliveryPolicy.maximumRedeliveries=0"); - - Destination dest = new ActiveMQQueue("Failover.Leak"); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri); - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - connection.start(); - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer producer = session.createProducer(dest); - - for (int i = 0; i < messageCount; ++i) - producer.send(session.createTextMessage("Test message #" + i)); - producer.close(); - session.commit(); - - MessageConsumer consumer = session.createConsumer(dest); - - final CountDownLatch latch = new CountDownLatch(messageCount); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message msg) { - try { - session.rollback(); - } - catch (JMSException e) { - e.printStackTrace(); - } - finally { - latch.countDown(); - } - } - }); - - latch.await(); - consumer.close(); - session.close(); - - ResponseCorrelator respCorr = (ResponseCorrelator) connection.getTransport(); - MutexTransport mutexTrans = (MutexTransport) respCorr.getNext(); - FailoverTransport failoverTrans = (FailoverTransport) mutexTrans.getNext(); - Field stateTrackerField = FailoverTransport.class.getDeclaredField("stateTracker"); - stateTrackerField.setAccessible(true); - ConnectionStateTracker stateTracker = (ConnectionStateTracker) stateTrackerField.get(failoverTrans); - Field statesField = ConnectionStateTracker.class.getDeclaredField("connectionStates"); - statesField.setAccessible(true); - ConcurrentHashMap states = (ConcurrentHashMap) statesField.get(stateTracker); - - ConnectionState state = states.get(connection.getConnectionInfo().getConnectionId()); - - Collection transactionStates = state.getTransactionStates(); - - connection.stop(); - connection.close(); - - assertEquals("Transaction states not cleaned up", 0, transactionStates.size()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java deleted file mode 100644 index f4e7908..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQDestination; -import org.junit.Test; - -public class AMQ2383Test { - - @Test - public void activeMQTest() throws Exception { - Destination dest = ActiveMQDestination.createDestination("testQueue", ActiveMQDestination.QUEUE_TYPE); - ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false"); - Connection producerConnection = factory.createConnection(); - producerConnection.start(); - Connection consumerConnection = factory.createConnection(); - consumerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(dest); - TextMessage sentMsg = producerSession.createTextMessage("test..."); - producer.send(sentMsg); - producerSession.close(); - - Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = consumerSession.createConsumer(dest); - TextMessage receivedMsg = (TextMessage) consumer.receive(); - consumerSession.rollback(); - consumerSession.close(); - - assertEquals(sentMsg, receivedMsg); - - producerConnection.close(); - consumerConnection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java deleted file mode 100644 index edd4e8f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An AMQ-2401 Test - */ -public class AMQ2401Test extends TestCase implements MessageListener { - - private BrokerService broker; - private ActiveMQConnectionFactory factory; - private static final int SEND_COUNT = 500; - private static final int CONSUMER_COUNT = 50; - private static final int PRODUCER_COUNT = 1; - private static final int LOG_INTERVAL = 10; - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2401Test.class); - - private final ArrayList services = new ArrayList<>(CONSUMER_COUNT + PRODUCER_COUNT); - private int count = 0; - private CountDownLatch latch; - - @Override - protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test"); - broker.setDeleteAllMessagesOnStartup(true); - String connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString(); - PolicyMap policies = new PolicyMap(); - PolicyEntry entry = new PolicyEntry(); - entry.setMemoryLimit(1024 * 100); - entry.setProducerFlowControl(true); - entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); - entry.setQueue(">"); - policies.setDefaultEntry(entry); - broker.setDestinationPolicy(policies); - broker.setUseJmx(false); - broker.start(); - broker.waitUntilStarted(); - - factory = new ActiveMQConnectionFactory(connectionUri); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - public void testDupsOk() throws Exception { - - latch = new CountDownLatch(SEND_COUNT); - - for (int i = 0; i < CONSUMER_COUNT; i++) { - TestConsumer consumer = new TestConsumer(); - consumer.start(); - services.add(consumer); - } - for (int i = 0; i < PRODUCER_COUNT; i++) { - TestProducer producer = new TestProducer(); - producer.start(); - services.add(producer); - } - - waitForMessageReceipt(TimeUnit.SECONDS.toMillis(30)); - } - - @Override - public void onMessage(Message message) { - latch.countDown(); - if (++count % LOG_INTERVAL == 0) { - LOG.debug("Received message " + count); - } - - try { - Thread.sleep(1); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - /** - * @throws InterruptedException - * @throws TimeoutException - */ - private void waitForMessageReceipt(long timeout) throws InterruptedException, TimeoutException { - if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { - throw new TimeoutException(String.format("Consumner didn't receive expected # of messages, %d of %d received.", latch.getCount(), SEND_COUNT)); - } - } - - private interface Service { - - public void start() throws Exception; - - public void close(); - } - - private class TestProducer implements Runnable, Service { - - Thread thread; - BytesMessage message; - - Connection connection; - Session session; - MessageProducer producer; - - TestProducer() throws Exception { - thread = new Thread(this, "TestProducer"); - connection = factory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - producer = session.createProducer(session.createQueue("AMQ2401Test")); - } - - @Override - public void start() { - thread.start(); - } - - @Override - public void run() { - - int count = SEND_COUNT / PRODUCER_COUNT; - for (int i = 1; i <= count; i++) { - try { - if ((i % LOG_INTERVAL) == 0) { - LOG.debug("Sending: " + i); - } - message = session.createBytesMessage(); - message.writeBytes(new byte[1024]); - producer.send(message); - } - catch (JMSException jmse) { - jmse.printStackTrace(); - break; - } - } - } - - @Override - public void close() { - try { - connection.close(); - } - catch (JMSException e) { - } - } - } - - private class TestConsumer implements Runnable, Service { - - ActiveMQConnection connection; - Session session; - MessageConsumer consumer; - - TestConsumer() throws Exception { - factory.setOptimizeAcknowledge(false); - connection = (ActiveMQConnection) factory.createConnection(); - - session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - consumer = session.createConsumer(session.createQueue("AMQ2401Test")); - - consumer.setMessageListener(AMQ2401Test.this); - } - - @Override - public void start() throws Exception { - connection.start(); - } - - @Override - public void close() { - try { - connection.close(); - } - catch (JMSException e) { - } - } - - @Override - public void run() { - while (latch.getCount() > 0) { - try { - onMessage(consumer.receive()); - } - catch (Exception e) { - e.printStackTrace(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java deleted file mode 100644 index ed1af90..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java +++ /dev/null @@ -1,344 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Vector; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2413Test extends CombinationTestSupport implements MessageListener { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2413Test.class); - BrokerService broker; - private ActiveMQConnectionFactory factory; - - private static final int HANG_THRESHOLD = 60; - private static final int SEND_COUNT = 1000; - private static final int RECEIVER_THINK_TIME = 1; - private static final int CONSUMER_COUNT = 1; - private static final int PRODUCER_COUNT = 50; - private static final int TO_SEND = SEND_COUNT / PRODUCER_COUNT; - - public int deliveryMode = DeliveryMode.NON_PERSISTENT; - public int ackMode = Session.DUPS_OK_ACKNOWLEDGE; - public boolean useVMCursor = false; - public boolean useOptimizeAcks = false; - - private final ArrayList services = new ArrayList<>(CONSUMER_COUNT + PRODUCER_COUNT); - AtomicInteger count = new AtomicInteger(0); - Semaphore receivedMessages; - AtomicBoolean running = new AtomicBoolean(false); - - public void initCombos() { - addCombinationValues("deliveryMode", new Object[]{DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT}); - addCombinationValues("ackMode", new Object[]{Session.DUPS_OK_ACKNOWLEDGE, Session.AUTO_ACKNOWLEDGE}); - addCombinationValues("useVMCursor", new Object[]{true, false}); - // addCombinationValues("useOptimizeAcks", new Object[] {true, false}); - } - - @Override - protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test"); - broker.setDeleteAllMessagesOnStartup(true); - - KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - kahaDb.setConcurrentStoreAndDispatchQueues(false); - broker.addConnector("tcp://0.0.0.0:2401"); - PolicyMap policies = new PolicyMap(); - PolicyEntry entry = new PolicyEntry(); - entry.setMemoryLimit(1024 * 1024); - entry.setProducerFlowControl(true); - if (useVMCursor) { - entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); - } - entry.setQueue(">"); - policies.setDefaultEntry(entry); - broker.setDestinationPolicy(policies); - broker.start(); - broker.waitUntilStarted(); - - count.set(0); - receivedMessages = new Semaphore(0); - - factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2401"); - // factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false"); - setAutoFail(true); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - running.set(false); - for (Service service : services) { - service.close(); - } - - broker.stop(); - broker.waitUntilStopped(); - - super.tearDown(); - } - - public void testReceipt() throws Exception { - - running.set(true); - - for (int i = 0; i < CONSUMER_COUNT; i++) { - TestConsumer consumer = new TestConsumer(); - consumer.start(); - services.add(consumer); - } - for (int i = 0; i < PRODUCER_COUNT; i++) { - TestProducer producer = new TestProducer(i); - producer.start(); - services.add(producer); - } - - waitForMessageReceipt(); - } - - /* - * (non-Javadoc) - * - * @see javax.jms.MessageListener#onMessage(javax.jms.Message) - */ - @Override - public void onMessage(Message message) { - receivedMessages.release(); - if (count.incrementAndGet() % 100 == 0) { - LOG.info("Received message " + count); - } - track(message); - if (RECEIVER_THINK_TIME > 0) { - try { - Thread.sleep(RECEIVER_THINK_TIME); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - } - - HashMap tracker = new HashMap<>(); - - private synchronized void track(Message message) { - try { - MessageId id = new MessageId(message.getJMSMessageID()); - ProducerId pid = id.getProducerId(); - int seq = (int) id.getProducerSequenceId(); - boolean[] ids = tracker.get(pid); - if (ids == null) { - ids = new boolean[TO_SEND + 1]; - ids[seq] = true; - tracker.put(pid, ids); - } - else { - assertTrue("not already received: " + id, !ids[seq]); - ids[seq] = true; - } - } - catch (Exception e) { - LOG.error(e.toString()); - } - } - - /** - * @throws InterruptedException - * @throws TimeoutException - */ - private void waitForMessageReceipt() throws InterruptedException, TimeoutException { - try { - while (count.get() < SEND_COUNT) { - if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) { - if (count.get() == SEND_COUNT) - break; - verifyTracking(); - throw new TimeoutException("@count=" + count.get() + " Message not received for more than " + HANG_THRESHOLD + " seconds"); - } - } - } - finally { - running.set(false); - } - } - - private void verifyTracking() { - Vector missing = new Vector<>(); - for (ProducerId pid : tracker.keySet()) { - boolean[] ids = tracker.get(pid); - for (int i = 1; i < TO_SEND + 1; i++) { - if (!ids[i]) { - missing.add(new MessageId(pid, i)); - } - } - } - assertTrue("No missing messages: " + missing, missing.isEmpty()); - } - - private interface Service { - - public void start() throws Exception; - - public void close(); - } - - private class TestProducer implements Runnable, Service { - - Thread thread; - BytesMessage message; - Connection connection; - Session session; - MessageProducer producer; - - TestProducer(int id) throws Exception { - thread = new Thread(this, "TestProducer-" + id); - connection = factory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - producer = session.createProducer(session.createQueue("AMQ2401Test")); - } - - @Override - public void start() { - thread.start(); - } - - @Override - public void run() { - - int i = 1; - for (; i <= TO_SEND; i++) { - try { - - if (+i % 100 == 0) { - LOG.info(Thread.currentThread().getName() + " Sending message " + i); - } - message = session.createBytesMessage(); - message.writeBytes(new byte[1024]); - producer.setDeliveryMode(deliveryMode); - producer.send(message); - } - catch (JMSException jmse) { - jmse.printStackTrace(); - break; - } - } - LOG.info(Thread.currentThread().getName() + " Sent: " + (i - 1)); - } - - @Override - public void close() { - try { - connection.close(); - } - catch (JMSException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - - private class TestConsumer implements Runnable, Service { - - ActiveMQConnection connection; - Session session; - MessageConsumer consumer; - - TestConsumer() throws Exception { - factory.setOptimizeAcknowledge(false); - connection = (ActiveMQConnection) factory.createConnection(); - if (useOptimizeAcks) { - connection.setOptimizeAcknowledge(true); - } - - session = connection.createSession(false, ackMode); - consumer = session.createConsumer(session.createQueue("AMQ2401Test")); - - consumer.setMessageListener(AMQ2413Test.this); - } - - @Override - public void start() throws Exception { - connection.start(); - } - - @Override - public void close() { - try { - connection.close(); - } - catch (JMSException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - /* - * (non-Javadoc) - * - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - while (running.get()) { - try { - onMessage(consumer.receive()); - } - catch (Exception e) { - e.printStackTrace(); - } - } - } - } - - public static Test suite() { - return suite(AMQ2413Test.class); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java deleted file mode 100644 index f4fb8a2..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.net.URI; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2439Test extends JmsMultipleBrokersTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2439Test.class); - Destination dest; - - public void testDuplicatesThroughNetwork() throws Exception { - assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500)); - assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500)); - validateQueueStats(); - } - - private void validateQueueStats() throws Exception { - final BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView(); - assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount()); - - assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("dequeue count (want 1000), is : " + brokerView.getTotalDequeueCount()); - return 1000 == brokerView.getTotalDequeueCount(); - } - })); - } - - protected int receiveExactMessages(String brokerName, int msgCount) throws Exception { - - BrokerItem brokerItem = brokers.get(brokerName); - Connection connection = brokerItem.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(dest); - - Message msg; - int i; - for (i = 0; i < msgCount; i++) { - msg = consumer.receive(1000); - if (msg == null) { - break; - } - } - - connection.close(); - brokerItem.connections.remove(connection); - - return i; - } - - @Override - public void setUp() throws Exception { - super.setUp(); - createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false")); - createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false")); - bridgeBrokers("BrokerA", "BrokerB"); - - startAllBrokers(); - - // Create queue - dest = createDestination("TEST.FOO", false); - sendMessages("BrokerA", dest, 1000); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java deleted file mode 100644 index bcd2db1..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.TestSupport; -import org.apache.activemq.command.ActiveMQQueue; - -/** - * In CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE modes following exception - * occurs when ASYNCH consumers acknowledges messages in not in order they - * received the messages. - *

- * Exception thrown on broker side: - *

- * {@code javax.jms.JMSException: Could not correlate acknowledgment with - * dispatched message: MessageAck} - * - * @author daroo - */ -public class AMQ2489Test extends TestSupport { - - private final static String SEQ_NUM_PROPERTY = "seqNum"; - - private final static int TOTAL_MESSAGES_CNT = 2; - private final static int CONSUMERS_CNT = 2; - - private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); - - private Connection connection; - - @Override - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - @Override - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - public void testUnorderedClientAcknowledge() throws Exception { - doUnorderedAck(Session.CLIENT_ACKNOWLEDGE); - } - - public void testUnorderedIndividualAcknowledge() throws Exception { - doUnorderedAck(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); - } - - /** - * Main test method - * - * @param acknowledgmentMode - ACK mode to be used by consumers - * @throws Exception - */ - protected void doUnorderedAck(int acknowledgmentMode) throws Exception { - List consumers = null; - Session producerSession = null; - - connection.start(); - // Because exception is thrown on broker side only, let's set up - // exception listener to get it - final TestExceptionListener exceptionListener = new TestExceptionListener(); - connection.setExceptionListener(exceptionListener); - try { - consumers = new ArrayList<>(); - // start customers - for (int i = 0; i < CONSUMERS_CNT; i++) { - consumers.add(new Consumer(acknowledgmentMode)); - } - - // produce few test messages - producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageProducer producer = producerSession.createProducer(new ActiveMQQueue(getQueueName())); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) { - final Message message = producerSession.createTextMessage("test"); - // assign each message sequence number - message.setIntProperty(SEQ_NUM_PROPERTY, i); - producer.send(message); - } - - // during each onMessage() calls consumers decreases the LATCH - // counter. - // - // so, let's wait till all messages are consumed. - // - LATCH.await(); - - // wait a bit more to give exception listener a chance be populated - // with - // broker's error - TimeUnit.SECONDS.sleep(1); - - assertFalse(exceptionListener.getStatusText(), exceptionListener.hasExceptions()); - - } - finally { - if (producerSession != null) - producerSession.close(); - - if (consumers != null) { - for (Consumer c : consumers) { - c.close(); - } - } - } - } - - protected String getQueueName() { - return getClass().getName() + "." + getName(); - } - - public final class Consumer implements MessageListener { - - final Session session; - - private Consumer(int acknowledgmentMode) { - try { - session = connection.createSession(false, acknowledgmentMode); - final Queue queue = session.createQueue(getQueueName() + "?consumer.prefetchSize=1"); - final MessageConsumer consumer = session.createConsumer(queue); - consumer.setMessageListener(this); - } - catch (JMSException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - @Override - public void onMessage(Message message) { - try { - // retrieve sequence number assigned by producer... - final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY); - - // ...and let's delay every second message a little bit before - // acknowledgment - if ((seqNum % 2) == 0) { - System.out.println("Delayed message sequence numeber: " + seqNum); - try { - TimeUnit.SECONDS.sleep(1); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - message.acknowledge(); - } - catch (JMSException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - finally { - // decrease LATCH counter in the main test method. - LATCH.countDown(); - } - } - - private void close() { - if (session != null) { - try { - session.close(); - } - catch (JMSException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } - } - - public final class TestExceptionListener implements ExceptionListener { - - private final java.util.Queue exceptions = new ConcurrentLinkedQueue<>(); - - @Override - public void onException(JMSException e) { - exceptions.add(e); - } - - public boolean hasExceptions() { - return exceptions.isEmpty() == false; - } - - public String getStatusText() { - final StringBuilder str = new StringBuilder(); - str.append("Exceptions count on broker side: " + exceptions.size() + ".\nMessages:\n"); - for (Exception e : exceptions) { - str.append(e.getMessage() + "\n\n"); - } - return str.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java deleted file mode 100644 index b18a7b4..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.apache.activemq.util.IOHelper; - -public class AMQ2512Test extends EmbeddedBrokerTestSupport { - - private static Connection connection; - private final static String QUEUE_NAME = "dee.q"; - private final static int INITIAL_MESSAGES_CNT = 1000; - private final static int WORKER_INTERNAL_ITERATIONS = 100; - private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + INITIAL_MESSAGES_CNT; - private final static byte[] payload = new byte[5 * 1024]; - private final static String TEXT = new String(payload); - - private final static String PRP_INITIAL_ID = "initial-id"; - private final static String PRP_WORKER_ID = "worker-id"; - - private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); - - private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger(); - - public void testKahaDBFailure() throws Exception { - final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress); - connection = fac.createConnection(); - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue queue = session.createQueue(QUEUE_NAME); - final MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - connection.start(); - - final long startTime = System.nanoTime(); - - final List consumers = new ArrayList<>(); - for (int i = 0; i < 20; i++) { - consumers.add(new Consumer("worker-" + i)); - } - - for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) { - final TextMessage msg = session.createTextMessage(TEXT); - msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i); - producer.send(msg); - } - - LATCH.await(); - final long endTime = System.nanoTime(); - System.out.println("Total execution time = " + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms]."); - System.out.println("Rate = " + TOTAL_MESSAGES_CNT / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s]."); - - for (Consumer c : consumers) { - c.close(); - } - connection.close(); - } - - private final static class Consumer implements MessageListener { - - private final String name; - private final Session session; - private final MessageProducer producer; - - private Consumer(String name) { - this.name = name; - try { - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10"); - producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - final MessageConsumer consumer = session.createConsumer(queue); - consumer.setMessageListener(this); - } - catch (JMSException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - @Override - public void onMessage(Message message) { - final TextMessage msg = (TextMessage) message; - try { - if (!msg.propertyExists(PRP_WORKER_ID)) { - for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) { - final TextMessage newMsg = session.createTextMessage(msg.getText()); - newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i); - newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID)); - producer.send(newMsg); - } - } - msg.acknowledge(); - - } - catch (JMSException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - finally { - final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement(); - if (onMsgCounter % 1000 == 0) { - System.out.println("message received: " + onMsgCounter); - } - LATCH.countDown(); - } - } - - private void close() { - if (session != null) { - try { - session.close(); - } - catch (JMSException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } - } - - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://0.0.0.0:61617"; - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - File dataFileDir = new File("target/test-amq-2512/datadb"); - IOHelper.mkdirs(dataFileDir); - IOHelper.deleteChildren(dataFileDir); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(dataFileDir); - BrokerService answer = new BrokerService(); - answer.setPersistenceAdapter(kaha); - - kaha.setEnableJournalDiskSyncs(false); - //kaha.setIndexCacheSize(10); - answer.setDataDirectoryFile(dataFileDir); - answer.setUseJmx(false); - answer.addConnector(bindAddress); - return answer; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java deleted file mode 100644 index eb25bdd..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DestinationViewMBean; -import org.apache.activemq.broker.jmx.ManagementContext; - -/** - * This unit test verifies an issue when - * javax.management.InstanceNotFoundException is thrown after subsequent startups when - * managementContext createConnector="false" - */ -public class AMQ2513Test extends TestCase { - - private BrokerService broker; - private String connectionUri; - - void createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = new BrokerService(); - broker.setBrokerName("localhost"); - broker.setUseJmx(true); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - broker.addConnector("tcp://localhost:0"); - - ManagementContext ctx = new ManagementContext(); - //if createConnector == true everything is fine - ctx.setCreateConnector(false); - broker.setManagementContext(ctx); - - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - public void testJmx() throws Exception { - createBroker(true); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue("test")); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - connection.start(); - - producer.send(session.createTextMessage("test123")); - - DestinationViewMBean dv = createView(); - assertTrue(dv.getQueueSize() > 0); - - connection.close(); - - broker.stop(); - broker.waitUntilStopped(); - - createBroker(false); - factory = new ActiveMQConnectionFactory(connectionUri); - connection = factory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(session.createQueue("test")); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - connection.start(); - producer.send(session.createTextMessage("test123")); - connection.close(); - - dv = createView(); - assertTrue(dv.getQueueSize() > 0); - - broker.stop(); - broker.waitUntilStopped(); - - } - - DestinationViewMBean createView() throws Exception { - String domain = "org.apache.activemq"; - ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + - "destinationType=Queue,destinationName=test"); - return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java deleted file mode 100644 index 148ab32..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.region.Queue; -import org.junit.Assert; - -/** - * This test demonstrates a bug in which calling - * Queue#removeMatchingMessages("") generates an exception, whereas the JMS - * specification states that an empty selector is valid. - */ -public class AMQ2528Test extends EmbeddedBrokerTestSupport { - - /** - * Setup the test so that the destination is a queue. - */ - @Override - protected void setUp() throws Exception { - useTopic = false; - super.setUp(); - } - - /** - * This test enqueues test messages to destination and then verifies that - * {@link Queue#removeMatchingMessages("")} removes all the messages. - */ - public void testRemoveMatchingMessages() throws Exception { - final int NUM_MESSAGES = 100; - final String MESSAGE_ID = "id"; - - // Enqueue the test messages. - Connection conn = createConnection(); - try { - conn.start(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - for (int id = 0; id < NUM_MESSAGES; id++) { - Message message = session.createMessage(); - message.setIntProperty(MESSAGE_ID, id); - producer.send(message); - } - producer.close(); - session.close(); - } - finally { - conn.close(); - } - - // Verify that half of the messages can be removed by selector. - Queue queue = (Queue) broker.getRegionBroker().getDestinations(destination).iterator().next(); - - Assert.assertEquals(NUM_MESSAGES / 2, queue.removeMatchingMessages(MESSAGE_ID + " < " + NUM_MESSAGES / 2)); - - // Verify that the remainder of the messages can be removed by empty - // selector. - Assert.assertEquals(NUM_MESSAGES - NUM_MESSAGES / 2, queue.removeMatchingMessages("")); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2386cd37/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java deleted file mode 100644 index 0c3ef45..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; - -public class AMQ2571Test extends EmbeddedBrokerTestSupport { - - public void testTempQueueClosing() { - try { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.bindAddress); - connectionFactory.setAlwaysSyncSend(true); - - // First create session that will own the TempQueue - Connection connectionA = connectionFactory.createConnection(); - connectionA.start(); - - Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE); - - TemporaryQueue tempQueue = sessionA.createTemporaryQueue(); - - // Next, create session that will put messages on the queue. - Connection connectionB = connectionFactory.createConnection(); - connectionB.start(); - - Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Create a producer for connection B. - final MessageProducer producerB = sessionB.createProducer(tempQueue); - producerB.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - final TextMessage message = sessionB.createTextMessage("Testing AMQ TempQueue."); - - Thread sendingThread = new Thread(new Runnable() { - @Override - public void run() { - try { - long end = System.currentTimeMillis() + 5 * 60 * 1000; - // wait for exception on send - while (System.currentTimeMillis() < end) { - producerB.send(message); - } - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }); - - // Send 5000 messages. - sendingThread.start(); - // Now close connection A. This will remove the TempQueue. - connectionA.close(); - // Wait for the thread to finish. - sendingThread.join(5 * 60 * 1000); - - // Sleep for a while to make sure that we should know that the - // TempQueue is gone. - //Thread.sleep(50); - - // Now we test if we are able to send again. - try { - producerB.send(message); - fail("Involuntary recreated temporary queue."); - } - catch (JMSException e) { - // Got exception, just as we wanted because the creator of - // the TempQueue had closed the connection prior to the send. - assertTrue("TempQueue does not exist anymore.", true); - } - } - catch (Exception e) { - fail("Unexpected exception " + e); - } - } - - @Override - protected void setUp() throws Exception { - bindAddress = "vm://localhost"; - setAutoFail(true); - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setPersistent(false); - answer.setUseJmx(false); - return answer; - } -} \ No newline at end of file