Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B5F719BB5 for ; Fri, 4 Mar 2016 22:42:47 +0000 (UTC) Received: (qmail 27403 invoked by uid 500); 4 Mar 2016 22:42:46 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 27224 invoked by uid 500); 4 Mar 2016 22:42:46 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 26395 invoked by uid 99); 4 Mar 2016 22:42:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 22:42:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03654E7944; Fri, 4 Mar 2016 22:42:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 04 Mar 2016 22:42:55 -0000 Message-Id: <73a629dea95545ff939a425f733c6833@git.apache.org> In-Reply-To: <64a7ec3b1ddf4b2f8d97fa76fd178cb9@git.apache.org> References: <64a7ec3b1ddf4b2f8d97fa76fd178cb9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/58] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java deleted file mode 100644 index 62723af..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; -import org.apache.activemq.broker.util.RedeliveryPlugin; -import org.apache.activemq.util.IOHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Testing if the the broker "sends" the message as expected after the redeliveryPlugin has redelivered the - * message previously. - */ - -public class RedeliveryPluginHeaderTest extends TestCase { - - private static final String TEST_QUEUE_ONE = "TEST_QUEUE_ONE"; - private static final String TEST_QUEUE_TWO = "TEST_QUEUE_TWO"; - private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginHeaderTest.class); - private String transportURL; - private BrokerService broker; - - /** - * Test - * - consumes message from Queue1 - * - rolls back message to Queue1 and message is scheduled for redelivery to Queue1 by brokers plugin - * - consumes message from Queue1 again - * - sends same message to Queue2 - * - expects to consume message from Queue2 immediately - */ - - public void testSendAfterRedelivery() throws Exception { - broker = this.createBroker(false); - broker.start(); - broker.waitUntilStarted(); - - LOG.info("***Broker started..."); - - //pushed message to broker - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0"); - - Connection connection = factory.createConnection(); - connection.start(); - - try { - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - Destination destinationQ1 = session.createQueue(TEST_QUEUE_ONE); - Destination destinationQ2 = session.createQueue(TEST_QUEUE_TWO); - - MessageProducer producerQ1 = session.createProducer(destinationQ1); - producerQ1.setDeliveryMode(DeliveryMode.PERSISTENT); - - Message m = session.createTextMessage("testMessage"); - LOG.info("*** send message to broker..."); - producerQ1.send(m); - session.commit(); - - //consume message from Q1 and rollback to get it redelivered - MessageConsumer consumerQ1 = session.createConsumer(destinationQ1); - - LOG.info("*** consume message from Q1 and rolled back.."); - - TextMessage textMessage = (TextMessage) consumerQ1.receive(); - LOG.info("got redelivered: " + textMessage); - assertFalse("JMSRedelivered flag is not set", textMessage.getJMSRedelivered()); - session.rollback(); - - LOG.info("*** consumed message from Q1 again and sending to Q2.."); - TextMessage textMessage2 = (TextMessage) consumerQ1.receive(); - LOG.info("got: " + textMessage2); - session.commit(); - assertTrue("JMSRedelivered flag is set", textMessage2.getJMSRedelivered()); - - //send message to Q2 and consume from Q2 - MessageConsumer consumerQ2 = session.createConsumer(destinationQ2); - MessageProducer producer_two = session.createProducer(destinationQ2); - producer_two.send(textMessage2); - session.commit(); - - //Message should be available straight away on the queue_two - Message textMessage3 = consumerQ2.receive(1000); - assertNotNull("should have consumed a message from TEST_QUEUE_TWO", textMessage3); - assertFalse("JMSRedelivered flag is not set", textMessage3.getJMSRedelivered()); - session.commit(); - - } - finally { - - if (connection != null) { - connection.close(); - } - - if (broker != null) { - broker.stop(); - } - - } - - } - - protected BrokerService createBroker(boolean withJMX) throws Exception { - File schedulerDirectory = new File("target/scheduler"); - IOHelper.mkdirs(schedulerDirectory); - IOHelper.deleteChildren(schedulerDirectory); - - BrokerService answer = new BrokerService(); - answer.setAdvisorySupport(false); - answer.setDataDirectory("target"); - answer.setSchedulerDirectoryFile(schedulerDirectory); - answer.setSchedulerSupport(true); - answer.setPersistent(true); - answer.setDeleteAllMessagesOnStartup(true); - answer.setUseJmx(withJMX); - - RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin(); - RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); - RedeliveryPolicy defaultEntry = new RedeliveryPolicy(); - defaultEntry.setInitialRedeliveryDelay(5000); - defaultEntry.setMaximumRedeliveries(5); - redeliveryPolicyMap.setDefaultEntry(defaultEntry); - redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap); - - answer.setPlugins(new BrokerPlugin[]{redeliveryPlugin}); - TransportConnector transportConnector = answer.addConnector("tcp://localhost:0"); - - transportURL = transportConnector.getConnectUri().toASCIIString(); - - return answer; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java deleted file mode 100644 index b4858c1..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SlowConsumerTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class); - private static final int MESSAGES_COUNT = 10000; - - private final int messageLogFrequency = 2500; - private final long messageReceiveTimeout = 10000L; - - private Socket stompSocket; - private ByteArrayOutputStream inputBuffer; - private int messagesCount; - - /** - * @param args - * @throws Exception - */ - public void testRemoveSubscriber() throws Exception { - final BrokerService broker = new BrokerService(); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setDeleteAllMessagesOnStartup(true); - - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); - final Connection connection = factory.createConnection(); - connection.start(); - - Thread producingThread = new Thread("Producing thread") { - @Override - public void run() { - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName())); - for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { - Message message = session.createTextMessage("" + idx); - producer.send(message); - LOG.debug("Sending: " + idx); - } - producer.close(); - session.close(); - } - catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - producingThread.setPriority(Thread.MAX_PRIORITY); - producingThread.start(); - Thread.sleep(1000); - - Thread consumingThread = new Thread("Consuming thread") { - - @Override - public void run() { - try { - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationName())); - int diff = 0; - while (messagesCount != MESSAGES_COUNT) { - Message msg = consumer.receive(messageReceiveTimeout); - if (msg == null) { - LOG.warn("Got null message at count: " + messagesCount + ". Continuing..."); - break; - } - String text = ((TextMessage) msg).getText(); - int currentMsgIdx = Integer.parseInt(text); - LOG.debug("Received: " + text + " messageCount: " + messagesCount); - msg.acknowledge(); - if ((messagesCount + diff) != currentMsgIdx) { - LOG.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx); - diff = currentMsgIdx - messagesCount; - } - ++messagesCount; - if (messagesCount % messageLogFrequency == 0) { - LOG.info("Received: " + messagesCount + " messages so far"); - } - // Thread.sleep(70); - } - } - catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - consumingThread.start(); - consumingThread.join(); - - assertEquals(MESSAGES_COUNT, messagesCount); - - } - - public void sendFrame(String data) throws Exception { - byte[] bytes = data.getBytes("UTF-8"); - OutputStream outputStream = stompSocket.getOutputStream(); - for (int i = 0; i < bytes.length; i++) { - outputStream.write(bytes[i]); - } - outputStream.flush(); - } - - public String receiveFrame(long timeOut) throws Exception { - stompSocket.setSoTimeout((int) timeOut); - InputStream is = stompSocket.getInputStream(); - int c = 0; - for (;;) { - c = is.read(); - if (c < 0) { - throw new IOException("socket closed."); - } - else if (c == 0) { - c = is.read(); - byte[] ba = inputBuffer.toByteArray(); - inputBuffer.reset(); - return new String(ba, "UTF-8"); - } - else { - inputBuffer.write(c); - } - } - } - - protected String getDestinationName() { - return getClass().getName() + "." + getName(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java deleted file mode 100644 index 3e22dc2..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.leveldb.LevelDBStore; - -public class SparseAckReplayAfterStoreCleanupLevelDBStoreTest extends AMQ2832Test { - - @Override - protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { - LevelDBStore store = new LevelDBStore(); - store.setFlushDelay(0); - brokerService.setPersistenceAdapter(store); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java deleted file mode 100644 index f521d40..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.junit.Test; - -/** - * Demonstrates how unmarshalled VM advisory messages for temporary queues prevent other connections from being closed. - */ -public class TempQueueDeleteOnCloseTest { - - @Test - public void test() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); - - // create a connection and session with a temporary queue - Connection connectionA = connectionFactory.createConnection(); - connectionA.setClientID("ConnectionA"); - Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination tempQueueA = sessionA.createTemporaryQueue(); - MessageConsumer consumer = sessionA.createConsumer(tempQueueA); - connectionA.start(); - - // start and stop another connection - Connection connectionB = connectionFactory.createConnection(); - connectionB.setClientID("ConnectionB"); - connectionB.start(); - connectionB.close(); - - consumer.close(); - connectionA.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java deleted file mode 100644 index dc15f87..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ResourceAllocationException; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.kahadb.plist.PListStoreImpl; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.StoreUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.usage.TempUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TempStorageBlockedBrokerTest extends TestSupport { - - public int deliveryMode = DeliveryMode.PERSISTENT; - - private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class); - private static final int MESSAGES_COUNT = 1000; - private static byte[] buf = new byte[4 * 1024]; - private BrokerService broker; - AtomicInteger messagesSent = new AtomicInteger(0); - AtomicInteger messagesConsumed = new AtomicInteger(0); - - protected long messageReceiveTimeout = 10000L; - - Destination destination = new ActiveMQTopic("FooTwo"); - - private String connectionUri; - - public void testRunProducerWithHungConsumer() throws Exception { - - final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage(); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - // ensure messages are spooled to disk for this consumer - ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); - prefetch.setTopicPrefetch(10); - factory.setPrefetchPolicy(prefetch); - Connection consumerConnection = factory.createConnection(); - consumerConnection.start(); - - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(destination); - - final Connection producerConnection = factory.createConnection(); - producerConnection.start(); - - final CountDownLatch producerHasSentTenMessages = new CountDownLatch(10); - Thread producingThread = new Thread("Producing thread") { - @Override - public void run() { - try { - Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { - Message message = session.createTextMessage(new String(buf) + idx); - - producer.send(message); - messagesSent.incrementAndGet(); - producerHasSentTenMessages.countDown(); - Thread.sleep(10); - if (idx != 0 && idx % 100 == 0) { - LOG.info("Sent Message " + idx); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - } - } - producer.close(); - session.close(); - } - catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - producingThread.start(); - - assertTrue("producer has sent 10 in a reasonable time", producerHasSentTenMessages.await(30, TimeUnit.SECONDS)); - - int count = 0; - - Message m = null; - while ((m = consumer.receive(messageReceiveTimeout)) != null) { - count++; - if (count != 0 && count % 10 == 0) { - LOG.info("Received Message (" + count + "):" + m); - } - messagesConsumed.incrementAndGet(); - try { - Thread.sleep(100); - } - catch (Exception e) { - LOG.info("error sleeping"); - } - } - - LOG.info("Connection Timeout: Retrying.. count: " + count); - - while ((m = consumer.receive(messageReceiveTimeout)) != null) { - count++; - if (count != 0 && count % 100 == 0) { - LOG.info("Received Message (" + count + "):" + m); - } - messagesConsumed.incrementAndGet(); - try { - Thread.sleep(100); - } - catch (Exception e) { - LOG.info("error sleeping"); - } - } - - LOG.info("consumer session closing: consumed count: " + count); - - consumerSession.close(); - - producingThread.join(); - - final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage(); - LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription); - - producerConnection.close(); - consumerConnection.close(); - - LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage()); - - // do a cleanup - ((PListStoreImpl) broker.getTempDataStore()).run(); - LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage()); - - assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT); - assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), MESSAGES_COUNT); - } - - public void testFillTempAndConsume() throws Exception { - - broker.getSystemUsage().setSendFailIfNoSpace(true); - destination = new ActiveMQQueue("Foo"); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); - // so we can easily catch the ResourceAllocationException on send - producerConnection.setAlwaysSyncSend(true); - producerConnection.start(); - - Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - try { - while (true) { - Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); - producer.send(message); - messagesSent.incrementAndGet(); - if (messagesSent.get() % 100 == 0) { - LOG.info("Sent Message " + messagesSent.get()); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - } - } - } - catch (ResourceAllocationException ex) { - LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); - } - - // consume all sent - Connection consumerConnection = factory.createConnection(); - consumerConnection.start(); - - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(destination); - - while (consumer.receive(messageReceiveTimeout) != null) { - messagesConsumed.incrementAndGet(); - if (messagesConsumed.get() % 1000 == 0) { - LOG.info("received Message " + messagesConsumed.get()); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - } - } - - assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), messagesSent.get()); - } - - @Override - public void setUp() throws Exception { - - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - - setDefaultPersistenceAdapter(broker); - SystemUsage sysUsage = broker.getSystemUsage(); - MemoryUsage memUsage = new MemoryUsage(); - memUsage.setLimit((1024 * 1024)); - StoreUsage storeUsage = new StoreUsage(); - storeUsage.setLimit((1024 * 1024) * 38); - TempUsage tmpUsage = new TempUsage(); - tmpUsage.setLimit((1024 * 1024) * 38); - - PolicyEntry defaultPolicy = new PolicyEntry(); - // defaultPolicy.setTopic("FooTwo"); - defaultPolicy.setProducerFlowControl(false); - defaultPolicy.setMemoryLimit(10 * 1024); - - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(defaultPolicy); - - sysUsage.setMemoryUsage(memUsage); - sysUsage.setStoreUsage(storeUsage); - sysUsage.setTempUsage(tmpUsage); - - broker.setDestinationPolicy(policyMap); - broker.setSystemUsage(sysUsage); - - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - @Override - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java deleted file mode 100644 index d04cc3f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import java.io.File; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ResourceAllocationException; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.plist.PListStoreImpl; -import org.junit.After; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test that when configuring small temp store limits the journal size must also - * be smaller than the configured limit, but will still send a ResourceAllocationException - * if its not when sendFailIfNoSpace is enabled. - */ -public class TempStorageConfigBrokerTest { - - public int deliveryMode = DeliveryMode.PERSISTENT; - - private static final Logger LOG = LoggerFactory.getLogger(TempStorageConfigBrokerTest.class); - private static byte[] buf = new byte[4 * 1024]; - private BrokerService broker; - private AtomicInteger messagesSent = new AtomicInteger(0); - private AtomicInteger messagesConsumed = new AtomicInteger(0); - - private String brokerUri; - private long messageReceiveTimeout = 10000L; - private Destination destination = new ActiveMQTopic("FooTwo"); - - @Test(timeout = 360000) - @Ignore("blocks in hudson, needs investigation") - public void testFillTempAndConsumeWithBadTempStoreConfig() throws Exception { - - createBrokerWithInvalidTempStoreConfig(); - - broker.getSystemUsage().setSendFailIfNoSpace(true); - destination = new ActiveMQQueue("Foo"); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); - final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); - // so we can easily catch the ResourceAllocationException on send - producerConnection.setAlwaysSyncSend(true); - producerConnection.start(); - - Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - try { - while (true) { - Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); - producer.send(message); - messagesSent.incrementAndGet(); - if (messagesSent.get() % 100 == 0) { - LOG.info("Sent Message " + messagesSent.get()); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - } - } - } - catch (ResourceAllocationException ex) { - assertTrue("Should not be able to send 100 messages: ", messagesSent.get() < 100); - LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); - } - } - - @Test(timeout = 360000) - @Ignore("blocks in hudson, needs investigation") - public void testFillTempAndConsumeWithGoodTempStoreConfig() throws Exception { - - createBrokerWithValidTempStoreConfig(); - - broker.getSystemUsage().setSendFailIfNoSpace(true); - destination = new ActiveMQQueue("Foo"); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); - final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); - // so we can easily catch the ResourceAllocationException on send - producerConnection.setAlwaysSyncSend(true); - producerConnection.start(); - - Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - try { - while (true) { - Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); - producer.send(message); - messagesSent.incrementAndGet(); - if (messagesSent.get() % 100 == 0) { - LOG.info("Sent Message " + messagesSent.get()); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - } - } - } - catch (ResourceAllocationException ex) { - assertTrue("Should be able to send at least 200 messages but was: " + messagesSent.get(), messagesSent.get() > 200); - LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); - } - - // consume all sent - Connection consumerConnection = factory.createConnection(); - consumerConnection.start(); - - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(destination); - - while (consumer.receive(messageReceiveTimeout) != null) { - messagesConsumed.incrementAndGet(); - if (messagesConsumed.get() % 1000 == 0) { - LOG.info("received Message " + messagesConsumed.get()); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - } - } - - assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), messagesSent.get()); - } - - private void createBrokerWithValidTempStoreConfig() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistenceAdapter(new KahaDBPersistenceAdapter()); - - broker.getSystemUsage().setSendFailIfNoSpace(true); - broker.getSystemUsage().getMemoryUsage().setLimit(1048576); - broker.getSystemUsage().getTempUsage().setLimit(2 * 1048576); - ((PListStoreImpl) broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576); - broker.getSystemUsage().getStoreUsage().setLimit(20 * 1048576); - - PolicyEntry defaultPolicy = new PolicyEntry(); - defaultPolicy.setProducerFlowControl(false); - defaultPolicy.setMemoryLimit(10 * 1024); - - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(defaultPolicy); - - broker.setDestinationPolicy(policyMap); - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - - brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - private void createBrokerWithInvalidTempStoreConfig() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistenceAdapter(new KahaDBPersistenceAdapter()); - - broker.getSystemUsage().setSendFailIfNoSpace(true); - broker.getSystemUsage().getMemoryUsage().setLimit(1048576); - broker.getSystemUsage().getTempUsage().setLimit(2 * 1048576); - broker.getSystemUsage().getStoreUsage().setLimit(2 * 1048576); - - PolicyEntry defaultPolicy = new PolicyEntry(); - defaultPolicy.setProducerFlowControl(false); - defaultPolicy.setMemoryLimit(10 * 1024); - - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(defaultPolicy); - - broker.setDestinationPolicy(policyMap); - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - - brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java deleted file mode 100644 index 8051a59..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.plist.PListStoreImpl; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TempStoreDataCleanupTest { - - private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class); - private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue"; - - private final String str = new String("QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR"); - - private BrokerService broker; - private String connectionUri; - private ExecutorService pool; - private String queueName; - private Random r = new Random(); - - @Before - public void setUp() throws Exception { - - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.setDedicatedTaskRunner(false); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - - SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy(); - strategy.setProcessExpired(false); - strategy.setProcessNonPersistent(false); - - PolicyEntry defaultPolicy = new PolicyEntry(); - defaultPolicy.setQueue(">"); - defaultPolicy.setOptimizedDispatch(true); - defaultPolicy.setDeadLetterStrategy(strategy); - defaultPolicy.setMemoryLimit(9000000); - - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(defaultPolicy); - - broker.setDestinationPolicy(policyMap); - - broker.getSystemUsage().getMemoryUsage().setLimit(300000000L); - - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - pool = Executors.newFixedThreadPool(10); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - - if (pool != null) { - pool.shutdown(); - } - } - - @Test - public void testIt() throws Exception { - - int startPercentage = broker.getAdminView().getMemoryPercentUsage(); - LOG.info("MemoryUsage at test start = " + startPercentage); - - for (int i = 0; i < 2; i++) { - LOG.info("Started the test iteration: " + i + " using queueName = " + queueName); - queueName = QUEUE_NAME + i; - final CountDownLatch latch = new CountDownLatch(11); - - pool.execute(new Runnable() { - @Override - public void run() { - receiveAndDiscard100messages(latch); - } - }); - - for (int j = 0; j < 10; j++) { - pool.execute(new Runnable() { - @Override - public void run() { - send10000messages(latch); - } - }); - } - - LOG.info("Waiting on the send / receive latch"); - latch.await(5, TimeUnit.MINUTES); - LOG.info("Resumed"); - - destroyQueue(); - TimeUnit.SECONDS.sleep(2); - } - - LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage()); - - final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore(); - assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return pa.getJournal().getFileMap().size() == 1; - } - }, TimeUnit.MINUTES.toMillis(3))); - - int endPercentage = broker.getAdminView().getMemoryPercentUsage(); - LOG.info("MemoryUsage at test end = " + endPercentage); - - assertEquals(startPercentage, endPercentage); - } - - public void destroyQueue() { - try { - Broker broker = this.broker.getBroker(); - if (!broker.isStopped()) { - LOG.info("Removing: " + queueName); - broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10); - } - } - catch (Exception e) { - LOG.warn("Got an error while removing the test queue", e); - } - } - - private void send10000messages(CountDownLatch latch) { - ActiveMQConnection activeMQConnection = null; - try { - activeMQConnection = createConnection(null); - Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(queueName)); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - activeMQConnection.start(); - for (int i = 0; i < 10000; i++) { - TextMessage textMessage = session.createTextMessage(); - textMessage.setText(generateBody(1000)); - textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(textMessage); - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - } - } - producer.close(); - } - catch (JMSException e) { - LOG.warn("Got an error while sending the messages", e); - } - finally { - if (activeMQConnection != null) { - try { - activeMQConnection.close(); - } - catch (JMSException e) { - } - } - } - latch.countDown(); - } - - private void receiveAndDiscard100messages(CountDownLatch latch) { - ActiveMQConnection activeMQConnection = null; - try { - activeMQConnection = createConnection(null); - Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName)); - activeMQConnection.start(); - for (int i = 0; i < 100; i++) { - messageConsumer.receive(); - } - messageConsumer.close(); - LOG.info("Created and disconnected"); - } - catch (JMSException e) { - LOG.warn("Got an error while receiving the messages", e); - } - finally { - if (activeMQConnection != null) { - try { - activeMQConnection.close(); - } - catch (JMSException e) { - } - } - } - latch.countDown(); - } - - private ActiveMQConnection createConnection(String id) throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - if (id != null) { - factory.setClientID(id); - } - - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - return connection; - } - - private String generateBody(int length) { - - StringBuilder sb = new StringBuilder(); - int te = 0; - for (int i = 1; i <= length; i++) { - te = r.nextInt(62); - sb.append(str.charAt(te)); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java deleted file mode 100644 index db3888a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertTrue; - -// https://issues.apache.org/jira/browse/AMQ-4262 -public class TransactedStoreUsageSuspendResumeTest { - - private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class); - - private static final int MAX_MESSAGES = 10000; - - private static final String QUEUE_NAME = "test.queue"; - - private BrokerService broker; - - private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES); - private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES); - private final CountDownLatch consumerStartLatch = new CountDownLatch(1); - - private class ConsumerThread extends Thread { - - @Override - public void run() { - try { - - consumerStartLatch.await(30, TimeUnit.SECONDS); - - ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - // wait for producer to stop - long currentSendCount; - do { - currentSendCount = messagesSentCountDown.getCount(); - TimeUnit.SECONDS.sleep(5); - } while (currentSendCount != messagesSentCountDown.getCount()); - - LOG.info("Starting consumer at: " + currentSendCount); - - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); - - do { - Message message = consumer.receive(5000); - if (message != null) { - session.commit(); - messagesReceivedCountDown.countDown(); - } - if (messagesReceivedCountDown.getCount() % 500 == 0) { - LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount()); - } - } while (messagesReceivedCountDown.getCount() != 0); - consumer.close(); - session.close(); - connection.close(); - } - catch (Exception e) { - Assert.fail(e.getMessage()); - } - } - } - - @Before - public void setup() throws Exception { - - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - - KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter(); - kahaDB.setJournalMaxFileLength(500 * 1024); - kahaDB.setCleanupInterval(10 * 1000); - broker.setPersistenceAdapter(kahaDB); - - broker.getSystemUsage().getStoreUsage().setLimit(7 * 1024 * 1024); - - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - @Test - public void testTransactedStoreUsageSuspendResume() throws Exception { - - ConsumerThread thread = new ConsumerThread(); - thread.start(); - ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); - sendExecutor.execute(new Runnable() { - @Override - public void run() { - try { - sendMessages(); - } - catch (Exception ignored) { - } - } - }); - sendExecutor.shutdown(); - sendExecutor.awaitTermination(5, TimeUnit.MINUTES); - - boolean allMessagesReceived = messagesReceivedCountDown.await(10, TimeUnit.MINUTES); - if (!allMessagesReceived) { - AutoFailTestSupport.dumpAllThreads("StuckConsumer!"); - } - assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived); - - // give consumers a chance to exit gracefully - TimeUnit.SECONDS.sleep(2); - } - - private void sendMessages() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - factory.setAlwaysSyncSend(true); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination queue = session.createQueue(QUEUE_NAME); - Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain"); - MessageProducer producer = session.createProducer(null); - - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[10]); - - for (int i = 0; i < 4240; i++) { - // mostly fill the store with retained messages - // so consumer only has a small bit of store usage to work with - producer.send(retainQueue, message); - session.commit(); - } - - consumerStartLatch.countDown(); - for (int i = 0; i < MAX_MESSAGES; i++) { - producer.send(queue, message); - if (i > 0 && i % 20 == 0) { - session.commit(); - } - messagesSentCountDown.countDown(); - if (i > 0 && i % 500 == 0) { - LOG.info("Sent : " + i); - } - - } - session.commit(); - producer.close(); - session.close(); - connection.close(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java deleted file mode 100644 index 2038279..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* - * simulate message flow which cause the following exception in the broker - * (exception logged by client)
2007-07-24 13:51:23,624 - * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure - * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' - * has not been started. at - * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230) - * This appears to be consistent in a MacBook. Haven't been able to replicate it - * on Windows though - */ -public class TransactionNotStartedErrorTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class); - - private static final int counter = 500; - - private static int hectorToHaloCtr; - private static int xenaToHaloCtr; - private static int troyToHaloCtr; - - private static int haloToHectorCtr; - private static int haloToXenaCtr; - private static int haloToTroyCtr; - - private final String hectorToHalo = "hectorToHalo"; - private final String xenaToHalo = "xenaToHalo"; - private final String troyToHalo = "troyToHalo"; - - private final String haloToHector = "haloToHector"; - private final String haloToXena = "haloToXena"; - private final String haloToTroy = "haloToTroy"; - - private BrokerService broker; - - private Connection hectorConnection; - private Connection xenaConnection; - private Connection troyConnection; - private Connection haloConnection; - - private final Object lock = new Object(); - - public Connection createConnection() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); - return factory.createConnection(); - } - - public Session createSession(Connection connection, boolean transacted) throws JMSException { - return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - } - - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.addConnector("tcp://localhost:0").setName("Default"); - broker.start(); - LOG.info("Starting broker.."); - } - - @Override - public void tearDown() throws Exception { - hectorConnection.close(); - xenaConnection.close(); - troyConnection.close(); - haloConnection.close(); - broker.stop(); - } - - public void testTransactionNotStartedError() throws Exception { - startBroker(); - hectorConnection = createConnection(); - Thread hectorThread = buildProducer(hectorConnection, hectorToHalo); - Receiver hHectorReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - haloToHectorCtr++; - if (haloToHectorCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver); - - troyConnection = createConnection(); - Thread troyThread = buildProducer(troyConnection, troyToHalo); - Receiver hTroyReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - haloToTroyCtr++; - if (haloToTroyCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver); - - xenaConnection = createConnection(); - Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); - Receiver hXenaReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - haloToXenaCtr++; - if (haloToXenaCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver); - - haloConnection = createConnection(); - final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection); - final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection); - final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection); - Receiver hectorReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - hectorToHaloCtr++; - troySender.send("halo to troy because of hector"); - if (hectorToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - Receiver xenaReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - xenaToHaloCtr++; - hectorSender.send("halo to hector because of xena"); - if (xenaToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - Receiver troyReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - troyToHaloCtr++; - xenaSender.send("halo to xena because of troy"); - if (troyToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver); - buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver); - buildReceiver(haloConnection, troyToHalo, true, troyReceiver); - - haloConnection.start(); - - troyConnection.start(); - troyThread.start(); - - xenaConnection.start(); - xenaThread.start(); - - hectorConnection.start(); - hectorThread.start(); - waitForMessagesToBeDelivered(); - // number of messages received should match messages sent - assertEquals(hectorToHaloCtr, counter); - LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); - assertEquals(xenaToHaloCtr, counter); - LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); - assertEquals(troyToHaloCtr, counter); - LOG.info("troyToHalo received " + troyToHaloCtr + " messages"); - assertEquals(haloToHectorCtr, counter); - LOG.info("haloToHector received " + haloToHectorCtr + " messages"); - assertEquals(haloToXenaCtr, counter); - LOG.info("haloToXena received " + haloToXenaCtr + " messages"); - assertEquals(haloToTroyCtr, counter); - LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); - - } - - protected void waitForMessagesToBeDelivered() { - // let's give the listeners enough time to read all messages - long maxWaitTime = counter * 3000; - long waitTime = maxWaitTime; - long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); - - synchronized (lock) { - boolean hasMessages = true; - while (hasMessages && waitTime >= 0) { - try { - lock.wait(200); - } - catch (InterruptedException e) { - LOG.error(e.toString()); - } - // check if all messages have been received - hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter || haloToTroyCtr < counter; - waitTime = maxWaitTime - (System.currentTimeMillis() - start); - } - } - } - - public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception { - return new MessageSender(queueName, connection, true, false); - } - - public Thread buildProducer(Connection connection, final String queueName) throws Exception { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageSender producer = new MessageSender(queueName, connection, false, false); - Thread thread = new Thread() { - - @Override - public synchronized void run() { - for (int i = 0; i < counter; i++) { - try { - producer.send(queueName); - if (session.getTransacted()) { - session.commit(); - } - - } - catch (Exception e) { - throw new RuntimeException("on " + queueName + " send", e); - } - } - } - }; - return thread; - } - - public void buildReceiver(Connection connection, - final String queueName, - boolean transacted, - final Receiver receiver) throws Exception { - final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName)); - MessageListener messageListener = new MessageListener() { - - @Override - public void onMessage(Message message) { - try { - ObjectMessage objectMessage = (ObjectMessage) message; - String s = (String) objectMessage.getObject(); - receiver.receive(s); - if (session.getTransacted()) { - session.commit(); - } - - } - catch (Exception e) { - e.printStackTrace(); - } - } - }; - inputMessageConsumer.setMessageListener(messageListener); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java deleted file mode 100644 index 67b284f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.jdbc.DataSourceServiceSupport; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; -import org.apache.activemq.store.jdbc.TransactionContext; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.LeaseLockerIOExceptionHandler; -import org.apache.derby.jdbc.EmbeddedDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test to demostrate a message trapped in the JDBC store and not - * delivered to consumer - * - * The test throws issues the commit to the DB but throws - * an exception back to the broker. This scenario could happen when a network - * cable is disconnected - message is committed to DB but broker does not know. - */ - -public class TrapMessageInJDBCStoreTest extends TestCase { - - private static final String MY_TEST_Q = "MY_TEST_Q"; - private static final Logger LOG = LoggerFactory.getLogger(TrapMessageInJDBCStoreTest.class); - private String transportUrl = "tcp://127.0.0.1:0"; - private BrokerService broker; - private TestTransactionContext testTransactionContext; - private TestJDBCPersistenceAdapter jdbc; - - protected BrokerService createBroker(boolean withJMX) throws Exception { - BrokerService broker = new BrokerService(); - - broker.setUseJmx(withJMX); - - EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); - embeddedDataSource.setCreateDatabase("create"); - - //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch() - // method that can be configured to throw a SQL exception on demand - jdbc = new TestJDBCPersistenceAdapter(); - jdbc.setDataSource(embeddedDataSource); - jdbc.setCleanupPeriod(0); - testTransactionContext = new TestTransactionContext(jdbc); - - jdbc.setLockKeepAlivePeriod(1000L); - LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); - jdbc.setLocker(leaseDatabaseLocker); - - broker.setPersistenceAdapter(jdbc); - - broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler()); - - transportUrl = broker.addConnector(transportUrl).getPublishableConnectString(); - return broker; - } - - /** - * sends 3 messages to the queue. When the second message is being committed to the JDBCStore, $ - * it throws a dummy SQL exception - the message has been committed to the embedded DB before the exception - * is thrown - * - * Excepted correct outcome: receive 3 messages and the DB should contain no messages - * - * @throws Exception - */ - - public void testDBCommitException() throws Exception { - - broker = this.createBroker(false); - broker.deleteAllMessages(); - broker.start(); - broker.waitUntilStarted(); - - LOG.info("***Broker started..."); - - // failover but timeout in 5 seconds so the test does not hang - String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=5000"; - - sendMessage(MY_TEST_Q, failoverTransportURL); - - //check db contents - ArrayList dbSeq = dbMessageCount(); - LOG.info("*** after send: db contains message seq " + dbSeq); - - List consumedMessages = consumeMessages(MY_TEST_Q, failoverTransportURL); - - assertEquals("number of consumed messages", 3, consumedMessages.size()); - - //check db contents - dbSeq = dbMessageCount(); - LOG.info("*** after consume - db contains message seq " + dbSeq); - - assertEquals("number of messages in DB after test", 0, dbSeq.size()); - - broker.stop(); - broker.waitUntilStopped(); - } - - public List consumeMessages(String queue, String transportURL) throws JMSException { - Connection connection = null; - LOG.debug("*** consumeMessages() called ..."); - - try { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL); - - connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(queue); - - ArrayList consumedMessages = new ArrayList<>(); - - MessageConsumer messageConsumer = session.createConsumer(destination); - - while (true) { - TextMessage textMessage = (TextMessage) messageConsumer.receive(4000); - LOG.debug("*** consumed Messages :" + textMessage); - - if (textMessage == null) { - return consumedMessages; - } - consumedMessages.add(textMessage); - } - - } - finally { - if (connection != null) { - connection.close(); - } - } - } - - public void sendMessage(String queue, String transportURL) throws Exception { - Connection connection = null; - - try { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL); - - connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(queue); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - TextMessage m = session.createTextMessage("1"); - - LOG.debug("*** send message 1 to broker..."); - producer.send(m); - - // trigger SQL exception in transactionContext - LOG.debug("*** send message 2 to broker"); - m.setText("2"); - producer.send(m); - - //check db contents - ArrayList dbSeq = dbMessageCount(); - LOG.info("*** after send 2 - db contains message seq " + dbSeq); - assertEquals("number of messages in DB after send 2", 2, dbSeq.size()); - - LOG.debug("*** send message 3 to broker"); - m.setText("3"); - producer.send(m); - LOG.debug("*** Finished sending messages to broker"); - - } - finally { - if (connection != null) { - connection.close(); - } - } - } - - /** - * query the DB to see what messages are left in the store - * - * @return - * @throws SQLException - * @throws IOException - */ - private ArrayList dbMessageCount() throws SQLException, IOException { - java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection(); - PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS"); - - try { - - ResultSet result = statement.executeQuery(); - ArrayList dbSeq = new ArrayList<>(); - - while (result.next()) { - dbSeq.add(result.getLong(1)); - } - - return dbSeq; - - } - finally { - statement.close(); - conn.close(); - - } - - } - - /* - * Mock classes used for testing - */ - - public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { - - @Override - public TransactionContext getTransactionContext() throws IOException { - return testTransactionContext; - } - } - - public class TestTransactionContext extends TransactionContext { - - private int count; - - public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException { - super(jdbcPersistenceAdapter); - } - - @Override - public void executeBatch() throws SQLException { - super.executeBatch(); - count++; - LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch")); - - // throw on second add message - if (count == 16) { - throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count); - } - } - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java deleted file mode 100644 index 84c1765..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.io.IOException; - -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportListener; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -public class VMTransportClosureTest extends EmbeddedBrokerTestSupport { - - private static final Log LOG = LogFactory.getLog(VMTransportClosureTest.class); - private static final long MAX_TEST_TIME_MILLIS = 300000; // 5min - private static final int NUM_ATTEMPTS = 100000; - - @Override - public void setUp() throws Exception { - setAutoFail(true); - setMaxTestTime(MAX_TEST_TIME_MILLIS); - super.setUp(); - } - - /** - * EmbeddedBrokerTestSupport.createBroker() binds the broker to a VM - * transport address, which results in a call to - * VMTransportFactory.doBind(location): - *

- * - * public TransportServer doBind(URI location) throws IOException { - * return bind(location, false); - * } - * - *

- * As a result, VMTransportServer.disposeOnDisconnect is false. - * To expose the bug, we need to have VMTransportServer.disposeOnDisconnect - * true, which is the case when the VMTransportServer is not - * already bound when the first connection is made. - */ - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setPersistent(isPersistent()); - // answer.addConnector(bindAddress); - return answer; - } - - /** - * This test demonstrates how the "disposeOnDisonnect" feature of - * VMTransportServer can incorrectly close all VM connections to the local - * broker. - */ - public void testPrematureClosure() throws Exception { - - // Open a persistent connection to the local broker. The persistent - // connection is maintained through the test and should prevent the - // VMTransportServer from stopping itself when the local transport is - // closed. - ActiveMQConnection persistentConn = (ActiveMQConnection) createConnection(); - persistentConn.start(); - Session session = persistentConn.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = session.createProducer(destination); - - for (int i = 0; i < NUM_ATTEMPTS; i++) { - LOG.info("Attempt: " + i); - - // Open and close a local transport connection. As is done by by - // most users of the transport, ensure that the transport is stopped - // when closed by the peer (via ShutdownInfo). Closing the local - // transport should not affect the persistent connection. - final Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI()); - localTransport.setTransportListener(new TransportListener() { - @Override - public void onCommand(Object command) { - if (command instanceof ShutdownInfo) { - try { - localTransport.stop(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - } - - @Override - public void onException(IOException error) { - // ignore - } - - @Override - public void transportInterupted() { - // ignore - } - - @Override - public void transportResumed() { - // ignore - } - }); - - localTransport.start(); - localTransport.stop(); - - // Ensure that the persistent connection is still usable. - producer.send(session.createMessage()); - session.rollback(); - } - - persistentConn.close(); - } -}