Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8AB75187E1 for ; Thu, 25 Feb 2016 04:08:51 +0000 (UTC) Received: (qmail 55877 invoked by uid 500); 25 Feb 2016 04:08:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 55755 invoked by uid 500); 25 Feb 2016 04:08:51 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 54248 invoked by uid 99); 25 Feb 2016 04:08:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Feb 2016 04:08:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C74B4E8EDF; Thu, 25 Feb 2016 04:08:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 25 Feb 2016 04:09:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/50] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java deleted file mode 100644 index 7ea4044..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; -import org.junit.After; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -public class AMQ5450Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ5450Test.class); - private final static int maxFileLength = 1024 * 1024 * 32; - - private final static String POSTFIX_DESTINATION_NAME = ".dlq"; - - private final static String DESTINATION_NAME = "test" + POSTFIX_DESTINATION_NAME; - private final static String DESTINATION_NAME_2 = "2.test" + POSTFIX_DESTINATION_NAME; - private final static String DESTINATION_NAME_3 = "3.2.test" + POSTFIX_DESTINATION_NAME; - - private final static String[] DESTS = new String[]{DESTINATION_NAME, DESTINATION_NAME_2, DESTINATION_NAME_3, DESTINATION_NAME, DESTINATION_NAME}; - - BrokerService broker; - private HashMap adapters = new HashMap(); - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - protected BrokerService createAndStartBroker(PersistenceAdapter persistenceAdapter) throws Exception { - BrokerService broker = new BrokerService(); - broker.setUseJmx(false); - broker.setBrokerName("localhost"); - broker.setPersistenceAdapter(persistenceAdapter); - broker.setDeleteAllMessagesOnStartup(true); - broker.start(); - broker.waitUntilStarted(); - return broker; - } - - @Test - public void testPostFixMatch() throws Exception { - doTestPostFixMatch(false); - } - - @Test - public void testPostFixCompositeMatch() throws Exception { - doTestPostFixMatch(true); - } - - private void doTestPostFixMatch(boolean useComposite) throws Exception { - prepareBrokerWithMultiStore(useComposite); - - sendMessage(DESTINATION_NAME, "test 1"); - sendMessage(DESTINATION_NAME_2, "test 1"); - sendMessage(DESTINATION_NAME_3, "test 1"); - - assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); - assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); - assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3))); - - for (String dest : DESTS) { - Destination destination2 = broker.getDestination(new ActiveMQQueue(dest)); - assertNotNull(destination2); - assertEquals(1, destination2.getMessageStore().getMessageCount()); - } - - HashMap numDests = new HashMap(); - for (PersistenceAdapter pa : adapters.values()) { - numDests.put(pa.getDestinations().size(), pa); - } - - // ensure wildcard does not match any - assertTrue("0 in wildcard matcher", adapters.get(null).getDestinations().isEmpty()); - - assertEquals("only two values", 2, numDests.size()); - assertTrue("0 in others", numDests.containsKey(0)); - - if (useComposite) { - assertTrue("3 in one", numDests.containsKey(3)); - } - else { - assertTrue("1 in some", numDests.containsKey(1)); - } - - } - - protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setJournalMaxFileLength(maxFileLength); - kaha.setCleanupInterval(5000); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } - - public void prepareBrokerWithMultiStore(boolean compositeMatch) throws Exception { - - MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); - multiKahaDBPersistenceAdapter.deleteAllMessages(); - ArrayList adapters = new ArrayList<>(); - - if (compositeMatch) { - StringBuffer compositeDestBuf = new StringBuffer(); - for (int i = 1; i <= DESTS.length; i++) { - for (int j = 0; j < i; j++) { - compositeDestBuf.append("*"); - if ((j + 1 == i)) { - compositeDestBuf.append(POSTFIX_DESTINATION_NAME); - } - else { - compositeDestBuf.append("."); - } - } - if (!(i + 1 > DESTS.length)) { - compositeDestBuf.append(","); - } - } - adapters.add(createFilteredKahaDBByDestinationPrefix(compositeDestBuf.toString(), true)); - - } - else { - // destination map does not do post fix wild card matches on paths, so we need to cover - // each path length - adapters.add(createFilteredKahaDBByDestinationPrefix("*" + POSTFIX_DESTINATION_NAME, true)); - adapters.add(createFilteredKahaDBByDestinationPrefix("*.*" + POSTFIX_DESTINATION_NAME, true)); - adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*" + POSTFIX_DESTINATION_NAME, true)); - adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*.*" + POSTFIX_DESTINATION_NAME, true)); - } - - // ensure wildcard matcher is there for other dests - adapters.add(createFilteredKahaDBByDestinationPrefix(null, true)); - - multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); - broker = createAndStartBroker(multiKahaDBPersistenceAdapter); - } - - private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, - boolean deleteAllMessages) throws IOException { - FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); - template.setPersistenceAdapter(createStore(deleteAllMessages)); - if (destinationPrefix != null) { - template.setQueue(destinationPrefix); - } - adapters.put(destinationPrefix, template.getPersistenceAdapter()); - return template; - } - - private void sendMessage(String destinationName, String message) throws JMSException { - ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); - f.setAlwaysSyncSend(true); - Connection c = f.createConnection(); - c.start(); - Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName)); - producer.send(s.createTextMessage(message)); - producer.close(); - s.close(); - c.stop(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java deleted file mode 100644 index 5ed211b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.concurrent.TimeUnit; -import javax.jms.JMSException; -import javax.jms.TextMessage; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerRestartTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.StubConnection; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.jdbc.DataSourceServiceSupport; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; -import org.apache.activemq.util.IOHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ5567Test extends BrokerRestartTestSupport { - - protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class); - ActiveMQQueue destination = new ActiveMQQueue("Q"); - - @Override - protected void configureBroker(BrokerService broker) throws Exception { - super.configureBroker(broker); - broker.setPersistenceAdapter(persistenceAdapter); - } - - @Override - protected PolicyEntry getDefaultPolicy() { - PolicyEntry policy = new PolicyEntry(); - policy.setMemoryLimit(60 * 1024); - return policy; - } - - public void initCombosForTestPreparedTransactionNotDispatched() throws Exception { - PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{new KahaDBPersistenceAdapter(), new LevelDBPersistenceAdapter(), new JDBCPersistenceAdapter(DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())}; - for (PersistenceAdapter adapter : persistenceAdapters) { - adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); - } - addCombinationValues("persistenceAdapter", persistenceAdapters); - } - - public void testPreparedTransactionNotDispatched() throws Exception { - - ActiveMQDestination destination = new ActiveMQQueue("Q"); - - StubConnection connection = createConnection(); - ConnectionInfo connectionInfo = createConnectionInfo(); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - ProducerInfo producerInfo = createProducerInfo(sessionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - connection.send(producerInfo); - - XATransactionId txid = createXATransaction(sessionInfo); - connection.send(createBeginTransaction(connectionInfo, txid)); - Message message = createMessage(producerInfo, destination); - message.setPersistent(true); - message.setTransactionId(txid); - connection.send(message); - - connection.send(createPrepareTransaction(connectionInfo, txid)); - - // send another non tx, will poke dispatch - message = createMessage(producerInfo, destination); - message.setPersistent(true); - connection.send(message); - - // Since prepared but not committed.. only one should get delivered - StubConnection connectionC = createConnection(); - ConnectionInfo connectionInfoC = createConnectionInfo(); - SessionInfo sessionInfoC = createSessionInfo(connectionInfoC); - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination); - connectionC.send(connectionInfoC); - connectionC.send(sessionInfoC); - connectionC.send(consumerInfo); - - Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); - LOG.info("received: " + m); - assertNotNull("Got message", m); - assertNull("Got non tx message", m.getTransactionId()); - - // cannot get the prepared message till commit - assertNull(receiveMessage(connectionC)); - assertNoMessagesLeft(connectionC); - - LOG.info("commit: " + txid); - connection.request(createCommitTransaction2Phase(connectionInfo, txid)); - - m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); - LOG.info("received: " + m); - assertNotNull("Got non null message", m); - - } - - public void initCombosForTestCursorStoreSync() throws Exception { - PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{new KahaDBPersistenceAdapter(), new LevelDBPersistenceAdapter(), new JDBCPersistenceAdapter(DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())}; - for (PersistenceAdapter adapter : persistenceAdapters) { - adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); - } - addCombinationValues("persistenceAdapter", persistenceAdapters); - } - - public void testCursorStoreSync() throws Exception { - - StubConnection connection = createConnection(); - ConnectionInfo connectionInfo = createConnectionInfo(); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - ProducerInfo producerInfo = createProducerInfo(sessionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - connection.send(producerInfo); - - XATransactionId txid = createXATransaction(sessionInfo); - connection.send(createBeginTransaction(connectionInfo, txid)); - Message message = createMessage(producerInfo, destination); - message.setPersistent(true); - message.setTransactionId(txid); - connection.request(message); - - connection.request(createPrepareTransaction(connectionInfo, txid)); - - QueueViewMBean proxy = getProxyToQueueViewMBean(); - assertTrue("cache is enabled", proxy.isCacheEnabled()); - - // send another non tx, will fill cursor - String payload = new String(new byte[10 * 1024]); - for (int i = 0; i < 6; i++) { - message = createMessage(producerInfo, destination); - message.setPersistent(true); - ((TextMessage) message).setText(payload); - connection.request(message); - } - - assertTrue("cache is disabled", !proxy.isCacheEnabled()); - - StubConnection connectionC = createConnection(); - ConnectionInfo connectionInfoC = createConnectionInfo(); - SessionInfo sessionInfoC = createSessionInfo(connectionInfoC); - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination); - connectionC.send(connectionInfoC); - connectionC.send(sessionInfoC); - connectionC.send(consumerInfo); - - Message m = null; - for (int i = 0; i < 3; i++) { - m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); - LOG.info("received: " + m); - assertNotNull("Got message", m); - assertNull("Got non tx message", m.getTransactionId()); - connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); - } - - LOG.info("commit: " + txid); - connection.request(createCommitTransaction2Phase(connectionInfo, txid)); - // consume the rest including the 2pc send in TX - - for (int i = 0; i < 4; i++) { - m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); - LOG.info("received[" + i + "] " + m); - assertNotNull("Got message", m); - if (i == 3) { - assertNotNull("Got tx message", m.getTransactionId()); - } - else { - assertNull("Got non tx message", m.getTransactionId()); - } - connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); - } - } - - private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException { - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + destination.getQueueName() + ",type=Broker,brokerName=localhost"); - QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } - - public static Test suite() { - return suite(AMQ5567Test.class); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java deleted file mode 100644 index e8414d5..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; -import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQTopic; -import org.junit.Assert; -import org.junit.Test; - -/** - * @author James Furness - * https://issues.apache.org/jira/browse/AMQ-3607 - */ -public class ActiveMQSlowConsumerManualTest { - - private static final int PORT = 12345; - private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC"); - private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true"; - - @Test(timeout = 60000) - public void testDefaultSettings() throws Exception { - runTest("testDefaultSettings", 30, -1, -1, false, false, false, false); - } - - @Test(timeout = 60000) - public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception { - runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false); - } - - @Test(timeout = 60000) - public void testBounded() throws Exception { - runTest("testBounded", 30, 5, 25, false, false, false, false); - } - - @Test(timeout = 60000) - public void testBoundedWithOptimiseAcknowledge() throws Exception { - runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false); - } - - public void runTest(String name, - int sendMessageCount, - int prefetchLimit, - int messageLimit, - boolean evictOldestMessage, - boolean disableFlowControl, - boolean optimizeAcknowledge, - boolean persistent) throws Exception { - BrokerService broker = createBroker(persistent); - broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl)); - broker.start(); - - // Slow consumer - Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge); - final CountDownLatch blockSlowConsumer = new CountDownLatch(1); - final AtomicInteger slowConsumerReceiveCount = new AtomicInteger(); - final List slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList() : null; - MessageConsumer slowConsumer = createSubscriber(slowConsumerSession, new MessageListener() { - @Override - public void onMessage(Message message) { - try { - slowConsumerReceiveCount.incrementAndGet(); - int count = Integer.parseInt(((TextMessage) message).getText()); - if (slowConsumerReceived != null) - slowConsumerReceived.add(count); - if (count % 10000 == 0) - System.out.println("SlowConsumer: Receive " + count); - blockSlowConsumer.await(); - } - catch (Exception ignored) { - } - } - }); - - // Fast consumer - Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge); - final AtomicInteger fastConsumerReceiveCount = new AtomicInteger(); - final List fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList() : null; - MessageConsumer fastConsumer = createSubscriber(fastConsumerSession, new MessageListener() { - @Override - public void onMessage(Message message) { - try { - fastConsumerReceiveCount.incrementAndGet(); - TimeUnit.MILLISECONDS.sleep(5); - int count = Integer.parseInt(((TextMessage) message).getText()); - if (fastConsumerReceived != null) - fastConsumerReceived.add(count); - if (count % 10000 == 0) - System.out.println("FastConsumer: Receive " + count); - } - catch (Exception ignored) { - } - } - }); - - // Wait for consumers to connect - Thread.sleep(500); - - // Publisher - AtomicInteger sentCount = new AtomicInteger(); - List sent = sendMessageCount <= 1000 ? new ArrayList() : null; - Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge); - MessageProducer publisher = createPublisher(publisherSession); - for (int i = 0; i < sendMessageCount; i++) { - sentCount.incrementAndGet(); - if (sent != null) - sent.add(i); - if (i % 10000 == 0) - System.out.println("Publisher: Send " + i); - publisher.send(publisherSession.createTextMessage(Integer.toString(i))); - } - - // Wait for messages to arrive - Thread.sleep(500); - - System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent); - System.out.println(name + ": Whilst slow consumer blocked:"); - System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived); - System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived); - - // Unblock slow consumer - blockSlowConsumer.countDown(); - - // Wait for messages to arrive - Thread.sleep(500); - - System.out.println(name + ": After slow consumer unblocked:"); - System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived); - System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived); - System.out.println(); - - publisher.close(); - publisherSession.close(); - slowConsumer.close(); - slowConsumerSession.close(); - fastConsumer.close(); - fastConsumerSession.close(); - broker.stop(); - - Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived); - // this is too timine dependent as sometimes there is message eviction, would need to check the dlq - //Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size()); - } - - private static BrokerService createBroker(boolean persistent) throws Exception { - BrokerService broker = new BrokerService(); - broker.setBrokerName("TestBroker"); - broker.setPersistent(persistent); - broker.addConnector(URL); - return broker; - } - - private static MessageConsumer createSubscriber(Session session, - MessageListener messageListener) throws JMSException { - MessageConsumer consumer = session.createConsumer(TOPIC); - consumer.setMessageListener(messageListener); - return consumer; - } - - private static MessageProducer createPublisher(Session session) throws JMSException { - MessageProducer producer = session.createProducer(TOPIC); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - return producer; - } - - private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); - - connectionFactory.setCopyMessageOnSend(false); - connectionFactory.setDisableTimeStampsByDefault(true); - connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge); - if (optimizeAcknowledge) { - connectionFactory.setOptimizeAcknowledgeTimeOut(1); - } - - Connection connection = connectionFactory.createConnection(); - connection.setClientID(clientId); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - connection.start(); - - return session; - } - - private static PolicyMap buildPolicy(ActiveMQTopic topic, - int prefetchLimit, - int messageLimit, - boolean evictOldestMessage, - boolean disableFlowControl) { - PolicyMap policyMap = new PolicyMap(); - - PolicyEntry policyEntry = new PolicyEntry(); - - if (evictOldestMessage) { - policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy()); - } - - if (disableFlowControl) { - policyEntry.setProducerFlowControl(false); - } - - if (prefetchLimit > 0) { - policyEntry.setTopicPrefetch(prefetchLimit); - } - - if (messageLimit > 0) { - ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy(); - messageLimitStrategy.setLimit(messageLimit); - policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy); - } - - policyMap.put(topic, policyEntry); - - return policyMap; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java deleted file mode 100644 index 2d6a48c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class); - private static final int COUNT = 2000; - protected String bindAddress; - - public void testConnectionPerMessage() throws Exception { - final String topicName = "test.topic"; - - LOG.info("Initializing connection factory for JMS to URL: " + bindAddress); - final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory(); - normalFactory.setBrokerURL(bindAddress); - for (int i = 0; i < COUNT; i++) { - - if (i % 100 == 0) { - LOG.info(new Integer(i).toString()); - } - - Connection conn = null; - try { - - conn = normalFactory.createConnection(); - final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Topic topic = session.createTopic(topicName); - final MessageProducer producer = session.createProducer(topic); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - final MapMessage m = session.createMapMessage(); - m.setInt("hey", i); - - producer.send(m); - - } - catch (JMSException e) { - LOG.warn(e.getMessage(), e); - } - finally { - if (conn != null) - try { - conn.close(); - } - catch (JMSException e) { - LOG.warn(e.getMessage(), e); - } - } - } - } - - @Override - protected void setUp() throws Exception { - bindAddress = "vm://localhost"; - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setDeleteAllMessagesOnStartup(true); - answer.setUseJmx(false); - answer.setPersistent(isPersistent()); - answer.addConnector(bindAddress); - return answer; - } - - @Override - protected boolean isPersistent() { - return true; - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java deleted file mode 100644 index 35da06c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.command.ActiveMQQueue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class CraigsBugTest extends EmbeddedBrokerTestSupport { - - private String connectionUri; - - public void testConnectionFactory() throws Exception { - final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri); - final ActiveMQQueue queue = new ActiveMQQueue("testqueue"); - final Connection conn = cf.createConnection(); - - Runnable r = new Runnable() { - @Override - public void run() { - try { - Session session = conn.createSession(false, 1); - MessageConsumer consumer = session.createConsumer(queue, null); - consumer.receive(1000); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - }; - new Thread(r).start(); - conn.start(); - - try { - new CountDownLatch(1).await(3, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://localhost:0"; - super.setUp(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java deleted file mode 100644 index a79ca58..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.util.concurrent.TimeoutException; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.command.ActiveMQDestination; -import org.junit.Assert; - -public class DoubleExpireTest extends EmbeddedBrokerTestSupport { - - private static final long MESSAGE_TTL_MILLIS = 1000; - private static final long MAX_TEST_TIME_MILLIS = 60000; - - @Override - public void setUp() throws Exception { - setAutoFail(true); - setMaxTestTime(MAX_TEST_TIME_MILLIS); - super.setUp(); - } - - /** - * This test verifies that a message that expires can be be resent to queue - * with a new expiration and that it will be processed as a new message and - * allowed to re-expire. - *

- * NOTE: This test fails on AMQ 5.4.2 because the originalExpiration - * timestamp is not cleared when the message is resent. - */ - public void testDoubleExpireWithoutMove() throws Exception { - // Create the default dead letter queue. - final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ"); - - Connection conn = createConnection(); - try { - conn.start(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Verify that the test queue and DLQ are empty. - Assert.assertEquals(0, getSize(destination)); - Assert.assertEquals(0, getSize(DLQ)); - - // Enqueue a message to the test queue that will expire after 1s. - MessageProducer producer = session.createProducer(destination); - Message testMessage = session.createTextMessage("test message"); - producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS); - Assert.assertEquals(1, getSize(destination)); - - // Wait for the message to expire. - waitForSize(destination, 0, MAX_TEST_TIME_MILLIS); - Assert.assertEquals(1, getSize(DLQ)); - - // Consume the message from the DLQ and re-enqueue it to the test - // queue so that it expires after 1s. - MessageConsumer consumer = session.createConsumer(DLQ); - Message expiredMessage = consumer.receive(); - Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage.getJMSMessageID()); - - producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS); - Assert.assertEquals(1, getSize(destination)); - Assert.assertEquals(0, getSize(DLQ)); - - // Verify that the resent message is "different" in that it has - // another ID. - Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage.getJMSMessageID()); - - // Wait for the message to re-expire. - waitForSize(destination, 0, MAX_TEST_TIME_MILLIS); - Assert.assertEquals(1, getSize(DLQ)); - - // Re-consume the message from the DLQ. - Message reexpiredMessage = consumer.receive(); - Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage.getJMSMessageID()); - } - finally { - conn.close(); - } - } - - /** - * A helper method that returns the embedded broker's implementation of a - * JMS queue. - */ - private Queue getPhysicalDestination(ActiveMQDestination destination) throws Exception { - return (Queue) broker.getAdminView().getBroker().getDestinationMap().get(destination); - } - - /** - * A helper method that returns the size of the specified queue/topic. - */ - private long getSize(ActiveMQDestination destination) throws Exception { - return getPhysicalDestination(destination) != null ? getPhysicalDestination(destination).getDestinationStatistics().getMessages().getCount() : 0; - } - - /** - * A helper method that waits for a destination to reach a certain size. - */ - private void waitForSize(ActiveMQDestination destination, - int size, - long timeoutMillis) throws Exception, TimeoutException { - long startTimeMillis = System.currentTimeMillis(); - - while (getSize(destination) != size && System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) { - Thread.sleep(250); - } - - if (getSize(destination) != size) { - throw new TimeoutException("Destination " + destination.getPhysicalName() + " did not reach size " + size + " within " + timeoutMillis + "ms."); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java deleted file mode 100644 index 3046423..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ /dev/null @@ -1,479 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.management.ObjectName; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Test case for AMQ-1479 - */ -public class DurableConsumerTest extends CombinationTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class); - private static int COUNT = 1024; - private static String CONSUMER_NAME = "DURABLE_TEST"; - protected BrokerService broker; - - protected String bindAddress = "tcp://localhost:61616"; - - protected byte[] payload = new byte[1024 * 32]; - protected ConnectionFactory factory; - protected Vector exceptions = new Vector<>(); - - private static final String TOPIC_NAME = "failoverTopic"; - private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; - public boolean useDedicatedTaskRunner = false; - - private class SimpleTopicSubscriber implements MessageListener, ExceptionListener { - - private TopicConnection topicConnection = null; - - public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) { - - ActiveMQConnectionFactory topicConnectionFactory = null; - TopicSession topicSession = null; - Topic topic = null; - TopicSubscriber topicSubscriber = null; - - topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL); - try { - - topic = new ActiveMQTopic(topicName); - topicConnection = topicConnectionFactory.createTopicConnection(); - topicConnection.setClientID((clientId)); - topicConnection.start(); - - topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId)); - topicSubscriber.setMessageListener(this); - - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - @Override - public void onMessage(Message arg0) { - } - - public void closeConnection() { - if (topicConnection != null) { - try { - topicConnection.close(); - } - catch (JMSException e) { - } - } - } - - @Override - public void onException(JMSException exception) { - exceptions.add(exception); - } - } - - private class MessagePublisher implements Runnable { - - private final boolean shouldPublish = true; - - @Override - public void run() { - TopicConnectionFactory topicConnectionFactory = null; - TopicConnection topicConnection = null; - TopicSession topicSession = null; - Topic topic = null; - TopicPublisher topicPublisher = null; - Message message = null; - - topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL); - try { - topic = new ActiveMQTopic(TOPIC_NAME); - topicConnection = topicConnectionFactory.createTopicConnection(); - topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - topicPublisher = topicSession.createPublisher(topic); - message = topicSession.createMessage(); - } - catch (Exception ex) { - exceptions.add(ex); - } - while (shouldPublish) { - try { - topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000); - } - catch (JMSException ex) { - exceptions.add(ex); - } - try { - Thread.sleep(1); - } - catch (Exception ex) { - } - } - } - } - - private void configurePersistence(BrokerService broker) throws Exception { - File dataDirFile = new File("target/" + getName()); - KahaDBPersistenceAdapter kahaDBAdapter = new KahaDBPersistenceAdapter(); - kahaDBAdapter.setDirectory(dataDirFile); - broker.setPersistenceAdapter(kahaDBAdapter); - } - - public void testFailover() throws Exception { - - configurePersistence(broker); - broker.start(); - - Thread publisherThread = new Thread(new MessagePublisher()); - publisherThread.start(); - final int numSubs = 100; - final List list = new ArrayList<>(numSubs); - for (int i = 0; i < numSubs; i++) { - - final int id = i; - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - SimpleTopicSubscriber s = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); - list.add(s); - } - }); - thread.start(); - - } - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return numSubs == list.size(); - } - }); - - broker.stop(); - broker = createBroker(false); - configurePersistence(broker); - broker.start(); - Thread.sleep(10000); - for (SimpleTopicSubscriber s : list) { - s.closeConnection(); - } - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - } - - // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028 - // with use dedicatedTaskRunner=true and produce OOM - public void initCombosForTestConcurrentDurableConsumer() { - addCombinationValues("useDedicatedTaskRunner", new Object[]{Boolean.TRUE, Boolean.FALSE}); - } - - public void testConcurrentDurableConsumer() throws Exception { - - broker.start(); - broker.waitUntilStarted(); - - factory = createConnectionFactory(); - final String topicName = getName(); - final int numMessages = 500; - int numConsumers = 1; - final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers); - final AtomicInteger receivedCount = new AtomicInteger(); - Runnable consumer = new Runnable() { - @Override - public void run() { - final String consumerName = Thread.currentThread().getName(); - int acked = 0; - int received = 0; - - try { - while (acked < numMessages / 2) { - // take one message and close, ack on occasion - Connection consumerConnection = factory.createConnection(); - ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false); - consumerConnection.setClientID(consumerName); - Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic topic = consumerSession.createTopic(topicName); - consumerConnection.start(); - - MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName); - - counsumerStarted.countDown(); - Message msg = null; - do { - msg = consumer.receive(5000); - if (msg != null) { - receivedCount.incrementAndGet(); - if (received != 0 && received % 100 == 0) { - LOG.info("Received msg: " + msg.getJMSMessageID()); - } - if (++received % 2 == 0) { - msg.acknowledge(); - acked++; - } - } - } while (msg == null); - - consumerConnection.close(); - } - assertTrue(received >= acked); - } - catch (Exception e) { - e.printStackTrace(); - exceptions.add(e); - } - } - }; - - ExecutorService executor = Executors.newFixedThreadPool(numConsumers); - - for (int i = 0; i < numConsumers; i++) { - executor.execute(consumer); - } - - assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS)); - - Connection producerConnection = factory.createConnection(); - ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = producerSession.createTopic(topicName); - MessageProducer producer = producerSession.createProducer(topic); - producerConnection.start(); - for (int i = 0; i < numMessages; i++) { - BytesMessage msg = producerSession.createBytesMessage(); - msg.writeBytes(payload); - producer.send(msg); - if (i != 0 && i % 100 == 0) { - LOG.info("Sent msg " + i); - } - } - - executor.shutdown(); - executor.awaitTermination(30, TimeUnit.SECONDS); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("receivedCount: " + receivedCount.get()); - return receivedCount.get() == numMessages; - } - }, 360 * 1000); - assertEquals("got required some messages", numMessages, receivedCount.get()); - assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty()); - } - - public void testConsumerRecover() throws Exception { - doTestConsumer(true); - } - - public void testConsumer() throws Exception { - doTestConsumer(false); - } - - public void testPrefetchViaBrokerConfig() throws Exception { - - Integer prefetchVal = new Integer(150); - PolicyEntry policyEntry = new PolicyEntry(); - policyEntry.setDurableTopicPrefetch(prefetchVal.intValue()); - policyEntry.setPrioritizedMessages(true); - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(policyEntry); - broker.setDestinationPolicy(policyMap); - broker.start(); - - factory = createConnectionFactory(); - Connection consumerConnection = factory.createConnection(); - consumerConnection.setClientID(CONSUMER_NAME); - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = consumerSession.createTopic(getClass().getName()); - MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); - consumerConnection.start(); - - ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0]; - Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize"); - assertEquals(prefetchVal, prefetchFromSubView); - } - - public void doTestConsumer(boolean forceRecover) throws Exception { - - if (forceRecover) { - configurePersistence(broker); - } - broker.start(); - - factory = createConnectionFactory(); - Connection consumerConnection = factory.createConnection(); - consumerConnection.setClientID(CONSUMER_NAME); - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = consumerSession.createTopic(getClass().getName()); - MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); - consumerConnection.start(); - consumerConnection.close(); - broker.stop(); - broker = createBroker(false); - if (forceRecover) { - configurePersistence(broker); - } - broker.start(); - - Connection producerConnection = factory.createConnection(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = producerSession.createProducer(topic); - producerConnection.start(); - for (int i = 0; i < COUNT; i++) { - BytesMessage msg = producerSession.createBytesMessage(); - msg.writeBytes(payload); - producer.send(msg); - if (i != 0 && i % 1000 == 0) { - LOG.info("Sent msg " + i); - } - } - producerConnection.close(); - broker.stop(); - broker = createBroker(false); - if (forceRecover) { - configurePersistence(broker); - } - broker.start(); - - consumerConnection = factory.createConnection(); - consumerConnection.setClientID(CONSUMER_NAME); - consumerConnection.start(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); - for (int i = 0; i < COUNT; i++) { - Message msg = consumer.receive(10000); - assertNotNull("Missing message: " + i, msg); - if (i != 0 && i % 1000 == 0) { - LOG.info("Received msg " + i); - } - - } - consumerConnection.close(); - - } - - @Override - protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(true); - } - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - protected Topic creatTopic(Session s, String destinationName) throws JMSException { - return s.createTopic(destinationName); - } - - /** - * Factory method to create a new broker - * - * @throws Exception - */ - protected BrokerService createBroker(boolean deleteStore) throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer, deleteStore); - return answer; - } - - protected void configureBroker(BrokerService answer, boolean deleteStore) throws Exception { - answer.setDeleteAllMessagesOnStartup(deleteStore); - KahaDBStore kaha = new KahaDBStore(); - //kaha.setConcurrentStoreAndDispatchTopics(false); - File directory = new File("target/activemq-data/kahadb"); - if (deleteStore) { - IOHelper.deleteChildren(directory); - } - kaha.setDirectory(directory); - //kaha.setMaxAsyncJobs(10); - - answer.setPersistenceAdapter(kaha); - answer.addConnector(bindAddress); - answer.setUseShutdownHook(false); - answer.setAdvisorySupport(false); - answer.setDedicatedTaskRunner(useDedicatedTaskRunner); - } - - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress); - factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner); - return factory; - } - - public static Test suite() { - return suite(DurableConsumerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java deleted file mode 100644 index ef24795..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; - -/** - * - */ -public class JMSDurableTopicNoLocalTest extends EmbeddedBrokerTestSupport { - - protected String bindAddress; - - public void testConsumeNoLocal() throws Exception { - final String TEST_NAME = getClass().getName(); - Connection connection = createConnection(); - connection.setClientID(TEST_NAME); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - TopicSubscriber subscriber = session.createDurableSubscriber((Topic) destination, "topicUser2", null, true); - - final CountDownLatch latch = new CountDownLatch(1); - subscriber.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - System.out.println("Receive a message " + message); - latch.countDown(); - } - }); - - connection.start(); - - MessageProducer producer = session.createProducer(destination); - TextMessage message = session.createTextMessage("THIS IS A TEST"); - producer.send(message); - producer.close(); - latch.await(5, TimeUnit.SECONDS); - assertEquals(latch.getCount(), 1); - } - - @Override - protected void setUp() throws Exception { - bindAddress = "vm://localhost"; - useTopic = true; - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseJmx(false); - answer.setPersistent(true); - answer.setDeleteAllMessagesOnStartup(true); - answer.addConnector(bindAddress); - return answer; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java deleted file mode 100644 index 137caa3..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.Properties; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.test.JmsTopicSendReceiveTest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest { - - static final int NMSG = 200; - static final int MSIZE = 256000; - private static final transient Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class); - private static final String COUNT_PROPERY_NAME = "count"; - - protected Connection connection2; - protected Session session2; - protected Session consumeSession2; - protected MessageConsumer consumer2; - protected MessageProducer producer2; - protected Destination consumerDestination2; - BrokerService broker; - private Connection connection3; - private Session consumeSession3; - private TopicSubscriber consumer3; - - /** - * Set up a durable suscriber test. - * - * @see junit.framework.TestCase#setUp() - */ - @Override - protected void setUp() throws Exception { - this.durable = true; - broker = createBroker(); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - broker.stop(); - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("vm://localhost?async=false"); - Properties props = new Properties(); - props.put("prefetchPolicy.durableTopicPrefetch", "5"); - props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5"); - result.setProperties(props); - return result; - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer); - answer.start(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception { - answer.setDeleteAllMessagesOnStartup(true); - } - - /** - * Test if all the messages sent are being received. - * - * @throws Exception - */ - public void testSlowReceiver() throws Exception { - connection2 = createConnection(); - connection2.setClientID("test"); - connection2.start(); - consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumerDestination2 = session2.createTopic(getConsumerSubject() + "2"); - consumer2 = consumeSession2.createDurableSubscriber((Topic) consumerDestination2, getName()); - - consumer2.close(); - connection2.close(); - new Thread(new Runnable() { - - @Override - public void run() { - try { - int count = 0; - for (int loop = 0; loop < 4; loop++) { - connection2 = createConnection(); - connection2.start(); - session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer2 = session2.createProducer(null); - producer2.setDeliveryMode(deliveryMode); - Thread.sleep(1000); - for (int i = 0; i < NMSG / 4; i++) { - BytesMessage message = session2.createBytesMessage(); - message.writeBytes(new byte[MSIZE]); - message.setStringProperty("test", "test"); - message.setIntProperty(COUNT_PROPERY_NAME, count); - message.setJMSType("test"); - producer2.send(consumerDestination2, message); - Thread.sleep(50); - if (verbose) { - LOG.debug("Sent(" + loop + "): " + i); - } - count++; - } - producer2.close(); - connection2.stop(); - connection2.close(); - } - } - catch (Throwable e) { - e.printStackTrace(); - } - } - }, "SENDER Thread").start(); - connection3 = createConnection(); - connection3.setClientID("test"); - connection3.start(); - consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer3 = consumeSession3.createDurableSubscriber((Topic) consumerDestination2, getName()); - connection3.close(); - int count = 0; - for (int loop = 0; loop < 4; ++loop) { - connection3 = createConnection(); - connection3.setClientID("test"); - connection3.start(); - consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer3 = consumeSession3.createDurableSubscriber((Topic) consumerDestination2, getName()); - Message msg = null; - int i; - for (i = 0; i < NMSG / 4; i++) { - msg = consumer3.receive(10000); - if (msg == null) { - break; - } - if (verbose) { - LOG.debug("Received(" + loop + "): " + i + " count = " + msg.getIntProperty(COUNT_PROPERY_NAME)); - } - assertNotNull(msg); - assertEquals(msg.getJMSType(), "test"); - assertEquals(msg.getStringProperty("test"), "test"); - assertEquals("Messages received out of order", count, msg.getIntProperty(COUNT_PROPERY_NAME)); - Thread.sleep(500); - msg.acknowledge(); - count++; - } - consumer3.close(); - assertEquals("Receiver " + loop, NMSG / 4, i); - connection3.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5d8f245e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java deleted file mode 100644 index 81987be..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.ResourceAllocationException; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.transport.RequestTimedOutIOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsTimeoutTest extends EmbeddedBrokerTestSupport { - - static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class); - - private final int messageSize = 1024 * 64; - private final int messageCount = 10000; - private final AtomicInteger exceptionCount = new AtomicInteger(0); - - /** - * Test the case where the broker is blocked due to a memory limit - * and a producer timeout is set on the connection. - * - * @throws Exception - */ - public void testBlockedProducerConnectionTimeout() throws Exception { - final ActiveMQConnection cx = (ActiveMQConnection) createConnection(); - final ActiveMQDestination queue = createDestination("testqueue"); - - // we should not take longer than 10 seconds to return from send - cx.setSendTimeout(10000); - - Runnable r = new Runnable() { - @Override - public void run() { - try { - LOG.info("Sender thread starting"); - Session session = cx.createSession(false, 1); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - TextMessage message = session.createTextMessage(createMessageText()); - for (int count = 0; count < messageCount; count++) { - producer.send(message); - } - LOG.info("Done sending.."); - } - catch (JMSException e) { - if (e.getCause() instanceof RequestTimedOutIOException) { - exceptionCount.incrementAndGet(); - } - else { - e.printStackTrace(); - } - return; - } - - } - }; - cx.start(); - Thread producerThread = new Thread(r); - producerThread.start(); - producerThread.join(30000); - cx.close(); - // We should have a few timeout exceptions as memory store will fill up - assertTrue("No exception from the broker", exceptionCount.get() > 0); - } - - /** - * Test the case where the broker is blocked due to a memory limit - * with a fail timeout - * - * @throws Exception - */ - public void testBlockedProducerUsageSendFailTimeout() throws Exception { - final ActiveMQConnection cx = (ActiveMQConnection) createConnection(); - final ActiveMQDestination queue = createDestination("testqueue"); - - broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000); - Runnable r = new Runnable() { - @Override - public void run() { - try { - LOG.info("Sender thread starting"); - Session session = cx.createSession(false, 1); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - TextMessage message = session.createTextMessage(createMessageText()); - for (int count = 0; count < messageCount; count++) { - producer.send(message); - } - LOG.info("Done sending.."); - } - catch (JMSException e) { - if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) { - exceptionCount.incrementAndGet(); - } - else { - e.printStackTrace(); - } - return; - } - } - }; - cx.start(); - Thread producerThread = new Thread(r); - producerThread.start(); - producerThread.join(30000); - cx.close(); - // We should have a few timeout exceptions as memory store will fill up - assertTrue("No exception from the broker", exceptionCount.get() > 0); - } - - @Override - protected void setUp() throws Exception { - exceptionCount.set(0); - bindAddress = "tcp://localhost:0"; - broker = createBroker(); - broker.setDeleteAllMessagesOnStartup(true); - broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024); - - super.setUp(); - } - - @Override - protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); - } - - private String createMessageText() { - StringBuffer buffer = new StringBuffer(); - buffer.append(""); - for (int i = buffer.length(); i < messageSize; i++) { - buffer.append('X'); - } - buffer.append(""); - return buffer.toString(); - } - -}