Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CCED5192C6 for ; Thu, 31 Mar 2016 02:30:49 +0000 (UTC) Received: (qmail 95738 invoked by uid 500); 31 Mar 2016 02:30:49 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 95521 invoked by uid 500); 31 Mar 2016 02:30:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 93482 invoked by uid 99); 31 Mar 2016 02:30:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 Mar 2016 02:30:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B54EE78A1; Thu, 31 Mar 2016 02:30:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 31 Mar 2016 02:31:01 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/69] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java deleted file mode 100644 index 882105b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java +++ /dev/null @@ -1,520 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class AMQ4083Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class); - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - private static String TEST_QUEUE = "testQueue"; - private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); - - private final int messageCount = 100; - - private String connectionUri; - private String[] data; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - - data = new String[messageCount]; - - for (int i = 0; i < messageCount; i++) { - data[i] = "Text for message: " + i + " at " + new Date(); - } - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testExpiredMsgsBeforeNonExpired() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.getPrefetchPolicy().setQueuePrefetch(400); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - connection.start(); - - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - - // send a batch that expires in a short time. - for (int i = 0; i < 100; i++) { - producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); - } - - // and send one that doesn't expire to we can ack it. - producer.send(session.createTextMessage()); - - // wait long enough so the first batch times out. - TimeUnit.SECONDS.sleep(5); - - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - - assertEquals(101, queueView.getInFlightCount()); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - message.acknowledge(); - } - catch (JMSException e) { - } - } - }); - - TimeUnit.SECONDS.sleep(5); - - assertEquals(0, queueView.getInFlightCount()); - - for (int i = 0; i < 200; i++) { - producer.send(session.createTextMessage()); - } - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); - LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); - LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); - LOG.info("Expired Count: {}", queueView.getExpiredCount()); - LOG.info("InFlight Count: {}", queueView.getInFlightCount()); - } - - @Test - public void testExpiredMsgsBeforeNonExpiredWithTX() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.getPrefetchPolicy().setQueuePrefetch(400); - - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - connection.start(); - - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - - // send a batch that expires in a short time. - for (int i = 0; i < 100; i++) { - producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); - } - - // and send one that doesn't expire to we can ack it. - producer.send(session.createTextMessage()); - session.commit(); - - // wait long enough so the first batch times out. - TimeUnit.SECONDS.sleep(5); - - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - - assertEquals(101, queueView.getInFlightCount()); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - session.commit(); - } - catch (JMSException e) { - } - } - }); - - TimeUnit.SECONDS.sleep(5); - - assertEquals(0, queueView.getInFlightCount()); - - for (int i = 0; i < 200; i++) { - producer.send(session.createTextMessage()); - } - session.commit(); - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); - LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); - LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); - LOG.info("Expired Count: {}", queueView.getExpiredCount()); - LOG.info("InFlight Count: {}", queueView.getInFlightCount()); - } - - @Test - public void testExpiredMsgsInterleavedWithNonExpired() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.getPrefetchPolicy().setQueuePrefetch(400); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - connection.start(); - - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - - // send a batch that expires in a short time. - for (int i = 0; i < 200; i++) { - - if ((i % 2) == 0) { - producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); - } - else { - producer.send(session.createTextMessage()); - } - } - - // wait long enough so the first batch times out. - TimeUnit.SECONDS.sleep(5); - - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - - assertEquals(200, queueView.getInFlightCount()); - - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - try { - LOG.debug("Acking message: {}", message); - message.acknowledge(); - } - catch (JMSException e) { - } - } - }); - - TimeUnit.SECONDS.sleep(5); - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - for (int i = 0; i < 200; i++) { - producer.send(session.createTextMessage()); - } - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); - LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); - LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); - LOG.info("Expired Count: {}", queueView.getExpiredCount()); - LOG.info("InFlight Count: {}", queueView.getInFlightCount()); - } - - @Test - public void testExpiredMsgsInterleavedWithNonExpiredCumulativeAck() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.getPrefetchPolicy().setQueuePrefetch(400); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - connection.start(); - - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - - // send a batch that expires in a short time. - for (int i = 0; i < 200; i++) { - - if ((i % 2) == 0) { - producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); - } - else { - producer.send(session.createTextMessage()); - } - } - - // wait long enough so the first batch times out. - TimeUnit.SECONDS.sleep(5); - - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - - assertEquals(200, queueView.getInFlightCount()); - - final AtomicInteger msgCount = new AtomicInteger(); - - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - try { - if (msgCount.incrementAndGet() == 100) { - LOG.debug("Acking message: {}", message); - message.acknowledge(); - } - } - catch (JMSException e) { - } - } - }); - - TimeUnit.SECONDS.sleep(5); - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - // Now we just ack each and see if our counters come out right in the end. - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - try { - LOG.debug("Acking message: {}", message); - message.acknowledge(); - } - catch (JMSException e) { - } - } - }); - - for (int i = 0; i < 200; i++) { - producer.send(session.createTextMessage()); - } - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); - LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); - LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); - LOG.info("Expired Count: {}", queueView.getExpiredCount()); - LOG.info("InFlight Count: {}", queueView.getInFlightCount()); - } - - @Test - public void testExpiredBatchBetweenNonExpiredMessages() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.getPrefetchPolicy().setQueuePrefetch(400); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - connection.start(); - - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - - // Send one that doesn't expire so we can ack it. - producer.send(session.createTextMessage()); - - // send a batch that expires in a short time. - for (int i = 0; i < 100; i++) { - producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); - } - - // and send one that doesn't expire so we can ack it. - producer.send(session.createTextMessage()); - - // wait long enough so the first batch times out. - TimeUnit.SECONDS.sleep(5); - - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - - assertEquals(102, queueView.getInFlightCount()); - - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - try { - message.acknowledge(); - } - catch (JMSException e) { - } - } - }); - - TimeUnit.SECONDS.sleep(5); - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - for (int i = 0; i < 200; i++) { - producer.send(session.createTextMessage()); - } - - assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return queueView.getInFlightCount() == 0; - } - })); - - LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); - LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); - LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); - LOG.info("Expired Count: {}", queueView.getExpiredCount()); - LOG.info("InFlight Count: {}", queueView.getInFlightCount()); - } - - @Test - public void testConsumeExpiredQueueAndDlq() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - Connection connection = factory.createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producerNormal = session.createProducer(queue); - MessageProducer producerExpire = session.createProducer(queue); - producerExpire.setTimeToLive(500); - - MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("ActiveMQ.DLQ")); - connection.start(); - - Connection consumerConnection = factory.createConnection(); - ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); - prefetchPolicy.setAll(10); - ((ActiveMQConnection) consumerConnection).setPrefetchPolicy(prefetchPolicy); - Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); - - String msgBody = new String(new byte[20 * 1024]); - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(msgBody); - producerExpire.send(queue, message); - } - - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(msgBody); - producerNormal.send(queue, message); - } - - ArrayList messages = new ArrayList<>(); - Message received; - while ((received = consumer.receive(1000)) != null) { - messages.add(received); - if (messages.size() == 1) { - TimeUnit.SECONDS.sleep(1); - } - received.acknowledge(); - } - - assertEquals("got messages", messageCount + 1, messages.size()); - - ArrayList dlqMessages = new ArrayList<>(); - while ((received = dlqConsumer.receive(1000)) != null) { - dlqMessages.add(received); - } - - assertEquals("got dlq messages", data.length - 1, dlqMessages.size()); - - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - - LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); - LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); - LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); - LOG.info("Expired Count: {}", queueView.getExpiredCount()); - LOG.info("InFlight Count: {}", queueView.getInFlightCount()); - } - - private QueueViewMBean getProxyToQueueViewMBean() throws Exception { - final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); - final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java deleted file mode 100644 index e894b70..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.HashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4092Test extends TestCase { - - private static final Logger log = LoggerFactory.getLogger(AMQ4092Test.class); - - static final String QUEUE_NAME = "TEST"; - - // increase limits to expedite failure - static final int NUM_TO_SEND_PER_PRODUCER = 1000; // 10000 - static final int NUM_PRODUCERS = 5; // 40 - - static final ActiveMQQueue[] DESTINATIONS = new ActiveMQQueue[]{new ActiveMQQueue("A"), new ActiveMQQueue("B") - // A/B seems to be sufficient for concurrentStoreAndDispatch=true - }; - - static final boolean debug = false; - - private BrokerService brokerService; - - private ActiveMQQueue destination; - private HashMap exceptions = new HashMap<>(); - private ExceptionListener exceptionListener = new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - exceptions.put(Thread.currentThread(), exception); - } - }; - - @Override - protected void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false); - brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - destination = new ActiveMQQueue(); - destination.setCompositeDestinations(DESTINATIONS); - Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - exceptions.put(t, e); - } - }); - } - - @Override - protected void tearDown() throws Exception { - // Stop any running threads. - brokerService.stop(); - } - - public void testConcurrentGroups() throws Exception { - ExecutorService executorService = Executors.newCachedThreadPool(); - executorService.submit(new TestConsumer()); - for (int i = 0; i < NUM_PRODUCERS; i++) { - executorService.submit(new TestProducer()); - } - executorService.shutdown(); - executorService.awaitTermination(5, TimeUnit.MINUTES); - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - } - - class TestProducer implements Runnable { - - public void produceMessages() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString()); - connectionFactory.setExceptionListener(exceptionListener); - connectionFactory.setUseAsyncSend(true); - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - String name = new String(new byte[2 * 1024]); - for (int i = 1; i <= NUM_TO_SEND_PER_PRODUCER; i++) { - - TextMessage message = session.createTextMessage(name + "_" + i); - for (int j = 0; j < 100; j++) { - message.setStringProperty("Prop" + j, "" + j); - } - message.setStringProperty("JMSXGroupID", Thread.currentThread().getName() + i); - message.setIntProperty("JMSXGroupSeq", 1); - producer.send(message); - } - - producer.close(); - session.close(); - connection.close(); - } - - @Override - public void run() { - try { - produceMessages(); - } - catch (Exception e) { - e.printStackTrace(); - exceptions.put(Thread.currentThread(), e); - } - } - } - - class TestConsumer implements Runnable { - - private CountDownLatch finishLatch = new CountDownLatch(1); - - public void consume() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString()); - - connectionFactory.setExceptionListener(exceptionListener); - final int totalMessageCount = NUM_TO_SEND_PER_PRODUCER * DESTINATIONS.length * NUM_PRODUCERS; - final AtomicInteger counter = new AtomicInteger(); - final MessageListener listener = new MessageListener() { - @Override - public void onMessage(Message message) { - - if (debug) { - try { - log.info(((TextMessage) message).getText()); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - boolean first = false; - try { - first = message.getBooleanProperty("JMSXGroupFirstForConsumer"); - } - catch (JMSException e) { - e.printStackTrace(); - exceptions.put(Thread.currentThread(), e); - } - assertTrue("Always is first message", first); - if (counter.incrementAndGet() == totalMessageCount) { - log.info("Got all:" + counter.get()); - finishLatch.countDown(); - - } - } - }; - - int consumerCount = DESTINATIONS.length * 100; - Connection[] connections = new Connection[consumerCount]; - - Session[] sessions = new Session[consumerCount]; - MessageConsumer[] consumers = new MessageConsumer[consumerCount]; - - for (int i = 0; i < consumerCount; i++) { - connections[i] = connectionFactory.createConnection(); - connections[i].start(); - - sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); - - consumers[i] = sessions[i].createConsumer(DESTINATIONS[i % DESTINATIONS.length], null); - consumers[i].setMessageListener(listener); - } - - log.info("received " + counter.get() + " messages"); - - assertTrue("got all messages in time", finishLatch.await(4, TimeUnit.MINUTES)); - - log.info("received " + counter.get() + " messages"); - - for (MessageConsumer consumer : consumers) { - consumer.close(); - } - - for (Session session : sessions) { - session.close(); - } - - for (Connection connection : connections) { - connection.close(); - } - } - - @Override - public void run() { - try { - consume(); - } - catch (Exception e) { - e.printStackTrace(); - exceptions.put(Thread.currentThread(), e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java deleted file mode 100644 index b87fd1b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.Assert; - -public class AMQ4116Test extends EmbeddedBrokerTestSupport { - - private final String tcpAddr = "tcp://localhost:0"; - private String connectionUri; - - /** - * In this test, a message is produced and consumed from the test queue. - * Memory usage on the test queue should be reset to 0. The memory that was - * consumed is then sent to a second queue. Memory usage on the original - * test queue should remain 0, but actually increased when the second - * enqueue occurs. - */ - public void testVMTransport() throws Exception { - runTest(connectionFactory); - } - - /** - * This is an analog to the previous test, but occurs over TCP and passes. - */ - public void testTCPTransport() throws Exception { - runTest(new ActiveMQConnectionFactory(connectionUri)); - } - - private void runTest(ConnectionFactory connFactory) throws Exception { - // Verify that test queue is empty and not using any memory. - Destination physicalDestination = broker.getDestination(destination); - Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); - - // Enqueue a single message and verify that the test queue is using - // memory. - Connection conn = connFactory.createConnection(); - conn.start(); - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = session.createProducer(destination); - - producer.send(new ActiveMQMessage()); - - // Commit, which ensures message is in queue and memory usage updated. - session.commit(); - Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0); - - // Consume the message and verify that the test queue is no longer using - // any memory. - MessageConsumer consumer = session.createConsumer(destination); - Message received = consumer.receive(); - Assert.assertNotNull(received); - - // Commit, which ensures message is removed from queue and memory usage - // updated. - session.commit(); - Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); - - // Resend the message to a different queue and verify that the original - // test queue is still not using any memory. - ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class + ".second"); - MessageProducer secondPproducer = session.createProducer(secondDestination); - - secondPproducer.send(received); - - // Commit, which ensures message is in queue and memory usage updated. - // NOTE: This assertion fails due to bug. - session.commit(); - Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); - - conn.stop(); - } - - /** - * Create an embedded broker that has both TCP and VM connectors. - */ - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = super.createBroker(); - connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString(); - return broker; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java deleted file mode 100644 index d47c7c8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.net.Socket; -import java.net.URI; - -import javax.management.ObjectName; -import javax.net.SocketFactory; -import javax.net.ssl.SSLSocketFactory; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQSslConnectionFactory; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompConnection; -import org.apache.activemq.transport.stomp.StompFrame; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * - */ -public class AMQ4126Test { - - protected BrokerService broker; - - protected String java_security_auth_login_config = "java.security.auth.login.config"; - protected String xbean = "xbean:"; - protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126"; - protected String certBase = "src/test/resources/org/apache/activemq/security"; - protected String JaasStompSSLBroker_xml = "JaasStompSSLBroker.xml"; - protected StompConnection stompConnection = new StompConnection(); - private final static String destinationName = "TEST.QUEUE"; - protected String oldLoginConf = null; - - @Before - public void before() throws Exception { - if (System.getProperty(java_security_auth_login_config) != null) { - oldLoginConf = System.getProperty(java_security_auth_login_config); - } - System.setProperty(java_security_auth_login_config, confBase + "/login.config"); - broker = BrokerFactory.createBroker(xbean + confBase + "/" + JaasStompSSLBroker_xml); - - broker.setDeleteAllMessagesOnStartup(true); - broker.setUseJmx(true); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void after() throws Exception { - broker.stop(); - - if (oldLoginConf != null) { - System.setProperty(java_security_auth_login_config, oldLoginConf); - } - } - - public Socket createSocket(String host, int port) throws Exception { - System.setProperty("javax.net.ssl.trustStore", certBase + "/broker1.ks"); - System.setProperty("javax.net.ssl.trustStorePassword", "password"); - System.setProperty("javax.net.ssl.trustStoreType", "jks"); - System.setProperty("javax.net.ssl.keyStore", certBase + "/client.ks"); - System.setProperty("javax.net.ssl.keyStorePassword", "password"); - System.setProperty("javax.net.ssl.keyStoreType", "jks"); - - SocketFactory factory = SSLSocketFactory.getDefault(); - return factory.createSocket(host, port); - } - - public void stompConnectTo(String connectorName, String extraHeaders) throws Exception { - String host = broker.getConnectorByName(connectorName).getConnectUri().getHost(); - int port = broker.getConnectorByName(connectorName).getConnectUri().getPort(); - stompConnection.open(createSocket(host, port)); - String extra = extraHeaders != null ? extraHeaders : "\n"; - stompConnection.sendFrame("CONNECT\n" + extra + "\n" + Stomp.NULL); - - StompFrame f = stompConnection.receive(); - TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction()); - stompConnection.close(); - } - - @Test - public void testStompSSLWithUsernameAndPassword() throws Exception { - stompConnectTo("stomp+ssl", "login:system\n" + "passcode:manager\n"); - } - - @Test - public void testStompSSLWithCertificate() throws Exception { - stompConnectTo("stomp+ssl", null); - } - - @Test - public void testStompNIOSSLWithUsernameAndPassword() throws Exception { - stompConnectTo("stomp+nio+ssl", "login:system\n" + "passcode:manager\n"); - } - - @Test - public void testStompNIOSSLWithCertificate() throws Exception { - stompConnectTo("stomp+nio+ssl", null); - } - - public void openwireConnectTo(String connectorName, String username, String password) throws Exception { - URI brokerURI = broker.getConnectorByName(connectorName).getConnectUri(); - String uri = "ssl://" + brokerURI.getHost() + ":" + brokerURI.getPort(); - ActiveMQSslConnectionFactory cf = new ActiveMQSslConnectionFactory(uri); - cf.setTrustStore("org/apache/activemq/security/broker1.ks"); - cf.setTrustStorePassword("password"); - cf.setKeyStore("org/apache/activemq/security/client.ks"); - cf.setKeyStorePassword("password"); - ActiveMQConnection connection = null; - if (username != null || password != null) { - connection = (ActiveMQConnection) cf.createConnection(username, password); - } - else { - connection = (ActiveMQConnection) cf.createConnection(); - } - TestCase.assertNotNull(connection); - connection.start(); - connection.stop(); - } - - @Test - public void testOpenwireSSLWithUsernameAndPassword() throws Exception { - openwireConnectTo("openwire+ssl", "system", "manager"); - } - - @Test - public void testOpenwireSSLWithCertificate() throws Exception { - openwireConnectTo("openwire+ssl", null, null); - } - - @Test - public void testOpenwireNIOSSLWithUsernameAndPassword() throws Exception { - openwireConnectTo("openwire+nio+ssl", "system", "mmanager"); - } - - @Test - public void testOpenwireNIOSSLWithCertificate() throws Exception { - openwireConnectTo("openwire+nio+ssl", null, null); - } - - @Test - public void testJmx() throws Exception { - TestCase.assertFalse(findDestination(destinationName)); - broker.getAdminView().addQueue(destinationName); - TestCase.assertTrue(findDestination(destinationName)); - broker.getAdminView().removeQueue(destinationName); - TestCase.assertFalse(findDestination(destinationName)); - } - - private boolean findDestination(String name) throws Exception { - ObjectName[] destinations = broker.getAdminView().getQueues(); - for (ObjectName destination : destinations) { - if (destination.toString().contains(name)) { - return true; - } - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java deleted file mode 100644 index 123413f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.net.Socket; - -import javax.net.SocketFactory; -import javax.net.ssl.SSLSocketFactory; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompConnection; -import org.apache.activemq.transport.stomp.StompFrame; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4133Test { - - protected String java_security_auth_login_config = "java.security.auth.login.config"; - protected String xbean = "xbean:"; - protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126"; - protected String certBase = "src/test/resources/org/apache/activemq/security"; - protected String activemqXml = "InconsistentConnectorPropertiesBehaviour.xml"; - protected BrokerService broker; - - protected String oldLoginConf = null; - - @Before - public void before() throws Exception { - if (System.getProperty(java_security_auth_login_config) != null) { - oldLoginConf = System.getProperty(java_security_auth_login_config); - } - System.setProperty(java_security_auth_login_config, confBase + "/" + "login.config"); - broker = BrokerFactory.createBroker(xbean + confBase + "/" + activemqXml); - - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void after() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test - public void stompSSLTransportNeedClientAuthTrue() throws Exception { - stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl").getConnectUri().getPort()); - } - - @Test - public void stompSSLNeedClientAuthTrue() throws Exception { - stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl+special").getConnectUri().getPort()); - } - - @Test - public void stompNIOSSLTransportNeedClientAuthTrue() throws Exception { - stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl").getConnectUri().getPort()); - } - - @Test - public void stompNIOSSLNeedClientAuthTrue() throws Exception { - stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl+special").getConnectUri().getPort()); - } - - public Socket createSocket(String host, int port) throws Exception { - System.setProperty("javax.net.ssl.trustStore", certBase + "/" + "broker1.ks"); - System.setProperty("javax.net.ssl.trustStorePassword", "password"); - System.setProperty("javax.net.ssl.trustStoreType", "jks"); - System.setProperty("javax.net.ssl.keyStore", certBase + "/" + "client.ks"); - System.setProperty("javax.net.ssl.keyStorePassword", "password"); - System.setProperty("javax.net.ssl.keyStoreType", "jks"); - - SocketFactory factory = SSLSocketFactory.getDefault(); - return factory.createSocket(host, port); - } - - public void stompConnectTo(String host, int port) throws Exception { - StompConnection stompConnection = new StompConnection(); - stompConnection.open(createSocket(host, port)); - stompConnection.sendFrame("CONNECT\n" + "\n" + Stomp.NULL); - StompFrame f = stompConnection.receive(); - TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction()); - stompConnection.close(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java deleted file mode 100644 index d0096f1..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.net.URI; -import java.util.concurrent.Semaphore; - -import javax.jms.Message; -import javax.jms.MessageListener; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.network.DemandForwardingBridgeSupport; -import org.apache.activemq.util.MessageIdList; -import org.apache.activemq.util.Wait; - -/** - * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when - * bridges are VM-to-VM. Specifically, memory usage from the local broker is - * manipulated by the remote broker. - */ -public class AMQ4147Test extends JmsMultipleBrokersTestSupport { - - /** - * This test demonstrates the bug: namely, when a message is bridged over - * the VMTransport, its memory usage continues to refer to the originating - * broker. As a result, memory usage is never accounted for on the remote - * broker, and the local broker's memory usage is only decreased once the - * message is consumed on the remote broker. - */ - public void testVMTransportRemoteMemoryUsage() throws Exception { - BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false")); - - BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false")); - - startAllBrokers(); - - // Forward messages from broker1 to broker2 over the VM transport. - bridgeBrokers("broker1", "broker2").start(); - - // Verify that broker1 and broker2's test queues have no memory usage. - ActiveMQDestination testQueue = createDestination(AMQ4147Test.class.getSimpleName() + ".queue", false); - final Destination broker1TestQueue = broker1.getDestination(testQueue); - final Destination broker2TestQueue = broker2.getDestination(testQueue); - - assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage()); - assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); - - // Produce a message to broker1's test queue and verify that broker1's - // memory usage has increased, but broker2 still has no memory usage. - sendMessages("broker1", testQueue, 1); - assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0); - assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); - - // Create a consumer on broker2 that is synchronized to allow detection - // of "in flight" messages to the consumer. - MessageIdList broker2Messages = getBrokerMessages("broker2"); - final Semaphore consumerReady = new Semaphore(0); - final Semaphore consumerProceed = new Semaphore(0); - - broker2Messages.setParent(new MessageListener() { - @Override - public void onMessage(Message message) { - consumerReady.release(); - try { - consumerProceed.acquire(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - }); - - createConsumer("broker2", testQueue); - - // Verify that when broker2's consumer receives the message, the memory - // usage has moved broker1 to broker2. The first assertion is expected - // to fail due to the bug; the try/finally ensures the consumer is - // released prior to failure so that the broker can shut down. - consumerReady.acquire(); - - try { - assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker1TestQueue.getMemoryUsage().getUsage() == 0; - } - })); - assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0); - } - finally { - // Consume the message and verify that there is no more memory - // usage. - consumerProceed.release(); - } - - assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker1TestQueue.getMemoryUsage().getUsage() == 0; - } - })); - assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker2TestQueue.getMemoryUsage().getUsage() == 0; - } - })); - } - - /** - * This test demonstrates that the bug is VMTransport-specific and does not - * occur when bridges occur using other protocols. - */ - public void testTcpTransportRemoteMemoryUsage() throws Exception { - BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false")); - - BrokerService broker2 = createBroker(new URI("broker:(tcp://localhost:61616)/broker2?persistent=false")); - - startAllBrokers(); - - // Forward messages from broker1 to broker2 over the TCP transport. - bridgeBrokers("broker1", "broker2").start(); - - // Verify that broker1 and broker2's test queues have no memory usage. - ActiveMQDestination testQueue = createDestination(AMQ4147Test.class.getSimpleName() + ".queue", false); - final Destination broker1TestQueue = broker1.getDestination(testQueue); - final Destination broker2TestQueue = broker2.getDestination(testQueue); - - assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage()); - assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); - - // Produce a message to broker1's test queue and verify that broker1's - // memory usage has increased, but broker2 still has no memory usage. - sendMessages("broker1", testQueue, 1); - assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0); - assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); - - // Create a consumer on broker2 that is synchronized to allow detection - // of "in flight" messages to the consumer. - MessageIdList broker2Messages = getBrokerMessages("broker2"); - final Semaphore consumerReady = new Semaphore(0); - final Semaphore consumerProceed = new Semaphore(0); - - broker2Messages.setParent(new MessageListener() { - @Override - public void onMessage(Message message) { - consumerReady.release(); - try { - consumerProceed.acquire(); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - }); - - createConsumer("broker2", testQueue); - - // Verify that when broker2's consumer receives the message, the memory - // usage has moved broker1 to broker2. - consumerReady.acquire(); - - try { - assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker1TestQueue.getMemoryUsage().getUsage() == 0; - } - })); - assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0); - } - finally { - // Consume the message and verify that there is no more memory - // usage. - consumerProceed.release(); - } - - // Pause to allow ACK to be processed. - assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker1TestQueue.getMemoryUsage().getUsage() == 0; - } - })); - assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker2TestQueue.getMemoryUsage().getUsage() == 0; - } - })); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java deleted file mode 100644 index 8558f48..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.net.URI; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.network.DemandForwardingBridgeSupport; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.util.Wait; -import org.junit.Assert; - -/** - * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby - * a static subscription from broker1 to broker2 is forwarded to broker3 even - * though the network TTL is 1. This results in duplicate subscriptions on - * broker3. - */ -public class AMQ4148Test extends JmsMultipleBrokersTestSupport { - - public void test() throws Exception { - // Create a hub-and-spoke network where each hub-spoke pair share - // messages on a test queue. - BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false")); - - final BrokerService[] spokes = new BrokerService[4]; - for (int i = 0; i < spokes.length; i++) { - spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false")); - - } - startAllBrokers(); - - ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false); - - NetworkConnector[] ncs = new NetworkConnector[spokes.length]; - for (int i = 0; i < spokes.length; i++) { - NetworkConnector nc = bridgeBrokers("hub", "spoke" + i); - nc.setNetworkTTL(1); - nc.setDuplex(true); - nc.setConduitSubscriptions(false); - nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue)); - nc.start(); - - ncs[i] = nc; - } - - waitForBridgeFormation(); - - // Pause to allow subscriptions to be created. - TimeUnit.SECONDS.sleep(5); - - // Verify that the hub has a subscription from each spoke, but that each - // spoke has a single subscription from the hub (since the network TTL is 1). - final Destination hubTestQueue = hub.getDestination(testQueue); - assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return spokes.length == hubTestQueue.getConsumers().size(); - } - })); - - // Now check each spoke has exactly one consumer on the Queue. - for (int i = 0; i < 4; i++) { - Destination spokeTestQueue = spokes[i].getDestination(testQueue); - Assert.assertEquals(1, spokeTestQueue.getConsumers().size()); - } - - for (NetworkConnector nc : ncs) { - nc.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java deleted file mode 100644 index f932a49..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionControl; -import org.junit.After; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4157Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ4157Test.class); - private BrokerService broker; - private ActiveMQConnectionFactory connectionFactory; - private final Destination destination = new ActiveMQQueue("Test"); - private final String payloadString = new String(new byte[8 * 1024]); - private final boolean useBytesMessage = true; - private final int parallelProducer = 20; - private final int parallelConsumer = 100; - - private final Vector exceptions = new Vector<>(); - long toSend = 1000; - - @Test - public void testPublishCountsWithRollbackConsumer() throws Exception { - - startBroker(true); - - final AtomicLong sharedCount = new AtomicLong(toSend); - ExecutorService executorService = Executors.newCachedThreadPool(); - - for (int i = 0; i < parallelConsumer; i++) { - executorService.execute(new Runnable() { - @Override - public void run() { - try { - consumeOneAndRollback(); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - } - - for (int i = 0; i < parallelProducer; i++) { - executorService.execute(new Runnable() { - @Override - public void run() { - try { - publishMessages(sharedCount, 0); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - } - - executorService.shutdown(); - executorService.awaitTermination(30, TimeUnit.MINUTES); - assertTrue("Producers done in time", executorService.isTerminated()); - assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); - - restartBroker(500); - - LOG.info("Attempting consume of {} messages", toSend); - - consumeMessages(toSend); - } - - private void consumeOneAndRollback() throws Exception { - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(destination); - Message message = null; - while (message == null) { - message = consumer.receive(1000); - } - session.rollback(); - connection.close(); - } - - private void consumeMessages(long count) throws Exception { - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - for (int i = 0; i < count; i++) { - assertNotNull("got message " + i, consumer.receive(20000)); - } - assertNull("none left over", consumer.receive(2000)); - } - - private void restartBroker(int restartDelay) throws Exception { - stopBroker(); - TimeUnit.MILLISECONDS.sleep(restartDelay); - startBroker(false); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - private void publishMessages(AtomicLong count, int expiry) throws Exception { - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setWatchTopicAdvisories(false); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = session.createProducer(destination); - while ((count.getAndDecrement()) > 0) { - Message message = null; - if (useBytesMessage) { - message = session.createBytesMessage(); - ((BytesMessage) message).writeBytes(payloadString.getBytes()); - } - else { - message = session.createTextMessage(payloadString); - } - producer.send(message, DeliveryMode.PERSISTENT, 5, expiry); - } - connection.syncSendPacket(new ConnectionControl()); - connection.close(); - } - - public void startBroker(boolean deleteAllMessages) throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(deleteAllMessages); - broker.addConnector("tcp://0.0.0.0:0"); - broker.start(); - - String options = "?jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=1000&jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192"; - connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java deleted file mode 100644 index 0cd8e0b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.management.ObjectName; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerFilter; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.DiscoveryEvent; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.network.NetworkBridge; -import org.apache.activemq.network.NetworkBridgeListener; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.discovery.DiscoveryAgent; -import org.apache.activemq.transport.discovery.DiscoveryListener; -import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; -import org.junit.Assert; - -/** - * This test demonstrates a number of race conditions in - * {@link DiscoveryNetworkConnector} that can result in an active bridge no - * longer being reported as active and vice-versa, an inactive bridge still - * being reported as active. - */ -public class AMQ4160Test extends JmsMultipleBrokersTestSupport { - - final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2); - - /** - * Since these tests involve wait conditions, protect against indefinite - * waits (due to unanticipated issues). - */ - @Override - public void setUp() throws Exception { - setAutoFail(true); - setMaxTestTime(MAX_TEST_TIME); - super.setUp(); - } - - /** - * This test demonstrates how concurrent attempts to establish a bridge to - * the same remote broker are allowed to occur. Connection uniqueness will - * cause whichever bridge creation attempt is second to fail. However, this - * failure erases the entry in - * {@link DiscoveryNetworkConnector#activeBridges()} that represents the - * successful first bridge creation attempt. - */ - public void testLostActiveBridge() throws Exception { - final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15); - - // Start two brokers with a bridge from broker1 to broker2. - BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false")); - final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false")); - - // Allow the concurrent local bridge connections to be made even though - // they are duplicated; this prevents both of the bridge attempts from - // failing in the case that the local and remote bridges are established - // out-of-order. - BrokerPlugin ignoreAddConnectionPlugin = new BrokerPlugin() { - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new BrokerFilter(broker) { - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - // ignore - } - }; - } - }; - - broker1.setPlugins(new BrokerPlugin[]{ignoreAddConnectionPlugin}); - - startAllBrokers(); - - // Start a bridge from broker1 to broker2. The discovery agent attempts - // to create the bridge concurrently with two threads, and the - // synchronization in createBridge ensures that pre-patch both threads - // actually attempt to start bridges. Post-patch, only one thread is - // allowed to start the bridge. - final CountDownLatch attemptLatch = new CountDownLatch(2); - final CountDownLatch createLatch = new CountDownLatch(2); - - DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() { - @Override - public void onServiceAdd(DiscoveryEvent event) { - // Pre-and-post patch, two threads attempt to establish a bridge - // to the same remote broker. - attemptLatch.countDown(); - super.onServiceAdd(event); - } - - @Override - protected NetworkBridge createBridge(Transport localTransport, - Transport remoteTransport, - final DiscoveryEvent event) { - // Pre-patch, the two threads are allowed to create the bridge. - // Post-patch, only the first thread is allowed. Wait a - // reasonable delay once both attempts are detected to allow - // the two bridge creations to occur concurrently (pre-patch). - // Post-patch, the wait will timeout and allow the first (and - // only) bridge creation to occur. - try { - attemptLatch.await(); - createLatch.countDown(); - createLatch.await(ATTEMPT_TO_CREATE_DELAY, TimeUnit.MILLISECONDS); - return super.createBridge(localTransport, remoteTransport, event); - } - catch (InterruptedException e) { - Thread.interrupted(); - return null; - } - } - }; - - nc.setDiscoveryAgent(new DiscoveryAgent() { - TaskRunnerFactory taskRunner = new TaskRunnerFactory(); - DiscoveryListener listener; - - @Override - public void start() throws Exception { - taskRunner.init(); - taskRunner.execute(new Runnable() { - @Override - public void run() { - listener.onServiceAdd(new DiscoveryEvent(broker2.getVmConnectorURI().toString())); - } - }); - taskRunner.execute(new Runnable() { - @Override - public void run() { - listener.onServiceAdd(new DiscoveryEvent(broker2.getVmConnectorURI().toString())); - } - }); - } - - @Override - public void stop() throws Exception { - taskRunner.shutdown(); - } - - @Override - public void setDiscoveryListener(DiscoveryListener listener) { - this.listener = listener; - } - - @Override - public void registerService(String name) throws IOException { - } - - @Override - public void serviceFailed(DiscoveryEvent event) throws IOException { - listener.onServiceRemove(event); - } - }); - - broker1.addNetworkConnector(nc); - nc.start(); - - // Wait for the bridge to be formed by the first attempt. - waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS); - - // Pre-patch, the second bridge creation attempt fails and removes the - // first (successful) bridge creation attempt from the - // list of active bridges. Post-patch, the second bridge creation - // attempt is prevented, so the first bridge creation attempt - // remains "active". This assertion is expected to fail pre-patch and - // pass post-patch. - Assert.assertFalse(nc.activeBridges().isEmpty()); - } - - /** - * This test demonstrates a race condition where a failed bridge can be - * removed from the list of active bridges in - * {@link DiscoveryNetworkConnector} before it has been added. Eventually, - * the failed bridge is added, but never removed, which causes subsequent - * bridge creation attempts to be ignored. The result is a network connector - * that thinks it has an active bridge, when in fact it doesn't. - */ - public void testInactiveBridgStillActive() throws Exception { - // Start two brokers with a bridge from broker1 to broker2. - BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false")); - final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false")); - - // Force bridge failure by having broker1 disallow connections. - BrokerPlugin disallowAddConnectionPlugin = new BrokerPlugin() { - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new BrokerFilter(broker) { - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - throw new Exception("Test exception to force bridge failure"); - } - }; - } - }; - - broker1.setPlugins(new BrokerPlugin[]{disallowAddConnectionPlugin}); - - startAllBrokers(); - - // Start a bridge from broker1 to broker2. The bridge delays returning - // from start until after the bridge failure has been processed; - // this leaves the first bridge creation attempt recorded as active, - // even though it failed. - final SimpleDiscoveryAgent da = new SimpleDiscoveryAgent(); - da.setServices(new URI[]{broker2.getVmConnectorURI()}); - - final CountDownLatch attemptLatch = new CountDownLatch(3); - final CountDownLatch removedLatch = new CountDownLatch(1); - - DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() { - @Override - public void onServiceAdd(DiscoveryEvent event) { - attemptLatch.countDown(); - super.onServiceAdd(event); - } - - @Override - public void onServiceRemove(DiscoveryEvent event) { - super.onServiceRemove(event); - removedLatch.countDown(); - } - - @Override - protected NetworkBridge createBridge(Transport localTransport, - Transport remoteTransport, - final DiscoveryEvent event) { - final NetworkBridge next = super.createBridge(localTransport, remoteTransport, event); - return new NetworkBridge() { - - @Override - public void start() throws Exception { - next.start(); - // Delay returning until the failed service has been - // removed. - removedLatch.await(); - } - - @Override - public void stop() throws Exception { - next.stop(); - } - - @Override - public void serviceRemoteException(Throwable error) { - next.serviceRemoteException(error); - } - - @Override - public void serviceLocalException(Throwable error) { - next.serviceLocalException(error); - } - - @Override - public void setNetworkBridgeListener(NetworkBridgeListener listener) { - next.setNetworkBridgeListener(listener); - } - - @Override - public String getRemoteAddress() { - return next.getRemoteAddress(); - } - - @Override - public String getRemoteBrokerName() { - return next.getRemoteBrokerName(); - } - - @Override - public String getRemoteBrokerId() { - return next.getRemoteBrokerId(); - } - - @Override - public String getLocalAddress() { - return next.getLocalAddress(); - } - - @Override - public String getLocalBrokerName() { - return next.getLocalBrokerName(); - } - - @Override - public long getEnqueueCounter() { - return next.getEnqueueCounter(); - } - - @Override - public long getDequeueCounter() { - return next.getDequeueCounter(); - } - - @Override - public void setMbeanObjectName(ObjectName objectName) { - next.setMbeanObjectName(objectName); - } - - @Override - public ObjectName getMbeanObjectName() { - return next.getMbeanObjectName(); - } - - @Override - public void resetStats() { - next.resetStats(); - } - }; - } - }; - nc.setDiscoveryAgent(da); - - broker1.addNetworkConnector(nc); - nc.start(); - - // All bridge attempts should fail, so the attempt latch should get - // triggered. However, because of the race condition, the first attempt - // is considered successful and causes further attempts to stop. - // Therefore, this wait will time out and cause the test to fail. - Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS)); - } - - /** - * This test verifies that when a network connector is restarted, any - * bridges that were active at the time of the stop are allowed to be - * re-established (i.e., the "active events" data structure in - * {@link DiscoveryNetworkConnector} is reset. - */ - public void testAllowAttemptsAfterRestart() throws Exception { - final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10); - - // Start two brokers with a bridge from broker1 to broker2. - BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false")); - final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false")); - - startAllBrokers(); - - // Start a bridge from broker1 to broker2. - NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(), broker2.getBrokerName()); - nc.start(); - - waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS); - - // Restart the network connector and verify that the bridge is - // re-established. The pause between start/stop is to account for the - // asynchronous closure. - nc.stop(); - Thread.sleep(STOP_DELAY); - nc.start(); - - waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS); - } -}