Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 14E351848C for ; Mon, 8 Feb 2016 17:10:53 +0000 (UTC) Received: (qmail 58899 invoked by uid 500); 8 Feb 2016 17:10:49 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 58804 invoked by uid 500); 8 Feb 2016 17:10:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 58562 invoked by uid 99); 8 Feb 2016 17:10:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Feb 2016 17:10:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2CF69E0994; Mon, 8 Feb 2016 17:10:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 08 Feb 2016 17:11:11 -0000 Message-Id: In-Reply-To: <88c9a5b8d99a45e0883141fe04dc7e27@git.apache.org> References: <88c9a5b8d99a45e0883141fe04dc7e27@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/47] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java deleted file mode 100644 index a70ef67..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ /dev/null @@ -1,584 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; -import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract public class MessagePriorityTest extends CombinationTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(MessagePriorityTest.class); - - BrokerService broker; - PersistenceAdapter adapter; - - protected ActiveMQConnectionFactory factory; - protected Connection conn; - protected Session sess; - - public boolean useCache = true; - public int deliveryMode = Message.DEFAULT_DELIVERY_MODE; - public boolean dispatchAsync = true; - public boolean prioritizeMessages = true; - public boolean immediatePriorityDispatch = true; - public int prefetchVal = 500; - public int expireMessagePeriod = 30000; - - public int MSG_NUM = 600; - public int HIGH_PRI = 7; - public int LOW_PRI = 3; - - abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception; - - @Override - protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setBrokerName("priorityTest"); - broker.setAdvisorySupport(false); - adapter = createPersistenceAdapter(true); - broker.setPersistenceAdapter(adapter); - PolicyEntry policy = new PolicyEntry(); - policy.setPrioritizedMessages(prioritizeMessages); - policy.setUseCache(useCache); - policy.setExpireMessagesPeriod(expireMessagePeriod); - StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = new StorePendingDurableSubscriberMessageStoragePolicy(); - durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch); - durableSubPending.setUseCache(useCache); - policy.setPendingDurableSubscriberPolicy(durableSubPending); - PolicyMap policyMap = new PolicyMap(); - policyMap.put(new ActiveMQQueue("TEST"), policy); - policyMap.put(new ActiveMQTopic("TEST"), policy); - - // do not process expired for one test - PolicyEntry ignoreExpired = new PolicyEntry(); - SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy(); - ignoreExpiredStrategy.setProcessExpired(false); - ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy); - policyMap.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), ignoreExpired); - - broker.setDestinationPolicy(policyMap); - broker.start(); - broker.waitUntilStarted(); - - factory = new ActiveMQConnectionFactory("vm://priorityTest"); - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setAll(prefetchVal); - factory.setPrefetchPolicy(prefetch); - factory.setWatchTopicAdvisories(false); - factory.setDispatchAsync(dispatchAsync); - conn = factory.createConnection(); - conn.setClientID("priority"); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - @Override - protected void tearDown() throws Exception { - try { - sess.close(); - conn.close(); - } - catch (Exception ignored) { - } - finally { - broker.stop(); - broker.waitUntilStopped(); - } - } - - public void testStoreConfigured() throws Exception { - final Queue queue = sess.createQueue("TEST"); - final Topic topic = sess.createTopic("TEST"); - - MessageProducer queueProducer = sess.createProducer(queue); - MessageProducer topicProducer = sess.createProducer(topic); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker.getRegionBroker().getDestinationMap().get(queue) != null; - } - }); - assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages()); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker.getRegionBroker().getDestinationMap().get(topic) != null; - } - }); - assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages()); - - queueProducer.close(); - topicProducer.close(); - - } - - protected class ProducerThread extends Thread { - - int priority; - int messageCount; - ActiveMQDestination dest; - - public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) { - this.messageCount = messageCount; - this.priority = priority; - this.dest = dest; - } - - @Override - public void run() { - try { - MessageProducer producer = sess.createProducer(dest); - producer.setPriority(priority); - producer.setDeliveryMode(deliveryMode); - for (int i = 0; i < messageCount; i++) { - producer.send(sess.createTextMessage("message priority: " + priority)); - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - - public void setMessagePriority(int priority) { - this.priority = priority; - } - - public void setMessageCount(int messageCount) { - this.messageCount = messageCount; - } - - } - - public void initCombosForTestQueues() { - addCombinationValues("useCache", new Object[]{new Boolean(true), new Boolean(false)}); - addCombinationValues("deliveryMode", new Object[]{new Integer(DeliveryMode.NON_PERSISTENT), new Integer(DeliveryMode.PERSISTENT)}); - } - - public void testQueues() throws Exception { - ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST"); - - ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI); - ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI); - - lowPri.start(); - highPri.start(); - - lowPri.join(); - highPri.join(); - - MessageConsumer queueConsumer = sess.createConsumer(queue); - for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = queueConsumer.receive(5000); - LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null)); - assertNotNull("Message " + i + " was null", msg); - assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); - } - } - - protected Message createMessage(int priority) throws Exception { - final String text = "priority " + priority; - Message msg = sess.createTextMessage(text); - LOG.info("Sending " + text); - return msg; - } - - public void initCombosForTestDurableSubs() { - addCombinationValues("prefetchVal", new Object[]{new Integer(1000), new Integer(MSG_NUM / 4)}); - } - - public void testDurableSubs() throws Exception { - ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); - TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority"); - sub.close(); - - ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI); - ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI); - - lowPri.start(); - highPri.start(); - - lowPri.join(); - highPri.join(); - - sub = sess.createDurableSubscriber(topic, "priority"); - for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = sub.receive(5000); - assertNotNull("Message " + i + " was null", msg); - assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); - } - - // verify that same broker/store can deal with non priority dest also - topic = (ActiveMQTopic) sess.createTopic("HAS_NO_PRIORITY"); - sub = sess.createDurableSubscriber(topic, "no_priority"); - sub.close(); - - lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI); - highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI); - - lowPri.start(); - highPri.start(); - - lowPri.join(); - highPri.join(); - - sub = sess.createDurableSubscriber(topic, "no_priority"); - // verify we got them all - for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = sub.receive(5000); - assertNotNull("Message " + i + " was null", msg); - } - - } - - public void initCombosForTestDurableSubsReconnect() { - addCombinationValues("prefetchVal", new Object[]{new Integer(1000), new Integer(MSG_NUM / 2)}); - // REVISIT = is dispatchAsync = true a problem or is it just the test? - addCombinationValues("dispatchAsync", new Object[]{Boolean.FALSE}); - addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE}); - } - - public void testDurableSubsReconnect() throws Exception { - ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); - final String subName = "priorityDisconnect"; - TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); - sub.close(); - - ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI); - ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI); - - lowPri.start(); - highPri.start(); - - lowPri.join(); - highPri.join(); - - final int closeFrequency = MSG_NUM / 4; - sub = sess.createDurableSubscriber(topic, subName); - for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = sub.receive(15000); - LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null)); - assertNotNull("Message " + i + " was null", msg); - assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); - if (i > 0 && i % closeFrequency == 0) { - LOG.info("Closing durable sub.. on: " + i); - sub.close(); - sub = sess.createDurableSubscriber(topic, subName); - } - } - } - - public void testHighPriorityDelivery() throws Exception { - - // get zero prefetch - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setAll(0); - factory.setPrefetchPolicy(prefetch); - conn.close(); - conn = factory.createConnection(); - conn.setClientID("priority"); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); - final String subName = "priorityDisconnect"; - TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); - sub.close(); - - final int numToProduce = 2000; - final int[] dups = new int[numToProduce * 2]; - ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI + 1); - producerThread.run(); - LOG.info("Low priority messages sent"); - - sub = sess.createDurableSubscriber(topic, subName); - final int batchSize = 250; - int lowLowCount = 0; - for (int i = 0; i < numToProduce; i++) { - Message msg = sub.receive(15000); - LOG.info("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null)); - assertNotNull("Message " + i + " was null", msg); - assertEquals("Message " + i + " has wrong priority", LOW_PRI + 1, msg.getJMSPriority()); - assertTrue("not duplicate ", dups[i] == 0); - dups[i] = 1; - - if (i % batchSize == 0) { - producerThread.setMessagePriority(HIGH_PRI); - producerThread.setMessageCount(1); - producerThread.run(); - LOG.info("High priority message sent, should be able to receive immediately"); - - if (i % batchSize * 2 == 0) { - producerThread.setMessagePriority(HIGH_PRI - 1); - producerThread.setMessageCount(1); - producerThread.run(); - LOG.info("High -1 priority message sent, should be able to receive immediately"); - } - - if (i % batchSize * 4 == 0) { - producerThread.setMessagePriority(LOW_PRI); - producerThread.setMessageCount(1); - producerThread.run(); - lowLowCount++; - LOG.info("Low low priority message sent, should not be able to receive immediately"); - } - - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - LOG.info("received hi? : " + msg); - assertEquals("high priority", HIGH_PRI, msg.getJMSPriority()); - - if (i % batchSize * 2 == 0) { - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - LOG.info("received hi -1 ? i=" + i + ", " + msg); - assertEquals("high priority", HIGH_PRI - 1, msg.getJMSPriority()); - } - } - } - for (int i = 0; i < lowLowCount; i++) { - Message msg = sub.receive(15000); - LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null)); - assertNotNull("Message " + i + " was null", msg); - assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority()); - } - } - - public void initCombosForTestHighPriorityDeliveryInterleaved() { - addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE}); - } - - public void testHighPriorityDeliveryInterleaved() throws Exception { - - // get zero prefetch - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setAll(0); - factory.setPrefetchPolicy(prefetch); - conn.close(); - conn = factory.createConnection(); - conn.setClientID("priority"); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); - final String subName = "priorityDisconnect"; - TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); - sub.close(); - - ProducerThread producerThread = new ProducerThread(topic, 1, HIGH_PRI); - producerThread.run(); - - producerThread.setMessagePriority(HIGH_PRI - 1); - producerThread.setMessageCount(1); - producerThread.run(); - - producerThread.setMessagePriority(LOW_PRI); - producerThread.setMessageCount(1); - producerThread.run(); - LOG.info("Ordered priority messages sent"); - - sub = sess.createDurableSubscriber(topic, subName); - - Message msg = sub.receive(15000); - assertNotNull("Message was null", msg); - LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority()); - assertEquals("Message has wrong priority", HIGH_PRI, msg.getJMSPriority()); - - producerThread.setMessagePriority(LOW_PRI + 1); - producerThread.setMessageCount(1); - producerThread.run(); - - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority()); - assertEquals("high priority", HIGH_PRI - 1, msg.getJMSPriority()); - - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - LOG.info("received hi? : " + msg); - assertEquals("high priority", LOW_PRI + 1, msg.getJMSPriority()); - - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - LOG.info("received hi? : " + msg); - assertEquals("high priority", LOW_PRI, msg.getJMSPriority()); - - msg = sub.receive(4000); - assertNull("Message was null", msg); - } - - // immediatePriorityDispatch is only relevant when cache is exhausted - public void initCombosForTestHighPriorityDeliveryThroughBackLog() { - addCombinationValues("useCache", new Object[]{Boolean.FALSE}); - addCombinationValues("immediatePriorityDispatch", new Object[]{Boolean.TRUE}); - } - - public void testHighPriorityDeliveryThroughBackLog() throws Exception { - - // get zero prefetch - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setAll(0); - factory.setPrefetchPolicy(prefetch); - conn.close(); - conn = factory.createConnection(); - conn.setClientID("priority"); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); - final String subName = "priorityDisconnect"; - TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); - sub.close(); - - ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI); - producerThread.run(); - - sub = sess.createDurableSubscriber(topic, subName); - int count = 0; - - for (; count < 300; count++) { - Message msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", LOW_PRI, msg.getJMSPriority()); - } - - producerThread.setMessagePriority(HIGH_PRI); - producerThread.setMessageCount(1); - producerThread.run(); - - Message msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", HIGH_PRI, msg.getJMSPriority()); - - for (; count < 600; count++) { - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", LOW_PRI, msg.getJMSPriority()); - } - } - - public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() { - addCombinationValues("useCache", new Object[]{Boolean.FALSE}); - addCombinationValues("immediatePriorityDispatch", new Object[]{Boolean.FALSE}); - } - - public void testHighPriorityNonDeliveryThroughBackLog() throws Exception { - - // get zero prefetch - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setAll(0); - factory.setPrefetchPolicy(prefetch); - conn.close(); - conn = factory.createConnection(); - conn.setClientID("priority"); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); - final String subName = "priorityDisconnect"; - TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); - sub.close(); - - ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI); - producerThread.run(); - - sub = sess.createDurableSubscriber(topic, subName); - int count = 0; - - for (; count < 300; count++) { - Message msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", LOW_PRI, msg.getJMSPriority()); - } - - producerThread.setMessagePriority(HIGH_PRI); - producerThread.setMessageCount(1); - producerThread.run(); - - for (; count < 400; count++) { - Message msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", LOW_PRI, msg.getJMSPriority()); - } - - Message msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", HIGH_PRI, msg.getJMSPriority()); - - for (; count < 600; count++) { - msg = sub.receive(15000); - assertNotNull("Message was null", msg); - assertEquals("high priority", LOW_PRI, msg.getJMSPriority()); - } - } - - public void initCombosForTestQueueBacklog() { - // the cache limits the priority ordering to available memory - addCombinationValues("useCache", new Object[]{new Boolean(false)}); - // expiry processing can fill the cursor with a snapshot of the producer - // priority, before producers are complete - addCombinationValues("expireMessagePeriod", new Object[]{new Integer(0)}); - } - - public void testQueueBacklog() throws Exception { - final int backlog = 180000; - ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST"); - - ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI); - ProducerThread highPri = new ProducerThread(queue, 10, HIGH_PRI); - - lowPri.start(); - lowPri.join(); - highPri.start(); - highPri.join(); - - LOG.info("Starting consumer..."); - MessageConsumer queueConsumer = sess.createConsumer(queue); - for (int i = 0; i < 500; i++) { - Message msg = queueConsumer.receive(20000); - LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null)); - if (msg == null) - dumpAllThreads("backlog"); - assertNotNull("Message " + i + " was null", msg); - assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java deleted file mode 100644 index f7ab98d..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -// https://issues.apache.org/activemq/browse/AMQ-2594 -public abstract class StoreOrderTest { - - private static final Logger LOG = LoggerFactory.getLogger(StoreOrderTest.class); - - protected BrokerService broker; - private ActiveMQConnection connection; - public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0"); - - protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception; - - protected void dumpMessages() throws Exception { - } - - public class TransactedSend implements Runnable { - - private CountDownLatch readyForCommit; - private CountDownLatch firstDone; - private boolean first; - private Session session; - private MessageProducer producer; - - public TransactedSend(CountDownLatch readyForCommit, CountDownLatch firstDone, boolean b) throws Exception { - this.readyForCommit = readyForCommit; - this.firstDone = firstDone; - this.first = b; - session = connection.createSession(true, Session.SESSION_TRANSACTED); - producer = session.createProducer(destination); - } - - @Override - public void run() { - try { - if (!first) { - firstDone.await(30, TimeUnit.SECONDS); - } - producer.send(session.createTextMessage(first ? "first" : "second")); - if (first) { - firstDone.countDown(); - } - readyForCommit.countDown(); - - } - catch (Exception e) { - e.printStackTrace(); - fail("unexpected ex on run " + e); - } - } - - public void commit() throws Exception { - session.commit(); - session.close(); - } - } - - @Before - public void setup() throws Exception { - broker = createBroker(); - initConnection(); - } - - public void initConnection() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setWatchTopicAdvisories(false); - connection.start(); - } - - @After - public void stopBroker() throws Exception { - if (connection != null) { - connection.close(); - } - if (broker != null) { - broker.stop(); - } - } - - @Test - public void testCompositeSendReceiveAfterRestart() throws Exception { - destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest"); - enqueueOneMessage(); - - LOG.info("restart broker"); - stopBroker(); - broker = createRestartedBroker(); - dumpMessages(); - initConnection(); - destination = new ActiveMQQueue("StoreOrderTest"); - assertNotNull("got one message from first dest", receiveOne()); - dumpMessages(); - destination = new ActiveMQQueue("SecondStoreOrderTest"); - assertNotNull("got one message from second dest", receiveOne()); - } - - @Test - public void validateUnorderedTxCommit() throws Exception { - - Executor executor = Executors.newCachedThreadPool(); - CountDownLatch readyForCommit = new CountDownLatch(2); - CountDownLatch firstDone = new CountDownLatch(1); - - TransactedSend first = new TransactedSend(readyForCommit, firstDone, true); - TransactedSend second = new TransactedSend(readyForCommit, firstDone, false); - executor.execute(first); - executor.execute(second); - - assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS)); - - LOG.info("commit out of order"); - // send interleaved so sequence id at time of commit could be reversed - second.commit(); - - // force usage over the limit before second commit to flush cache - enqueueOneMessage(); - - // can get lost in the cursor as it is behind the last sequenceId that was cached - first.commit(); - - LOG.info("send/commit done.."); - - dumpMessages(); - - String received1, received2, received3 = null; - if (true) { - LOG.info("receive and rollback..."); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - received1 = receive(session); - received2 = receive(session); - received3 = receive(session); - - assertEquals("second", received1); - assertEquals("middle", received2); - assertEquals("first", received3); - - session.rollback(); - session.close(); - } - - LOG.info("restart broker"); - stopBroker(); - broker = createRestartedBroker(); - initConnection(); - - if (true) { - LOG.info("receive and rollback after restart..."); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - received1 = receive(session); - received2 = receive(session); - received3 = receive(session); - assertEquals("second", received1); - assertEquals("middle", received2); - assertEquals("first", received3); - session.rollback(); - session.close(); - } - - LOG.info("receive and ack each message"); - received1 = receiveOne(); - received2 = receiveOne(); - received3 = receiveOne(); - - assertEquals("second", received1); - assertEquals("middle", received2); - assertEquals("first", received3); - } - - private void enqueueOneMessage() throws Exception { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = session.createProducer(destination); - producer.send(session.createTextMessage("middle")); - session.commit(); - session.close(); - } - - private String receiveOne() throws Exception { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - String received = receive(session); - session.commit(); - session.close(); - return received; - } - - private String receive(Session session) throws Exception { - MessageConsumer consumer = session.createConsumer(destination); - String result = null; - TextMessage message = (TextMessage) consumer.receive(5000); - if (message != null) { - LOG.info("got message: " + message); - result = message.getText(); - } - consumer.close(); - return result; - } - - protected BrokerService createBroker() throws Exception { - boolean deleteMessagesOnStartup = true; - return startBroker(deleteMessagesOnStartup); - } - - protected BrokerService createRestartedBroker() throws Exception { - boolean deleteMessagesOnStartup = false; - return startBroker(deleteMessagesOnStartup); - } - - protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception { - BrokerService newBroker = new BrokerService(); - configureBroker(newBroker); - newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup); - newBroker.start(); - return newBroker; - } - - protected void configureBroker(BrokerService brokerService) throws Exception { - setPersistentAdapter(brokerService); - brokerService.setAdvisorySupport(false); - - PolicyMap map = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setMemoryLimit(1024 * 3); - defaultEntry.setCursorMemoryHighWaterMark(68); - defaultEntry.setExpireMessagesPeriod(0); - map.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(map); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java deleted file mode 100644 index cc144d0..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class StorePerDestinationTest { - - static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class); - final static int maxFileLength = 1024 * 100; - final static int numToSend = 5000; - final Vector exceptions = new Vector<>(); - BrokerService brokerService; - - protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { - - BrokerService broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistenceAdapter(kaha); - return broker; - } - - protected PersistenceAdapter createStore(boolean delete) throws IOException { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setJournalMaxFileLength(maxFileLength); - kaha.setCleanupInterval(5000); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } - - @Before - public void prepareCleanBrokerWithMultiStore() throws Exception { - prepareBrokerWithMultiStore(true); - } - - public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { - - MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); - if (deleteAllMessages) { - multiKahaDBPersistenceAdapter.deleteAllMessages(); - } - ArrayList adapters = new ArrayList<>(); - - FilteredKahaDBPersistenceAdapter theRest = new FilteredKahaDBPersistenceAdapter(); - theRest.setPersistenceAdapter(createStore(deleteAllMessages)); - // default destination when not set is a match for all - adapters.add(theRest); - - // separate store for FastQ - FilteredKahaDBPersistenceAdapter fastQStore = new FilteredKahaDBPersistenceAdapter(); - fastQStore.setPersistenceAdapter(createStore(deleteAllMessages)); - fastQStore.setDestination(new ActiveMQQueue("FastQ")); - adapters.add(fastQStore); - - multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); - brokerService = createBroker(multiKahaDBPersistenceAdapter); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - } - - @Test - public void testTransactedSendReceive() throws Exception { - brokerService.start(); - sendMessages(true, "SlowQ", 1, 0); - assertEquals("got one", 1, receiveMessages(true, "SlowQ", 1)); - } - - @Test - public void testTransactedSendReceiveAcrossStores() throws Exception { - brokerService.start(); - sendMessages(true, "SlowQ,FastQ", 1, 0); - assertEquals("got one", 2, receiveMessages(true, "SlowQ,FastQ", 2)); - } - - @Test - public void testCommitRecovery() throws Exception { - doTestRecovery(true); - } - - @Test - public void testRollbackRecovery() throws Exception { - doTestRecovery(false); - } - - public void doTestRecovery(final boolean haveOutcome) throws Exception { - final MultiKahaDBPersistenceAdapter persistenceAdapter = (MultiKahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(persistenceAdapter) { - @Override - public void persistOutcome(Tx tx, TransactionId txid) throws IOException { - if (haveOutcome) { - super.persistOutcome(tx, txid); - } - try { - // IOExceptions will stop the broker - persistenceAdapter.stop(); - } - catch (Exception e) { - LOG.error("ex on stop ", e); - exceptions.add(e); - } - } - }; - persistenceAdapter.setTransactionStore(transactionStore); - brokerService.start(); - - ExecutorService executorService = Executors.newCachedThreadPool(); - executorService.execute(new Runnable() { - @Override - public void run() { - try { - // commit will block - sendMessages(true, "SlowQ,FastQ", 1, 0); - } - catch (Exception expected) { - LOG.info("expected", expected); - } - } - }); - - brokerService.waitUntilStopped(); - // interrupt the send thread - executorService.shutdownNow(); - - // verify auto recovery - prepareBrokerWithMultiStore(false); - brokerService.start(); - - assertEquals("expect to get the recovered message", haveOutcome ? 2 : 0, receiveMessages(false, "SlowQ,FastQ", 2)); - assertEquals("all transactions are complete", 0, brokerService.getBroker().getPreparedTransactions(null).length); - } - - @Test - public void testDirectoryDefault() throws Exception { - MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); - ArrayList adapters = new ArrayList<>(); - - FilteredKahaDBPersistenceAdapter otherFilteredKahaDBPersistenceAdapter = new FilteredKahaDBPersistenceAdapter(); - PersistenceAdapter otherStore = createStore(false); - File someOtherDisk = new File("target" + File.separator + "someOtherDisk"); - otherStore.setDirectory(someOtherDisk); - otherFilteredKahaDBPersistenceAdapter.setPersistenceAdapter(otherStore); - otherFilteredKahaDBPersistenceAdapter.setDestination(new ActiveMQQueue("Other")); - adapters.add(otherFilteredKahaDBPersistenceAdapter); - - FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapterDefault = new FilteredKahaDBPersistenceAdapter(); - PersistenceAdapter storeDefault = createStore(false); - filteredKahaDBPersistenceAdapterDefault.setPersistenceAdapter(storeDefault); - adapters.add(filteredKahaDBPersistenceAdapterDefault); - - multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); - - assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), storeDefault.getDirectory().getParentFile()); - assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile()); - } - - @Test - public void testSlowFastDestinationsStoreUsage() throws Exception { - brokerService.start(); - ExecutorService executorService = Executors.newCachedThreadPool(); - executorService.execute(new Runnable() { - @Override - public void run() { - try { - sendMessages(false, "SlowQ", 50, 500); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - - executorService.execute(new Runnable() { - @Override - public void run() { - try { - sendMessages(false, "FastQ", numToSend, 0); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - - executorService.execute(new Runnable() { - @Override - public void run() { - try { - assertEquals("Got all sent", numToSend, receiveMessages(false, "FastQ", numToSend)); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - - executorService.shutdown(); - assertTrue("consumers executor finished on time", executorService.awaitTermination(5 * 60, TimeUnit.SECONDS)); - final SystemUsage usage = brokerService.getSystemUsage(); - assertTrue("Store is not hogged", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - long storeUsage = usage.getStoreUsage().getUsage(); - LOG.info("Store Usage: " + storeUsage); - return storeUsage < 5 * maxFileLength; - } - })); - assertTrue("no exceptions", exceptions.isEmpty()); - } - - private void sendMessages(boolean transacted, String destName, int count, long sleep) throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - try { - Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(new ActiveMQQueue(destName)); - for (int i = 0; i < count; i++) { - if (sleep > 0) { - TimeUnit.MILLISECONDS.sleep(sleep); - } - producer.send(session.createTextMessage(createContent(i))); - } - if (transacted) { - session.commit(); - } - } - finally { - connection.close(); - } - } - - private int receiveMessages(boolean transacted, String destName, int max) throws JMSException { - int rc = 0; - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - try { - connection.start(); - Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(destName)); - while (rc < max && messageConsumer.receive(4000) != null) { - rc++; - - if (transacted && rc % 200 == 0) { - session.commit(); - } - } - if (transacted) { - session.commit(); - } - return rc; - } - finally { - connection.close(); - } - } - - private String createContent(int i) { - StringBuilder sb = new StringBuilder(i + ":"); - while (sb.length() < 1024) { - sb.append("*"); - } - return sb.toString(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java deleted file mode 100644 index a484c64..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.jdbc; - -import java.io.IOException; - -import org.apache.activemq.broker.ConnectionContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class BrokenPersistenceAdapter extends JDBCPersistenceAdapter { - - private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class); - - private boolean shouldBreak = false; - - @Override - public void commitTransaction(ConnectionContext context) throws IOException { - if (shouldBreak) { - LOG.warn("Throwing exception on purpose"); - throw new IOException("Breaking on purpose"); - } - LOG.debug("in commitTransaction"); - super.commitTransaction(context); - } - - public void setShouldBreak(boolean shouldBreak) { - this.shouldBreak = shouldBreak; - } -} - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java deleted file mode 100644 index 5a4aae8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jdbc; - -import static org.junit.Assert.assertEquals; - -import org.apache.activemq.broker.AbstractLocker; -import org.junit.Test; - -public class DatabaseLockerConfigTest { - - @Test - public void testSleepConfig() throws Exception { - LeaseDatabaseLocker underTest = new LeaseDatabaseLocker(); - underTest.setLockAcquireSleepInterval(50); - underTest.configure(null); - assertEquals("configured sleep value retained", 50, underTest.getLockAcquireSleepInterval()); - } - - @Test - public void testDefaultSleepConfig() throws Exception { - LeaseDatabaseLocker underTest = new LeaseDatabaseLocker(); - underTest.configure(null); - assertEquals("configured sleep value retained", AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL, underTest.getLockAcquireSleepInterval()); - } - - @Test - public void testSleepConfigOrig() throws Exception { - DefaultDatabaseLocker underTest = new DefaultDatabaseLocker(); - underTest.setLockAcquireSleepInterval(50); - underTest.configure(null); - assertEquals("configured sleep value retained", 50, underTest.getLockAcquireSleepInterval()); - } - - @Test - public void testDefaultSleepConfigOrig() throws Exception { - DefaultDatabaseLocker underTest = new DefaultDatabaseLocker(); - underTest.configure(null); - assertEquals("configured sleep value retained", AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL, underTest.getLockAcquireSleepInterval()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java deleted file mode 100644 index 00c501f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.jdbc; - -import java.sql.PreparedStatement; -import java.sql.ResultSet; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.derby.jdbc.EmbeddedDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// https://issues.apache.org/activemq/browse/AMQ-2880 -public class JDBCCommitExceptionTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(JDBCCommitExceptionTest.class); - - protected static final int messagesExpected = 10; - protected ActiveMQConnectionFactory factory; - protected BrokerService broker; - protected String connectionUri; - protected EmbeddedDataSource dataSource; - protected java.sql.Connection dbConnection; - protected BrokenPersistenceAdapter jdbc; - - @Override - public void setUp() throws Exception { - broker = createBroker(); - broker.start(); - - factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected); - } - - @Override - public void tearDown() throws Exception { - broker.stop(); - } - - public void testSqlException() throws Exception { - doTestSqlException(); - } - - public void doTestSqlException() throws Exception { - sendMessages(messagesExpected); - int messagesReceived = receiveMessages(messagesExpected); - - dumpMessages(); - assertEquals("Messages expected doesn't equal messages received", messagesExpected, messagesReceived); - broker.stop(); - } - - protected void dumpMessages() throws Exception { - WireFormat wireFormat = new OpenWireFormat(); - java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection(); - PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS"); - ResultSet result = statement.executeQuery(); - LOG.info("Messages left in broker after test"); - while (result.next()) { - long id = result.getLong(1); - org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2))); - LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message); - } - statement.close(); - conn.close(); - } - - protected int receiveMessages(int messagesExpected) throws Exception { - javax.jms.Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - jdbc.setShouldBreak(true); - - // first try and receive these messages, they'll continually fail - receiveMessages(messagesExpected, session); - - jdbc.setShouldBreak(false); - - // now that the store is sane, try and get all the messages sent - return receiveMessages(messagesExpected, session); - } - - protected int receiveMessages(int messagesExpected, Session session) throws Exception { - int messagesReceived = 0; - - for (int i = 0; i < messagesExpected; i++) { - Destination destination = session.createQueue("TEST"); - MessageConsumer consumer = session.createConsumer(destination); - Message message = null; - try { - LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected); - message = consumer.receive(2000); - LOG.info("Received : " + message); - if (message != null) { - session.commit(); - messagesReceived++; - } - } - catch (Exception e) { - LOG.debug("Caught exception " + e); - session.rollback(); - } - finally { - if (consumer != null) { - consumer.close(); - } - } - } - return messagesReceived; - } - - protected void sendMessages(int messagesExpected) throws Exception { - javax.jms.Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("TEST"); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - for (int i = 0; i < messagesExpected; i++) { - LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected); - producer.send(session.createTextMessage("test message " + (i + 1))); - } - } - - protected BrokerService createBroker() throws Exception { - - BrokerService broker = new BrokerService(); - jdbc = new BrokenPersistenceAdapter(); - - dataSource = new EmbeddedDataSource(); - dataSource.setDatabaseName("target/derbyDb"); - dataSource.setCreateDatabase("create"); - - jdbc.setDataSource(dataSource); - jdbc.setUseLock(false); - jdbc.deleteAllMessages(); - - broker.setPersistenceAdapter(jdbc); - broker.setPersistent(true); - connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); - - return broker; - } -} - - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java deleted file mode 100644 index fa7b848..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jdbc; - -import java.io.IOException; -import java.util.HashMap; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.Locker; -import org.apache.activemq.broker.SuppressReplyException; -import org.apache.activemq.util.LeaseLockerIOExceptionHandler; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.Wait; -import org.jmock.Expectations; -import org.jmock.Mockery; -import org.jmock.States; -import org.jmock.lib.legacy.ClassImposteriser; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class JDBCIOExceptionHandlerMockeryTest { - - private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerMockeryTest.class); - private HashMap exceptions = new HashMap<>(); - - @Test - public void testShutdownWithoutTransportRestart() throws Exception { - - Mockery context = new Mockery() {{ - setImposteriser(ClassImposteriser.INSTANCE); - }}; - - Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.error("unexpected exception {} on thread {}", e, t); - exceptions.put(t, e); - } - }); - - final BrokerService brokerService = context.mock(BrokerService.class); - final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class); - final Locker locker = context.mock(Locker.class); - - final States jdbcConn = context.states("jdbc").startsAs("down"); - final States broker = context.states("broker").startsAs("started"); - - // simulate jdbc up between hasLock and checkpoint, so hasLock fails to verify - context.checking(new Expectations() {{ - allowing(brokerService).isRestartAllowed(); - will(returnValue(false)); - allowing(brokerService).stopAllConnectors(with(any(ServiceStopper.class))); - allowing(brokerService).getPersistenceAdapter(); - will(returnValue(jdbcPersistenceAdapter)); - allowing(jdbcPersistenceAdapter).getLocker(); - will(returnValue(locker)); - allowing(locker).keepAlive(); - when(jdbcConn.is("down")); - will(returnValue(true)); - allowing(locker).keepAlive(); - when(jdbcConn.is("up")); - will(returnValue(false)); - - allowing(jdbcPersistenceAdapter).checkpoint(with(true)); - then(jdbcConn.is("up")); - allowing(brokerService).stop(); - then(broker.is("stopped")); - - }}); - - LeaseLockerIOExceptionHandler underTest = new LeaseLockerIOExceptionHandler(); - underTest.setBrokerService(brokerService); - - try { - underTest.handle(new IOException()); - fail("except suppress reply ex"); - } - catch (SuppressReplyException expected) { - } - - assertTrue("broker stopped state triggered", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("broker state {}", broker); - return broker.is("stopped").isActive(); - } - })); - context.assertIsSatisfied(); - - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java deleted file mode 100644 index 6c43646..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jdbc; - -import java.io.PrintWriter; -import java.sql.SQLException; -import java.sql.SQLFeatureNotSupportedException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.Connection; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.LeaseLockerIOExceptionHandler; -import org.apache.activemq.util.Wait; -import org.apache.derby.jdbc.EmbeddedDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test to see if the JDBCExceptionIOHandler will restart the transport connectors correctly after - * the underlying DB has been stopped and restarted - * - * see AMQ-4575 - */ -public class JDBCIOExceptionHandlerTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerTest.class); - private static final String TRANSPORT_URL = "tcp://0.0.0.0:0"; - - private static final String DATABASE_NAME = "DERBY_OVERRIDE"; - private ActiveMQConnectionFactory factory; - private ReconnectingEmbeddedDataSource dataSource; - private BrokerService broker; - - protected BrokerService createBroker(boolean withJMX) throws Exception { - return createBroker("localhost", withJMX, true, true); - } - - protected BrokerService createBroker(String name, - boolean withJMX, - boolean leaseLocker, - boolean startStopConnectors) throws Exception { - BrokerService broker = new BrokerService(); - broker.setBrokerName(name); - - broker.setUseJmx(withJMX); - - EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource(); - embeddedDataSource.setDatabaseName(DATABASE_NAME); - embeddedDataSource.setCreateDatabase("create"); - - // create a wrapper to EmbeddedDataSource to allow the connection be - // reestablished to derby db - dataSource = new ReconnectingEmbeddedDataSource(embeddedDataSource); - - JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); - jdbc.setDataSource(dataSource); - - jdbc.setLockKeepAlivePeriod(1000L); - if (leaseLocker) { - LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setHandleStartException(true); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); - jdbc.setLocker(leaseDatabaseLocker); - } - - broker.setPersistenceAdapter(jdbc); - LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); - ioExceptionHandler.setResumeCheckSleepPeriod(1000L); - ioExceptionHandler.setStopStartConnectors(startStopConnectors); - broker.setIoExceptionHandler(ioExceptionHandler); - String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString(); - - factory = new ActiveMQConnectionFactory(connectionUri); - - return broker; - } - - /* - * run test without JMX enabled - */ - public void testRecoverWithOutJMX() throws Exception { - recoverFromDisconnectDB(false); - } - - /* - * run test with JMX enabled - */ - public void testRecoverWithJMX() throws Exception { - recoverFromDisconnectDB(true); - } - - public void testSlaveStoppedLease() throws Exception { - testSlaveStopped(true); - } - - public void testSlaveStoppedDefault() throws Exception { - testSlaveStopped(false); - } - - public void testSlaveStopped(final boolean lease) throws Exception { - final BrokerService master = createBroker("master", true, lease, false); - master.start(); - master.waitUntilStarted(); - - final AtomicReference slave = new AtomicReference<>(); - - Thread slaveThread = new Thread() { - @Override - public void run() { - try { - BrokerService broker = new BrokerService(); - broker.setBrokerName("slave"); - - JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); - jdbc.setDataSource(dataSource); - - jdbc.setLockKeepAlivePeriod(1000L); - - if (lease) { - LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setHandleStartException(true); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); - jdbc.setLocker(leaseDatabaseLocker); - } - - broker.setPersistenceAdapter(jdbc); - LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); - ioExceptionHandler.setResumeCheckSleepPeriod(1000L); - ioExceptionHandler.setStopStartConnectors(false); - broker.setIoExceptionHandler(ioExceptionHandler); - slave.set(broker); - broker.start(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }; - - slaveThread.start(); - - Thread.sleep(5000); - - dataSource.stopDB(); - - assertTrue("Master hasn't been stopped", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return master.isStopped(); - } - })); - - assertTrue("Slave hasn't been stopped", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return slave.get().isStopped(); - } - })); - - } - - public void recoverFromDisconnectDB(boolean withJMX) throws Exception { - try { - broker = createBroker(withJMX); - broker.start(); - broker.waitUntilStarted(); - - // broker started - stop db underneath it - dataSource.stopDB(); - - // wait - allow the leaselocker to kick the JDBCIOExceptionHandler - TimeUnit.SECONDS.sleep(3); - - // check connector has shutdown - checkTransportConnectorStopped(); - - // restart db underneath - dataSource.restartDB(); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.debug("*** checking connector to start..."); - try { - checkTransportConnectorStarted(); - return true; - } - catch (Throwable t) { - LOG.debug(t.toString()); - } - return false; - } - }); - - } - finally { - LOG.debug("*** broker is stopping..."); - broker.stop(); - } - } - - private void checkTransportConnectorStopped() { - // connection is expected to fail - try { - factory.createConnection(); - fail("Transport connector should be stopped"); - } - catch (Exception ex) { - // expected an exception - LOG.debug(" checkTransportConnectorStopped() threw", ex); - } - } - - private void checkTransportConnectorStarted() { - // connection is expected to succeed - try { - Connection conn = factory.createConnection(); - conn.close(); - } - catch (Exception ex) { - LOG.debug("checkTransportConnectorStarted() threw", ex); - fail("Transport connector should have been started"); - } - } - - /* - * Wrapped the derby datasource object to get DB reconnect functionality as I not - * manage to get that working directly on the EmbeddedDataSource - * - * NOTE: Not a thread Safe but for this unit test it should be fine - */ - public class ReconnectingEmbeddedDataSource implements javax.sql.DataSource { - - private EmbeddedDataSource realDatasource; - - public ReconnectingEmbeddedDataSource(EmbeddedDataSource datasource) { - this.realDatasource = datasource; - } - - @Override - public PrintWriter getLogWriter() throws SQLException { - return this.realDatasource.getLogWriter(); - } - - @Override - public void setLogWriter(PrintWriter out) throws SQLException { - this.realDatasource.setLogWriter(out); - - } - - @Override - public void setLoginTimeout(int seconds) throws SQLException { - this.realDatasource.setLoginTimeout(seconds); - } - - @Override - public int getLoginTimeout() throws SQLException { - return this.realDatasource.getLoginTimeout(); - } - - @Override - public T unwrap(Class iface) throws SQLException { - return this.unwrap(iface); - } - - @Override - public boolean isWrapperFor(Class iface) throws SQLException { - return this.isWrapperFor(iface); - } - - @Override - public java.sql.Connection getConnection() throws SQLException { - return this.realDatasource.getConnection(); - } - - @Override - public java.sql.Connection getConnection(String username, String password) throws SQLException { - return this.getConnection(username, password); - } - - /** - * To simulate a db reconnect I just create a new EmbeddedDataSource . - * - * @throws SQLException - */ - public void restartDB() throws SQLException { - EmbeddedDataSource newDatasource = new EmbeddedDataSource(); - newDatasource.setDatabaseName(DATABASE_NAME); - newDatasource.getConnection(); - LOG.info("*** DB restarted now..."); - this.realDatasource = newDatasource; - } - - public void stopDB() { - try { - realDatasource.setShutdownDatabase("shutdown"); - LOG.info("***DB is being shutdown..."); - dataSource.getConnection(); - fail("should have thrown a db closed exception"); - } - catch (Exception ex) { - ex.printStackTrace(System.out); - } - } - - @Override - public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml deleted file mode 100644 index ac70fa7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java deleted file mode 100644 index 854dd7a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jdbc; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.PersistenceAdapter; - -public class JDBCLockTablePrefixTest extends TestCase { - - public void testLockTable() throws Exception { - BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml"); - broker.waitUntilStarted(); - - PersistenceAdapter pa = broker.getPersistenceAdapter(); - assertNotNull(pa); - - JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) pa; - assertEquals("TTT_", jpa.getStatements().getTablePrefix()); - assertEquals("AMQ_MSGS2", jpa.getStatements().getMessageTableName()); - assertEquals("AMQ_LOCK2", jpa.getStatements().getLockTableName()); - - broker.stop(); - broker.waitUntilStopped(); - } - -}