Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 356AB19BC2 for ; Fri, 4 Mar 2016 22:42:48 +0000 (UTC) Received: (qmail 28281 invoked by uid 500); 4 Mar 2016 22:42:47 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 28186 invoked by uid 500); 4 Mar 2016 22:42:47 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 26694 invoked by uid 99); 4 Mar 2016 22:42:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 22:42:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50663E7944; Fri, 4 Mar 2016 22:42:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 04 Mar 2016 22:43:10 -0000 Message-Id: <93ae86e1fa1944a89d15e9529262e018@git.apache.org> In-Reply-To: <64a7ec3b1ddf4b2f8d97fa76fd178cb9@git.apache.org> References: <64a7ec3b1ddf4b2f8d97fa76fd178cb9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/58] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java deleted file mode 100644 index d05a5c7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java +++ /dev/null @@ -1,433 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQMessageProducer; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Unit test for virtual topics and DLQ messaging. See individual test for more - * detail - */ -public class VirtualTopicDLQTest extends TestCase { - - private static BrokerService broker; - - private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class); - - static final String jmsConnectionURI = "failover:(vm://localhost)"; - - // Virtual Topic that the test publishes 10 messages to - private static final String virtualTopicName = "VirtualTopic.Test"; - - // Queues that receive all the messages send to the virtual topic - private static final String consumer1Prefix = "Consumer.A."; - private static final String consumer2Prefix = "Consumer.B."; - private static final String consumer3Prefix = "Consumer.C."; - - // Expected Individual Dead Letter Queue names that are tied to the - // Subscriber Queues - private static final String dlqPrefix = "ActiveMQ.DLQ.Topic."; - - // Number of messages - private static final int numberMessages = 6; - - @Override - @Before - public void setUp() throws Exception { - try { - broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true); - broker.start(); - broker.waitUntilStarted(); - } - catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - - @Override - @After - public void tearDown() throws Exception { - try { - // Purge the DLQ's so counts are correct for next run - purgeDestination(dlqPrefix + consumer1Prefix + virtualTopicName); - purgeDestination(dlqPrefix + consumer2Prefix + virtualTopicName); - purgeDestination(dlqPrefix + consumer3Prefix + virtualTopicName); - } - catch (Exception e) { - e.printStackTrace(); - } - - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - /* - * This test verifies that all undelivered messages sent to a consumers - * listening on a queue associated with a virtual topic with be forwarded to - * separate DLQ's. - * - * Note that the broker config, deadLetterStrategy need to have the enable - * audit set to false so that duplicate message sent from a topic to - * individual consumers are forwarded to the DLQ - * - * - * - * - * - */ - @Test - public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception { - - TestConsumer consumer1 = null; - TestConsumer consumer2 = null; - TestConsumer consumer3 = null; - TestConsumer dlqConsumer1 = null; - TestConsumer dlqConsumer2 = null; - TestConsumer dlqConsumer3 = null; - - try { - - // The first 2 consumers will rollback, ultimately causing messages - // to land on the DLQ - consumer1 = new TestConsumer(consumer1Prefix + virtualTopicName, false, numberMessages, true); - thread(consumer1, false); - - consumer2 = new TestConsumer(consumer2Prefix + virtualTopicName, false, numberMessages, true); - thread(consumer2, false); - - // TestConsumer that does not throw exceptions, messages should not - // land on DLQ - consumer3 = new TestConsumer(consumer3Prefix + virtualTopicName, false, numberMessages, false); - thread(consumer3, false); - - // TestConsumer to read the expected Dead Letter Queue - dlqConsumer1 = new TestConsumer(dlqPrefix + consumer1Prefix + virtualTopicName, false, numberMessages, false); - thread(dlqConsumer1, false); - - dlqConsumer2 = new TestConsumer(dlqPrefix + consumer2Prefix + virtualTopicName, false, numberMessages, false); - thread(dlqConsumer2, false); - - dlqConsumer3 = new TestConsumer(dlqPrefix + consumer3Prefix + virtualTopicName, false, numberMessages, false); - thread(dlqConsumer3, false); - - // Give the consumers a second to start - Thread.sleep(1000); - - // Start the producer - TestProducer producer = new TestProducer(virtualTopicName, true, numberMessages); - thread(producer, false); - - assertTrue("sent all producer messages in time, count is: " + producer.getLatch().getCount(), producer.getLatch().await(10, TimeUnit.SECONDS)); - LOG.info("producer successful, count = " + producer.getLatch().getCount()); - - assertTrue("remaining consumer1 count should be zero, is: " + consumer1.getLatch().getCount(), consumer1.getLatch().await(10, TimeUnit.SECONDS)); - LOG.info("consumer1 successful, count = " + consumer1.getLatch().getCount()); - - assertTrue("remaining consumer2 count should be zero, is: " + consumer2.getLatch().getCount(), consumer2.getLatch().await(10, TimeUnit.SECONDS)); - LOG.info("consumer2 successful, count = " + consumer2.getLatch().getCount()); - - assertTrue("remaining consumer3 count should be zero, is: " + consumer3.getLatch().getCount(), consumer3.getLatch().await(10, TimeUnit.SECONDS)); - LOG.info("consumer3 successful, count = " + consumer3.getLatch().getCount()); - - assertTrue("remaining dlqConsumer1 count should be zero, is: " + dlqConsumer1.getLatch().getCount(), dlqConsumer1.getLatch().await(10, TimeUnit.SECONDS)); - LOG.info("dlqConsumer1 successful, count = " + dlqConsumer1.getLatch().getCount()); - - assertTrue("remaining dlqConsumer2 count should be zero, is: " + dlqConsumer2.getLatch().getCount(), dlqConsumer2.getLatch().await(10, TimeUnit.SECONDS)); - LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount()); - - assertTrue("remaining dlqConsumer3 count should be " + numberMessages + ", is: " + dlqConsumer3.getLatch().getCount(), dlqConsumer3.getLatch().getCount() == numberMessages); - LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount()); - - } - catch (Exception e) { - e.printStackTrace(); - throw e; - } - finally { - // Tell consumers to stop (don't read any more messages after this) - if (consumer1 != null) - consumer1.setStop(true); - if (consumer2 != null) - consumer2.setStop(true); - if (consumer3 != null) - consumer3.setStop(true); - if (dlqConsumer1 != null) - dlqConsumer1.setStop(true); - if (dlqConsumer2 != null) - dlqConsumer2.setStop(true); - if (dlqConsumer3 != null) - dlqConsumer3.setStop(true); - } - } - - private static Thread thread(Runnable runnable, boolean daemon) { - Thread brokerThread = new Thread(runnable); - brokerThread.setDaemon(daemon); - brokerThread.start(); - return brokerThread; - } - - private class TestProducer implements Runnable { - - private String destinationName = null; - private boolean isTopic = true; - private int numberMessages = 0; - private CountDownLatch latch = null; - - public TestProducer(String destinationName, boolean isTopic, int numberMessages) { - this.destinationName = destinationName; - this.isTopic = isTopic; - this.numberMessages = numberMessages; - latch = new CountDownLatch(numberMessages); - } - - public CountDownLatch getLatch() { - return latch; - } - - @Override - public void run() { - ActiveMQConnectionFactory connectionFactory = null; - ActiveMQConnection connection = null; - ActiveMQSession session = null; - Destination destination = null; - - try { - LOG.info("Started TestProducer for destination (" + destinationName + ")"); - - connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (isTopic) { - destination = session.createTopic(this.destinationName); - } - else { - destination = session.createQueue(this.destinationName); - } - - // Create a MessageProducer from the Session to the Topic or - // Queue - ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - for (int i = 0; i < numberMessages; i++) { - TextMessage message = session.createTextMessage("I am a message :: " + String.valueOf(i)); - try { - producer.send(message); - - } - catch (Exception deeperException) { - LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException); - } - - latch.countDown(); - Thread.sleep(1000); - } - - LOG.info("Finished TestProducer for destination (" + destinationName + ")"); - - } - catch (Exception e) { - LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e); - e.printStackTrace(); - - } - finally { - try { - // Clean up - if (session != null) - session.close(); - if (connection != null) - connection.close(); - - } - catch (Exception e) { - e.printStackTrace(); - LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); - } - } - } - } - - private class TestConsumer implements Runnable, ExceptionListener, MessageListener { - - private String destinationName = null; - private boolean isTopic = true; - private CountDownLatch latch = null; - private int maxRedeliveries = 0; - private int receivedMessageCounter = 0; - private boolean bFakeFail = false; - private boolean bStop = false; - - private ActiveMQConnectionFactory connectionFactory = null; - private ActiveMQConnection connection = null; - private Session session = null; - private MessageConsumer consumer = null; - - public TestConsumer(String destinationName, boolean isTopic, int expectedNumberMessages, boolean bFakeFail) { - this.destinationName = destinationName; - this.isTopic = isTopic; - latch = new CountDownLatch(expectedNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1)); - this.bFakeFail = bFakeFail; - } - - public CountDownLatch getLatch() { - return latch; - } - - @Override - public void run() { - - try { - LOG.info("Started TestConsumer for destination (" + destinationName + ")"); - - connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(true, Session.SESSION_TRANSACTED); - - RedeliveryPolicy policy = connection.getRedeliveryPolicy(); - policy.setInitialRedeliveryDelay(1); - policy.setUseExponentialBackOff(false); - policy.setMaximumRedeliveries(maxRedeliveries); - - connection.setExceptionListener(this); - - Destination destination = null; - if (isTopic) { - destination = session.createTopic(destinationName); - } - else { - destination = session.createQueue(destinationName); - } - - consumer = session.createConsumer(destination); - consumer.setMessageListener(this); - - while (!bStop) { - Thread.sleep(100); - } - - LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString()); - - } - catch (Exception e) { - LOG.error("Consumer (" + destinationName + ") Caught: " + e); - e.printStackTrace(); - } - finally { - try { - // Clean up - if (consumer != null) - consumer.close(); - if (session != null) - session.close(); - if (connection != null) - connection.close(); - - } - catch (Exception e) { - e.printStackTrace(); - LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); - } - } - } - - @Override - public synchronized void onException(JMSException ex) { - ex.printStackTrace(); - LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occurred. Shutting down client."); - } - - public synchronized void setStop(boolean bStop) { - this.bStop = bStop; - } - - @Override - public synchronized void onMessage(Message message) { - receivedMessageCounter++; - latch.countDown(); - - LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + " :: Number messages received " + this.receivedMessageCounter); - - try { - LOG.info("Consumer for destination (" + destinationName + ") Received message id :: " + message.getJMSMessageID()); - - if (!bFakeFail) { - LOG.info("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString()); - session.commit(); - } - else { - LOG.info("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString()); - session.rollback(); // rolls back all the consumed messages - // on the session to - } - - } - catch (JMSException ex) { - ex.printStackTrace(); - LOG.error("Error reading JMS Message from destination " + destinationName + "."); - } - } - } - - private static void purgeDestination(String destination) throws Exception { - final Queue dest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue(destination)); - dest.purge(); - assertEquals(0, dest.getDestinationStatistics().getMessages().getCount()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java deleted file mode 100644 index 925b82c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.net.URI; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; -import org.apache.activemq.xbean.XBeanBrokerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test case for https://issues.apache.org/jira/browse/AMQ-3004 - */ - -public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class); - protected Connection connection; - - public void testVirtualTopicSelectorDisconnect() throws Exception { - testVirtualTopicDisconnect("odd = 'no'", 3000, 1500); - } - - public void testVirtualTopicNoSelectorDisconnect() throws Exception { - testVirtualTopicDisconnect(null, 3000, 3000); - } - - public void testVirtualTopicDisconnect(String messageSelector, int total, int expected) throws Exception { - if (connection == null) { - connection = createConnection(); - } - connection.start(); - - final ConsumerBean messageList = new ConsumerBean(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - Destination producerDestination = getProducerDestination(); - Destination destination = getConsumerDsetination(); - - LOG.info("Sending to: " + producerDestination); - LOG.info("Consuming from: " + destination); - - MessageConsumer consumer = createConsumer(session, destination, messageSelector); - - MessageListener listener = new MessageListener() { - @Override - public void onMessage(Message message) { - messageList.onMessage(message); - try { - message.acknowledge(); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }; - - consumer.setMessageListener(listener); - - // create topic producer - MessageProducer producer = session.createProducer(producerDestination); - assertNotNull(producer); - - int disconnectCount = total / 3; - int reconnectCount = (total * 2) / 3; - - for (int i = 0; i < total; i++) { - producer.send(createMessage(session, i)); - - if (i == disconnectCount) { - consumer.close(); - } - if (i == reconnectCount) { - consumer = createConsumer(session, destination, messageSelector); - consumer.setMessageListener(listener); - } - } - - assertMessagesArrived(messageList, expected, 10000); - } - - protected Destination getConsumerDsetination() { - return new ActiveMQQueue("Consumer.VirtualTopic.TEST"); - } - - protected Destination getProducerDestination() { - return new ActiveMQTopic("VirtualTopic.TEST"); - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - } - - protected MessageConsumer createConsumer(Session session, - Destination destination, - String messageSelector) throws JMSException { - if (messageSelector != null) { - return session.createConsumer(destination, messageSelector); - } - else { - return session.createConsumer(destination); - } - } - - protected TextMessage createMessage(Session session, int i) throws JMSException { - TextMessage textMessage = session.createTextMessage("message: " + i); - if (i % 2 != 0) { - textMessage.setStringProperty("odd", "yes"); - } - else { - textMessage.setStringProperty("odd", "no"); - } - textMessage.setIntProperty("i", i); - return textMessage; - } - - protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) { - messageList.assertMessagesArrived(expected, timeout); - - messageList.flushMessages(); - - LOG.info("validate no other messages on queues"); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination destination1 = getConsumerDsetination(); - - MessageConsumer c1 = session.createConsumer(destination1, null); - c1.setMessageListener(messageList); - - LOG.info("send one simple message that should go to both consumers"); - MessageProducer producer = session.createProducer(getProducerDestination()); - assertNotNull(producer); - - producer.send(session.createTextMessage("Last Message")); - - messageList.assertMessagesArrived(1); - - } - catch (JMSException e) { - e.printStackTrace(); - fail("unexpeced ex while waiting for last messages: " + e); - } - } - - protected String getBrokerConfigUri() { - return "org/apache/activemq/broker/virtual/disconnected-selector.xml"; - } - - @Override - protected BrokerService createBroker() throws Exception { - XBeanBrokerFactory factory = new XBeanBrokerFactory(); - BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); - return answer; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java deleted file mode 100644 index 0f2af0a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.util.Vector; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; - -/** - * - * - */ -public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport { - - private Vector connections = new Vector<>(); - public int ackMode = Session.AUTO_ACKNOWLEDGE; - - public static Test suite() { - return suite(VirtualTopicPubSubTest.class); - } - - public void initCombosForTestVirtualTopicCreation() { - addCombinationValues("ackMode", new Object[]{new Integer(Session.AUTO_ACKNOWLEDGE), new Integer(Session.CLIENT_ACKNOWLEDGE)}); - } - - private boolean doneTwice = false; - - public void testVirtualTopicCreation() throws Exception { - doTestVirtualTopicCreation(10); - } - - public void doTestVirtualTopicCreation(int total) throws Exception { - - ConsumerBean messageList = new ConsumerBean() { - @Override - public synchronized void onMessage(Message message) { - super.onMessage(message); - if (ackMode == Session.CLIENT_ACKNOWLEDGE) { - try { - message.acknowledge(); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - } - }; - messageList.setVerbose(true); - - String queueAName = getVirtualTopicConsumerName(); - // create consumer 'cluster' - ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); - ActiveMQQueue queue2 = new ActiveMQQueue(queueAName); - - Session session = createStartAndTrackConnection().createSession(false, ackMode); - MessageConsumer c1 = session.createConsumer(queue1); - - session = createStartAndTrackConnection().createSession(false, ackMode); - MessageConsumer c2 = session.createConsumer(queue2); - - c1.setMessageListener(messageList); - c2.setMessageListener(messageList); - - // create topic producer - Session producerSession = createStartAndTrackConnection().createSession(false, ackMode); - MessageProducer producer = producerSession.createProducer(new ActiveMQTopic(getVirtualTopicName())); - assertNotNull(producer); - - for (int i = 0; i < total; i++) { - producer.send(producerSession.createTextMessage("message: " + i)); - } - - messageList.assertMessagesArrived(total); - - // do twice so we confirm messages do not get redelivered after client acknowledgement - if (doneTwice == false) { - doneTwice = true; - doTestVirtualTopicCreation(0); - } - } - - private Connection createStartAndTrackConnection() throws Exception { - Connection connection = createConnection(); - connection.start(); - connections.add(connection); - return connection; - } - - protected String getVirtualTopicName() { - return "VirtualTopic.TEST"; - } - - protected String getVirtualTopicConsumerName() { - return "Consumer.A.VirtualTopic.TEST"; - } - - @Override - protected void tearDown() throws Exception { - for (Connection connection : connections) { - connection.close(); - } - super.tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java deleted file mode 100644 index 1d7ea71..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.net.URI; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.xbean.XBeanBrokerFactory; - -/** - * - * - */ -public class VirtualTopicPubSubUsingXBeanTest extends VirtualTopicPubSubTest { - - @Override - protected String getVirtualTopicConsumerName() { - return "VirtualTopicConsumers.ConsumerNumberOne.FOO"; - } - - @Override - protected String getVirtualTopicName() { - return "FOO"; - } - - @Override - protected BrokerService createBroker() throws Exception { - XBeanBrokerFactory factory = new XBeanBrokerFactory(); - BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); - - // lets disable persistence as we are a test - answer.setPersistent(false); - - return answer; - } - - protected String getBrokerConfigUri() { - return "org/apache/activemq/broker/virtual/global-virtual-topics.xml"; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java deleted file mode 100644 index d94dd18..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.virtual.VirtualDestination; -import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; -import org.apache.activemq.broker.region.virtual.VirtualTopic; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VirtualTopicSelectorTest extends CompositeTopicTest { - - private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class); - - @Override - protected Destination getConsumer1Dsetination() { - return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST"); - } - - @Override - protected Destination getConsumer2Dsetination() { - return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST"); - } - - @Override - protected Destination getProducerDestination() { - return new ActiveMQTopic("VirtualTopic.TEST"); - } - - @Override - protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { - messageList1.assertMessagesArrived(total / 2); - messageList2.assertMessagesArrived(total / 2); - - messageList1.flushMessages(); - messageList2.flushMessages(); - - LOG.info("validate no other messages on queues"); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination destination1 = getConsumer1Dsetination(); - Destination destination2 = getConsumer2Dsetination(); - MessageConsumer c1 = session.createConsumer(destination1, null); - MessageConsumer c2 = session.createConsumer(destination2, null); - c1.setMessageListener(messageList1); - c2.setMessageListener(messageList2); - - LOG.info("send one simple message that should go to both consumers"); - MessageProducer producer = session.createProducer(getProducerDestination()); - assertNotNull(producer); - - producer.send(session.createTextMessage("Last Message")); - - messageList1.assertMessagesArrived(1); - messageList2.assertMessagesArrived(1); - - } - catch (JMSException e) { - e.printStackTrace(); - fail("unexpeced ex while waiting for last messages: " + e); - } - } - - @Override - protected BrokerService createBroker() throws Exception { - // use message selectors on consumers that need to propagate up to the virtual - // topic dispatch so that un matched messages do not linger on subscription queues - messageSelector1 = "odd = 'yes'"; - messageSelector2 = "odd = 'no'"; - - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - - VirtualTopic virtualTopic = new VirtualTopic(); - // the new config that enables selectors on the intercepter - virtualTopic.setSelectorAware(true); - VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); - interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); - broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); - return broker; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java deleted file mode 100644 index 4abf811..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.broker.jmx.MBeanTest; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; - -public class VirtualTopicsAndDurableSubsTest extends MBeanTest { - - private Connection connection; - - public void testVirtualTopicCreationAndDurableSubs() throws Exception { - if (connection == null) { - connection = createConnection(); - } - connection.setClientID(getAClientID()); - connection.start(); - - ConsumerBean messageList = new ConsumerBean(); - messageList.setVerbose(true); - - String queueAName = getVirtualTopicConsumerName(); - // create consumer 'cluster' - ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); - ActiveMQQueue queue2 = new ActiveMQQueue(queueAName); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer c1 = session.createConsumer(queue1); - MessageConsumer c2 = session.createConsumer(queue2); - - c1.setMessageListener(messageList); - c2.setMessageListener(messageList); - - // create topic producer - MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName())); - assertNotNull(producer); - - int total = 10; - for (int i = 0; i < total; i++) { - producer.send(session.createTextMessage("message: " + i)); - } - messageList.assertMessagesArrived(total); - - //Add and remove durable subscriber after using VirtualTopics - assertCreateAndDestroyDurableSubscriptions(); - } - - protected String getAClientID() { - return "VirtualTopicCreationAndDurableSubs"; - } - - protected String getVirtualTopicName() { - return "VirtualTopic.TEST"; - } - - protected String getVirtualTopicConsumerName() { - return "Consumer.A.VirtualTopic.TEST"; - } - - protected String getDurableSubscriberName() { - return "Sub1"; - } - - protected String getDurableSubscriberTopicName() { - return "simple.topic"; - } - - @Override - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - } - super.tearDown(); - } - - //Overrides test cases from MBeanTest to avoid having them run. - @Override - public void testMBeans() throws Exception { - } - - @Override - public void testMoveMessages() throws Exception { - } - - @Override - public void testRetryMessages() throws Exception { - } - - @Override - public void testMoveMessagesBySelector() throws Exception { - } - - @Override - public void testCopyMessagesBySelector() throws Exception { - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml deleted file mode 100644 index ed3bc73..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml +++ /dev/null @@ -1,47 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml deleted file mode 100644 index ded6471..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml +++ /dev/null @@ -1,47 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml deleted file mode 100644 index 2772910..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml deleted file mode 100644 index d51f03c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml +++ /dev/null @@ -1,47 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml deleted file mode 100644 index ddd0667..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml deleted file mode 100644 index d725436..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml +++ /dev/null @@ -1,80 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml deleted file mode 100644 index fcce72e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml +++ /dev/null @@ -1,50 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java deleted file mode 100644 index 0568757..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; - -/** - * An AMQ-1282 Test - */ -public class AMQ1282 extends TestCase { - - private ConnectionFactory factory; - private Connection connection; - private MapMessage message; - - @Override - protected void setUp() throws Exception { - factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - message = session.createMapMessage(); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - connection.close(); - super.tearDown(); - } - - public void testUnmappedBooleanMessage() throws JMSException { - Object expected; - try { - expected = Boolean.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Boolean actual = message.getBoolean("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - assertEquals(expected, ex); - } - } - - public void testUnmappedIntegerMessage() throws JMSException { - Object expected; - try { - expected = Integer.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Integer actual = message.getInt("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } - - public void testUnmappedShortMessage() throws JMSException { - Object expected; - try { - expected = Short.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Short actual = message.getShort("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } - - public void testUnmappedLongMessage() throws JMSException { - Object expected; - try { - expected = Long.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Long actual = message.getLong("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } - - public void testUnmappedStringMessage() throws JMSException { - Object expected; - try { - expected = String.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - String actual = message.getString("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } - - public void testUnmappedCharMessage() throws JMSException { - try { - message.getChar("foo"); - fail("should have thrown NullPointerException"); - } - catch (NullPointerException success) { - assertNotNull(success); - } - } - - public void testUnmappedByteMessage() throws JMSException { - Object expected; - try { - expected = Byte.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Byte actual = message.getByte("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } - - public void testUnmappedDoubleMessage() throws JMSException { - Object expected; - try { - expected = Double.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Double actual = message.getDouble("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } - - public void testUnmappedFloatMessage() throws JMSException { - Object expected; - try { - expected = Float.valueOf(null); - } - catch (Exception ex) { - expected = ex; - } - try { - Float actual = message.getFloat("foo"); - assertEquals(expected, actual); - } - catch (Exception ex) { - Class aClass = expected.getClass(); - assertTrue(aClass.isInstance(ex)); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java deleted file mode 100644 index 78a6088..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; - -/** - * - * - */ -public class AMQ1687Test extends EmbeddedBrokerTestSupport { - - private Connection connection; - - @Override - protected ConnectionFactory createConnectionFactory() throws Exception { - //prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log. - return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.prefetchPolicy.all=5"); - } - - public void testVirtualTopicCreation() throws Exception { - if (connection == null) { - connection = createConnection(); - } - connection.start(); - - ConsumerBean messageList = new ConsumerBean(); - messageList.setVerbose(true); - - String queueAName = getVirtualTopicConsumerName(); - String queueBName = getVirtualTopicConsumerNameB(); - - // create consumer 'cluster' - ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); - ActiveMQQueue queue2 = new ActiveMQQueue(queueBName); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer c1 = session.createConsumer(queue1); - MessageConsumer c2 = session.createConsumer(queue2); - - c1.setMessageListener(messageList); - c2.setMessageListener(messageList); - - // create topic producer - ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName()); - MessageProducer producer = session.createProducer(topic); - assertNotNull(producer); - - int total = 100; - for (int i = 0; i < total; i++) { - producer.send(session.createTextMessage("message: " + i)); - } - - messageList.assertMessagesArrived(total * 2); - } - - protected String getVirtualTopicName() { - return "VirtualTopic.TEST"; - } - - protected String getVirtualTopicConsumerName() { - return "Consumer.A.VirtualTopic.TEST"; - } - - protected String getVirtualTopicConsumerNameB() { - return "Consumer.B.VirtualTopic.TEST"; - } - - @Override - protected void setUp() throws Exception { - this.bindAddress = "tcp://localhost:0"; - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - } - super.tearDown(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java deleted file mode 100644 index 2f7b8fe..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import java.net.URI; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQMessageProducer; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test validates that the AMQ consumer blocks on redelivery of a message, - * through all redeliveries, until the message is either successfully consumed - * or sent to the DLQ. - */ -public class AMQ1853Test { - - private static BrokerService broker; - - private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class); - static final String jmsConnectionURI = "failover:(vm://localhost)"; - - // Virtual Topic that the test publishes 10 messages to - private static final String queueFail = "Queue.BlockingConsumer.QueueFail"; - - // Number of messages - - private final int producerMessages = 5; - private final int totalNumberMessages = producerMessages * 2; - private final int maxRedeliveries = 2; - private final int redeliveryDelay = 1000; - - private Map messageList = null; - - @Before - public void setUp() throws Exception { - broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false")); - broker.setUseJmx(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - @Test - public void testConsumerMessagesAreNotOrdered() throws Exception { - - TestConsumer consumerAllFail = null; - messageList = new Hashtable<>(); - - try { - - // The first 2 consumers will rollback, ultimately causing messages to land on the DLQ - - TestProducer producerAllFail = new TestProducer(queueFail); - thread(producerAllFail, false); - - consumerAllFail = new TestConsumer(queueFail, true); - thread(consumerAllFail, false); - - // Give the consumers a second to start - Thread.sleep(1000); - - thread(producerAllFail, false); - - // Give the consumers a second to start - Thread.sleep(1000); - - producerAllFail.getLatch().await(); - - LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount()); - LOG.info("final message list size = " + messageList.size()); - - assertTrue("message list size = " + messageList.size() + " exptected:" + totalNumberMessages, Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return totalNumberMessages == messageList.size(); - } - })); - - consumerAllFail.getLatch().await(); - - LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount()); - - Iterator keys = messageList.keySet().iterator(); - for (AtomicInteger counter : messageList.values()) { - String message = keys.next(); - LOG.info("final count for message " + message + " counter = " + counter.get()); - assertTrue("for message " + message + " counter = " + counter.get(), counter.get() == maxRedeliveries + 1); - } - - assertFalse(consumerAllFail.messageReceiptIsOrdered()); - } - finally { - if (consumerAllFail != null) { - consumerAllFail.setStop(true); - } - } - } - - private static Thread thread(Runnable runnable, boolean daemon) { - Thread brokerThread = new Thread(runnable); - brokerThread.setDaemon(daemon); - brokerThread.start(); - return brokerThread; - } - - private class TestProducer implements Runnable { - - private CountDownLatch latch = null; - private String destinationName = null; - - public TestProducer(String destinationName) { - this.destinationName = destinationName; - // We run the producer 2 times - latch = new CountDownLatch(totalNumberMessages); - } - - public CountDownLatch getLatch() { - return latch; - } - - @Override - public void run() { - - ActiveMQConnectionFactory connectionFactory = null; - ActiveMQConnection connection = null; - ActiveMQSession session = null; - Destination destination = null; - - try { - LOG.info("Started TestProducer for destination (" + destinationName + ")"); - - connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setCopyMessageOnSend(false); - connection.start(); - session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - destination = session.createQueue(this.destinationName); - - // Create a MessageProducer from the Session to the Topic or Queue - ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - for (int i = 0; i < (producerMessages); i++) { - TextMessage message = session.createTextMessage(); - message.setLongProperty("TestTime", (System.currentTimeMillis())); - try { - producer.send(message); - LOG.info("Producer (" + destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n"); - - latch.countDown(); - LOG.info(" Latch count " + latch.getCount()); - LOG.info("Producer message list size = " + messageList.keySet().size()); - messageList.put(message.getJMSMessageID(), new AtomicInteger(0)); - LOG.info("Producer message list size = " + messageList.keySet().size()); - - } - catch (Exception deeperException) { - LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException); - } - - Thread.sleep(1000); - } - - LOG.info("Finished TestProducer for destination (" + destinationName + ")"); - - } - catch (Exception e) { - LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e); - } - finally { - try { - if (session != null) { - session.close(); - } - if (connection != null) { - connection.close(); - } - } - catch (Exception e) { - LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); - } - } - } - } - - private class TestConsumer implements Runnable, ExceptionListener, MessageListener { - - private CountDownLatch latch = null; - private int receivedMessageCounter = 0; - private boolean bFakeFail = false; - String destinationName = null; - boolean bMessageReceiptIsOrdered = true; - boolean bStop = false; - String previousMessageId = null; - - private ActiveMQConnectionFactory connectionFactory = null; - private ActiveMQConnection connection = null; - private Session session = null; - private MessageConsumer consumer = null; - - public TestConsumer(String destinationName, boolean bFakeFail) { - this.bFakeFail = bFakeFail; - latch = new CountDownLatch(totalNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1)); - this.destinationName = destinationName; - } - - public CountDownLatch getLatch() { - return latch; - } - - public boolean messageReceiptIsOrdered() { - return bMessageReceiptIsOrdered; - } - - @Override - public void run() { - - try { - LOG.info("Started TestConsumer for destination (" + destinationName + ")"); - - connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setNonBlockingRedelivery(true); - session = connection.createSession(true, Session.SESSION_TRANSACTED); - - RedeliveryPolicy policy = connection.getRedeliveryPolicy(); - policy.setInitialRedeliveryDelay(redeliveryDelay); - policy.setBackOffMultiplier(-1); - policy.setRedeliveryDelay(redeliveryDelay); - policy.setMaximumRedeliveryDelay(-1); - policy.setUseExponentialBackOff(false); - policy.setMaximumRedeliveries(maxRedeliveries); - - connection.setExceptionListener(this); - Destination destination = session.createQueue(destinationName); - consumer = session.createConsumer(destination); - consumer.setMessageListener(this); - - connection.start(); - - while (!bStop) { - Thread.sleep(100); - } - - LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString()); - - } - catch (Exception e) { - LOG.error("Consumer (" + destinationName + ") Caught: " + e); - } - finally { - try { - if (consumer != null) { - consumer.close(); - } - if (session != null) { - session.close(); - } - if (connection != null) { - connection.close(); - } - } - catch (Exception e) { - LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); - } - } - } - - @Override - public synchronized void onException(JMSException ex) { - LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occurred. Shutting down client."); - } - - public synchronized void setStop(boolean bStop) { - this.bStop = bStop; - } - - @Override - public synchronized void onMessage(Message message) { - receivedMessageCounter++; - latch.countDown(); - - LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + - " :: Number messages received " + this.receivedMessageCounter); - - try { - - if (receivedMessageCounter % (maxRedeliveries + 1) == 1) { - previousMessageId = message.getJMSMessageID(); - } - - if (bMessageReceiptIsOrdered) { - bMessageReceiptIsOrdered = previousMessageId.trim().equals(message.getJMSMessageID()); - } - - final String jmsMessageId = message.getJMSMessageID(); - assertTrue("Did not find expected ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return messageList.containsKey(jmsMessageId); - } - })); - - AtomicInteger counter = messageList.get(jmsMessageId); - counter.incrementAndGet(); - - LOG.info("Consumer for destination (" + destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n" + previousMessageId + " = previousMessageId\n" + bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n" + ">>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\n" + "message counter = " + counter.get()); - - if (!bFakeFail) { - LOG.debug("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString()); - session.commit(); - } - else { - LOG.debug("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString()); - session.rollback(); // rolls back all the consumed messages on the session to - } - - } - catch (Exception ex) { - ex.printStackTrace(); - LOG.error("Error reading JMS Message from destination " + destinationName + "."); - } - } - } -}