Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8273D18142 for ; Thu, 4 Feb 2016 16:34:28 +0000 (UTC) Received: (qmail 84443 invoked by uid 500); 4 Feb 2016 16:34:06 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 84361 invoked by uid 500); 4 Feb 2016 16:34:06 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 83109 invoked by uid 99); 4 Feb 2016 16:34:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Feb 2016 16:34:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86106E0B28; Thu, 4 Feb 2016 16:34:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 04 Feb 2016 16:34:37 -0000 Message-Id: In-Reply-To: <90e1a43d4c35429fb64503b8e95aa7ca@git.apache.org> References: <90e1a43d4c35429fb64503b8e95aa7ca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [34/36] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java new file mode 100755 index 0000000..dfcf302 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java @@ -0,0 +1,721 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsResourceProvider; +import org.apache.activemq.test.TestSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener { + + private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class); + private static final int MESSAGE_COUNT = 5; + private static final String MESSAGE_TEXT = "message"; + + protected ConnectionFactory connectionFactory; + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected JmsResourceProvider resourceProvider; + protected Destination destination; + protected int batchCount = 10; + protected int batchSize = 20; + protected BrokerService broker; + + // for message listener test + private final List unackMessages = new ArrayList(MESSAGE_COUNT); + private final List ackMessages = new ArrayList(MESSAGE_COUNT); + private boolean resendPhase; + + public JmsTransactionTestSupport() { + super(); + } + + public JmsTransactionTestSupport(String name) { + super(name); + } + + /* + * (non-Javadoc) + * + * @see junit.framework.TestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + resourceProvider = getJmsResourceProvider(); + topic = resourceProvider.isTopic(); + // We will be using transacted sessions. + setSessionTransacted(); + connectionFactory = newConnectionFactory(); + reconnect(); + } + + protected void setSessionTransacted() { + resourceProvider.setTransacted(true); + } + + protected ConnectionFactory newConnectionFactory() throws Exception { + return resourceProvider.createConnectionFactory(); + } + + protected void beginTx() throws Exception { + //no-op for local tx + } + + protected void commitTx() throws Exception { + session.commit(); + } + + protected void rollbackTx() throws Exception { + session.rollback(); + } + + /** + */ + protected BrokerService createBroker() throws Exception, URISyntaxException { + return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + } + + /* + * (non-Javadoc) + * + * @see junit.framework.TestCase#tearDown() + */ + @Override + protected void tearDown() throws Exception { + LOG.info("Closing down connection"); + + try { + session.close(); + session = null; + connection.close(); + connection = null; + } catch (Exception e) { + LOG.info("Caught exception while closing resources."); + } + + try { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } catch (Exception e) { + LOG.info("Caught exception while shutting down the Broker", e); + } + + LOG.info("Connection closed."); + } + + protected abstract JmsResourceProvider getJmsResourceProvider(); + + /** + * Sends a batch of messages and validates that the messages are received. + * + * @throws Exception + */ + public void testSendReceiveTransactedBatches() throws Exception { + + TextMessage message = session.createTextMessage("Batch Message"); + for (int j = 0; j < batchCount; j++) { + LOG.info("Producing bacth " + j + " of " + batchSize + " messages"); + + beginTx(); + for (int i = 0; i < batchSize; i++) { + producer.send(message); + } + messageSent(); + commitTx(); + LOG.info("Consuming bacth " + j + " of " + batchSize + " messages"); + + beginTx(); + for (int i = 0; i < batchSize; i++) { + message = (TextMessage)consumer.receive(1000 * 5); + assertNotNull("Received only " + i + " messages in batch " + j, message); + assertEquals("Batch Message", message.getText()); + } + + commitTx(); + } + } + + protected void messageSent() throws Exception { + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. + * + * @throws Exception + */ + public void testSendRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + rollbackTx(); + + // sends a message + beginTx(); + producer.send(outbound[1]); + commitTx(); + + // receives the first message + beginTx(); + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * spec section 3.6 acking a message with automation acks has no effect. + * @throws Exception + */ + public void testAckMessageInTx() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + outbound[0].acknowledge(); + commitTx(); + outbound[0].acknowledge(); + + // receives the first message + beginTx(); + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Message not delivered.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the message sent before + * session close is not consumed. + * + * This test only works with local transactions, not xa. + * @throws Exception + */ + public void testSendSessionClose() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + consumer.close(); + + reconnectSession(); + + // sends a message + producer.send(outbound[1]); + commitTx(); + + // receives the first message + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the message sent before + * session close is not consumed. + * + * @throws Exception + */ + public void testSendSessionAndConnectionClose() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + consumer.close(); + session.close(); + + reconnect(); + + // sends a message + beginTx(); + producer.send(outbound[1]); + commitTx(); + + // receives the first message + ArrayList messages = new ArrayList(); + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * redelivered. + * + * @throws Exception + */ + public void testReceiveRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // sent both messages + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + assertEquals(outbound[0], message); + commitTx(); + + // rollback so we can get that last message again. + beginTx(); + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + rollbackTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the message again!", message); + messages.add(message); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * redelivered. + * + * @throws Exception + */ + public void testReceiveTwoThenRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(1000); + assertEquals(outbound[0], message); + + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + rollbackTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + + assertNull(consumer.receiveNoWait()); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. + * + * @throws Exception + */ + public void testSendReceiveWithPrefetchOne() throws Exception { + setPrefetchToOne(); + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"), + session.createTextMessage("Fourth Message")}; + + beginTx(); + for (int i = 0; i < outbound.length; i++) { + // sends a message + producer.send(outbound[i]); + } + commitTx(); + + // receives the first message + beginTx(); + for (int i = 0; i < outbound.length; i++) { + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + assertNotNull(message); + LOG.info("Received: " + message); + } + + // validates that the rollbacked was not consumed + commitTx(); + } + + /** + * Perform the test that validates if the rollbacked message was redelivered + * multiple times. + * + * @throws Exception + */ + public void testReceiveTwoThenRollbackManyTimes() throws Exception { + for (int i = 0; i < 5; i++) { + testReceiveTwoThenRollback(); + } + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. This test differs by setting the message prefetch to one. + * + * @throws Exception + */ + public void testSendRollbackWithPrefetchOfOne() throws Exception { + setPrefetchToOne(); + testSendRollback(); + } + + /** + * Sends a batch of messages and and validates that the rollbacked message + * was redelivered. This test differs by setting the message prefetch to + * one. + * + * @throws Exception + */ + public void testReceiveRollbackWithPrefetchOfOne() throws Exception { + setPrefetchToOne(); + testReceiveRollback(); + } + + /** + * Tests if the messages can still be received if the consumer is closed + * (session is not closed). + * + * @throws Exception see http://jira.codehaus.org/browse/AMQ-143 + */ + public void testCloseConsumerBeforeCommit() throws Exception { + TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receiveNoWait() != null) { + } + + commitTx(); + + // sends the messages + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + beginTx(); + TextMessage message = (TextMessage)consumer.receive(1000); + assertEquals(outbound[0].getText(), message.getText()); + // Close the consumer before the commit. This should not cause the + // received message + // to rollback. + consumer.close(); + commitTx(); + + // Create a new consumer + consumer = resourceProvider.createConsumer(session, destination); + LOG.info("Created consumer: " + consumer); + + beginTx(); + message = (TextMessage)consumer.receive(1000); + assertEquals(outbound[1].getText(), message.getText()); + commitTx(); + } + + public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { + ArrayList list = new ArrayList(); + list.add("First"); + Message outbound = session.createObjectMessage(list); + outbound.setStringProperty("foo", "abc"); + + beginTx(); + producer.send(outbound); + commitTx(); + + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(5000); + + List body = assertReceivedObjectMessageWithListBody(message); + + // now lets try mutate it + try { + message.setStringProperty("foo", "def"); + fail("Cannot change properties of the object!"); + } catch (JMSException e) { + LOG.info("Caught expected exception: " + e, e); + } + body.clear(); + body.add("This should never be seen!"); + rollbackTx(); + + beginTx(); + message = consumer.receive(5000); + List secondBody = assertReceivedObjectMessageWithListBody(message); + assertNotSame("Second call should return a different body", secondBody, body); + commitTx(); + } + + @SuppressWarnings("unchecked") + protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException { + assertNotNull("Should have received a message!", message); + assertEquals("foo header", "abc", message.getStringProperty("foo")); + + assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage); + ObjectMessage objectMessage = (ObjectMessage)message; + List body = (List)objectMessage.getObject(); + LOG.info("Received body: " + body); + + assertEquals("Size of list should be 1", 1, body.size()); + assertEquals("element 0 of list", "First", body.get(0)); + return body; + } + + /** + * Recreates the connection. + * + * @throws javax.jms.JMSException + */ + protected void reconnect() throws Exception { + + if (connection != null) { + // Close the prev connection. + connection.close(); + } + session = null; + connection = resourceProvider.createConnection(connectionFactory); + reconnectSession(); + connection.start(); + } + + /** + * Recreates the connection. + * + * @throws javax.jms.JMSException + */ + protected void reconnectSession() throws JMSException { + if (session != null) { + session.close(); + } + + session = resourceProvider.createSession(connection); + destination = resourceProvider.createDestination(session, getSubject()); + producer = resourceProvider.createProducer(session, destination); + consumer = resourceProvider.createConsumer(session, destination); + } + + /** + * Sets the prefeftch policy to one. + */ + protected void setPrefetchToOne() { + ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(1); + prefetchPolicy.setTopicPrefetch(1); + prefetchPolicy.setDurableTopicPrefetch(1); + prefetchPolicy.setOptimizeDurableTopicPrefetch(1); + } + + protected ActiveMQPrefetchPolicy getPrefetchPolicy() { + return ((ActiveMQConnection)connection).getPrefetchPolicy(); + } + + //This test won't work with xa tx so no beginTx() has been added. + public void testMessageListener() throws Exception { + // send messages + for (int i = 0; i < MESSAGE_COUNT; i++) { + producer.send(session.createTextMessage(MESSAGE_TEXT + i)); + } + commitTx(); + consumer.setMessageListener(this); + // wait receive + waitReceiveUnack(); + assertEquals(unackMessages.size(), MESSAGE_COUNT); + // resend phase + waitReceiveAck(); + assertEquals(ackMessages.size(), MESSAGE_COUNT); + // should no longer re-receive + consumer.setMessageListener(null); + assertNull(consumer.receive(500)); + reconnect(); + } + + @Override + public void onMessage(Message message) { + if (!resendPhase) { + unackMessages.add(message); + if (unackMessages.size() == MESSAGE_COUNT) { + try { + rollbackTx(); + resendPhase = true; + } catch (Exception e) { + e.printStackTrace(); + } + } + } else { + ackMessages.add(message); + if (ackMessages.size() == MESSAGE_COUNT) { + try { + commitTx(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + private void waitReceiveUnack() throws Exception { + for (int i = 0; i < 100 && !resendPhase; i++) { + Thread.sleep(100); + } + assertTrue(resendPhase); + } + + private void waitReceiveAck() throws Exception { + for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) { + Thread.sleep(100); + } + assertFalse(ackMessages.size() < MESSAGE_COUNT); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java deleted file mode 100644 index 37899e8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Destination; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author rnewson - */ -public final class LargeStreamletTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class); - private static final String BROKER_URL = "vm://localhost?broker.persistent=false"; - private static final int BUFFER_SIZE = 1 * 1024; - private static final int MESSAGE_COUNT = 10 * 1024; - - protected Exception writerException; - protected Exception readerException; - - private final AtomicInteger totalRead = new AtomicInteger(); - private final AtomicInteger totalWritten = new AtomicInteger(); - private final AtomicBoolean stopThreads = new AtomicBoolean(false); - - public void testStreamlets() throws Exception { - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); - - final ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - connection.start(); - try { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - final Destination destination = session.createQueue("wibble"); - final Thread readerThread = new Thread(new Runnable() { - - @Override - public void run() { - totalRead.set(0); - try { - final InputStream inputStream = connection.createInputStream(destination); - try { - int read; - final byte[] buf = new byte[BUFFER_SIZE]; - while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) { - totalRead.addAndGet(read); - } - } - finally { - inputStream.close(); - } - } - catch (Exception e) { - readerException = e; - e.printStackTrace(); - } - finally { - LOG.info(totalRead + " total bytes read."); - } - } - }); - - final Thread writerThread = new Thread(new Runnable() { - private final Random random = new Random(); - - @Override - public void run() { - totalWritten.set(0); - int count = MESSAGE_COUNT; - try { - final OutputStream outputStream = connection.createOutputStream(destination); - try { - final byte[] buf = new byte[BUFFER_SIZE]; - random.nextBytes(buf); - while (count > 0 && !stopThreads.get()) { - outputStream.write(buf); - totalWritten.addAndGet(buf.length); - count--; - } - } - finally { - outputStream.close(); - } - } - catch (Exception e) { - writerException = e; - e.printStackTrace(); - } - finally { - LOG.info(totalWritten + " total bytes written."); - } - } - }); - - readerThread.start(); - writerThread.start(); - - // Wait till reader is has finished receiving all the messages - // or he has stopped - // receiving messages. - Thread.sleep(1000); - int lastRead = totalRead.get(); - while (readerThread.isAlive()) { - readerThread.join(1000); - // No progress?? then stop waiting.. - if (lastRead == totalRead.get()) { - break; - } - lastRead = totalRead.get(); - } - - stopThreads.set(true); - - assertTrue("Should not have received a reader exception", readerException == null); - assertTrue("Should not have received a writer exception", writerException == null); - - assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get()); - - } - finally { - session.close(); - } - } - finally { - connection.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java deleted file mode 100644 index 1e2448a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.derby.jdbc.EmbeddedDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.*; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Implements the test case attached to: - * https://issues.apache.org/jira/browse/AMQ-4351 - * - * This version avoids the spring deps. - */ -public class AMQ4351Test extends BrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4351Test.class); - - public static Test suite() { - return suite(AMQ4351Test.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - - // Lets clean up often. - broker.setOfflineDurableSubscriberTaskSchedule(500); - broker.setOfflineDurableSubscriberTimeout(2000); // lets delete durable subs much faster. - - JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); - EmbeddedDataSource dataSource = new EmbeddedDataSource(); - dataSource.setDatabaseName("derbyDb"); - dataSource.setCreateDatabase("create"); - jdbc.setDataSource(dataSource); - - jdbc.deleteAllMessages(); - broker.setPersistenceAdapter(jdbc); - return broker; - } - - ActiveMQConnectionFactory connectionFactory; - ActiveMQTopic destination = new ActiveMQTopic("TEST"); - - @Override - protected void setUp() throws Exception { - super.setUp(); - connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); - } - - class ProducingClient implements Runnable { - - final AtomicLong size = new AtomicLong(); - final AtomicBoolean done = new AtomicBoolean(); - CountDownLatch doneLatch = new CountDownLatch(1); - - Connection connection; - Session session; - MessageProducer producer; - - ProducingClient() throws JMSException { - connection = connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(destination); - } - - private void sendMessage() { - try { - producer.send(session.createTextMessage("Test")); - long i = size.incrementAndGet(); - if ((i % 1000) == 0) { - LOG.info("produced " + i + "."); - } - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - public void start() { - new Thread(this, "ProducingClient").start(); - } - - public void stop() throws InterruptedException { - done.set(true); - if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) { - try { - connection.close(); - doneLatch.await(); - } - catch (JMSException e) { - } - } - } - - @Override - public void run() { - try { - try { - while (!done.get()) { - sendMessage(); - Thread.sleep(10); - } - } - finally { - connection.close(); - } - } - catch (Exception e) { - e.printStackTrace(); - done.set(true); - } - finally { - doneLatch.countDown(); - } - } - } - - class ConsumingClient implements Runnable { - - final String name; - final AtomicLong size = new AtomicLong(); - final AtomicBoolean done = new AtomicBoolean(); - CountDownLatch doneLatch = new CountDownLatch(1); - CountDownLatch started; - CountDownLatch finished; - - public ConsumingClient(String name, CountDownLatch started, CountDownLatch finished) { - this.name = name; - this.started = started; - this.finished = finished; - } - - public void start() { - LOG.info("Starting JMS listener " + name); - new Thread(this, "ConsumingClient: " + name).start(); - } - - public void stopAsync() { - finished.countDown(); - done.set(true); - } - - public void stop() throws InterruptedException { - stopAsync(); - doneLatch.await(); - } - - @Override - public void run() { - try { - Connection connection = connectionFactory.createConnection(); - connection.setClientID(name); - connection.start(); - try { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false); - started.countDown(); - while (!done.get()) { - Message msg = consumer.receive(100); - if (msg != null) { - size.incrementAndGet(); - session.commit(); - } - } - } - finally { - connection.close(); - LOG.info("Stopped JMS listener " + name); - } - } - catch (Exception e) { - e.printStackTrace(); - done.set(true); - } - finally { - doneLatch.countDown(); - } - } - - } - - public void testAMQ4351() throws InterruptedException, JMSException { - LOG.info("Start test."); - int subs = 100; - CountDownLatch startedLatch = new CountDownLatch(subs - 1); - CountDownLatch shutdownLatch = new CountDownLatch(subs - 4); - - ProducingClient producer = new ProducingClient(); - ConsumingClient listener1 = new ConsumingClient("subscriber-1", startedLatch, shutdownLatch); - ConsumingClient listener2 = new ConsumingClient("subscriber-2", startedLatch, shutdownLatch); - ConsumingClient listener3 = new ConsumingClient("subscriber-3", startedLatch, shutdownLatch); - try { - - listener1.start(); - listener2.start(); - listener3.start(); - - List subscribers = new ArrayList<>(subs); - for (int i = 4; i < subs; i++) { - ConsumingClient client = new ConsumingClient("subscriber-" + i, startedLatch, shutdownLatch); - subscribers.add(client); - client.start(); - } - startedLatch.await(10, TimeUnit.SECONDS); - - LOG.info("All subscribers started."); - producer.sendMessage(); - - LOG.info("Stopping 97 subscribers...."); - for (ConsumingClient client : subscribers) { - client.stopAsync(); - } - shutdownLatch.await(10, TimeUnit.SECONDS); - - // Start producing messages for 10 minutes, at high rate - LOG.info("Starting mass message producer..."); - producer.start(); - - long lastSize = listener1.size.get(); - for (int i = 0; i < 10; i++) { - Thread.sleep(1000); - long size = listener1.size.get(); - LOG.info("Listener 1: consumed: " + (size - lastSize)); - assertTrue(size > lastSize); - lastSize = size; - } - } - finally { - LOG.info("Stopping clients"); - listener1.stop(); - listener2.stop(); - listener3.stop(); - producer.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java deleted file mode 100644 index 6d0a70e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java +++ /dev/null @@ -1,395 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.jmx; - -import static org.junit.Assert.*; - -import java.io.File; -import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Used to verify that the BrokerView accessed while the BrokerSerivce is waiting - * for a Slow Store startup to complete doesn't throw unexpected NullPointerExceptions. - */ -public class BrokerViewSlowStoreStartupTest { - - private static final Logger LOG = LoggerFactory.getLogger(BrokerViewSlowStoreStartupTest.class); - - private final CountDownLatch holdStoreStart = new CountDownLatch(1); - private final String brokerName = "brokerViewTest"; - - private BrokerService broker; - private Thread startThread; - - private BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.setBrokerName(brokerName); - - KahaDBStore kaha = new KahaDBStore() { - - @Override - public void start() throws Exception { - LOG.info("Test KahaDB class is waiting for signal to complete its start()"); - holdStoreStart.await(); - super.start(); - LOG.info("Test KahaDB class is completed its start()"); - } - }; - - kaha.setDirectory(new File("target/activemq-data/kahadb")); - kaha.deleteAllMessages(); - - broker.setPersistenceAdapter(kaha); - broker.setUseJmx(true); - - return broker; - } - - @Before - public void setUp() throws Exception { - broker = createBroker(); - - startThread = new Thread(new Runnable() { - - @Override - public void run() { - try { - broker.start(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - startThread.start(); - } - - @After - public void tearDown() throws Exception { - - // ensure we don't keep the broker held if an exception occurs somewhere. - holdStoreStart.countDown(); - - startThread.join(); - - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test(timeout = 120000) - public void testBrokerViewOnSlowStoreStart() throws Exception { - - // Ensure we have an Admin View. - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return (broker.getAdminView()) != null; - } - })); - - final BrokerView view = broker.getAdminView(); - - try { - view.getBrokerName(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getBrokerId(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTotalEnqueueCount(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTotalDequeueCount(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTotalConsumerCount(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTotalProducerCount(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTotalMessageCount(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTotalMessagesCached(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.resetStatistics(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.enableStatistics(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.disableStatistics(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.isStatisticsEnabled(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTopics(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getQueues(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTemporaryTopics(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTemporaryQueues(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTopicSubscribers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getDurableTopicSubscribers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getQueueSubscribers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTemporaryTopicSubscribers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTemporaryQueueSubscribers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getInactiveDurableTopicSubscribers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTopicProducers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getQueueProducers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTemporaryTopicProducers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getTemporaryQueueProducers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.getDynamicDestinationProducers(); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.removeConnector("tcp"); - fail("Should have thrown a NoSuchElementException"); - } - catch (NoSuchElementException e) { - } - - try { - view.removeNetworkConnector("tcp"); - fail("Should have thrown a NoSuchElementException"); - } - catch (NoSuchElementException e) { - } - - try { - view.addTopic("TEST"); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.addQueue("TEST"); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.removeTopic("TEST"); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.removeQueue("TEST"); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.createDurableSubscriber("1", "2", "3", "4"); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - try { - view.destroyDurableSubscriber("1", "2"); - fail("Should have thrown an IllegalStateException"); - } - catch (IllegalStateException e) { - } - - holdStoreStart.countDown(); - startThread.join(); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return view.getBroker() != null; - } - }); - assertNotNull(view.getBroker()); - - try { - view.getBrokerName(); - } - catch (Exception e) { - fail("caught an exception getting the Broker property: " + e.getClass().getName()); - } - - try { - view.getBrokerId(); - } - catch (IllegalStateException e) { - fail("caught an exception getting the Broker property: " + e.getClass().getName()); - } - - try { - view.getTotalEnqueueCount(); - } - catch (IllegalStateException e) { - fail("caught an exception getting the Broker property: " + e.getClass().getName()); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java deleted file mode 100644 index 6406b85..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.jmx; - -import java.util.List; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HealthViewMBeanTest extends EmbeddedBrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class); - protected MBeanServer mbeanServer; - protected String domain = "org.apache.activemq"; - - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://localhost:0"; - useTopic = false; - super.setUp(); - mbeanServer = broker.getManagementContext().getMBeanServer(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - @Override - protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setPersistent(true); - answer.setDeleteAllMessagesOnStartup(true); - answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64); - answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64); - answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64); - answer.setUseJmx(true); - answer.setSchedulerSupport(true); - - // allow options to be visible via jmx - - answer.addConnector(bindAddress); - return answer; - } - - public void testHealthView() throws Exception { - Connection connection = connectionFactory.createConnection(); - - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - for (int i = 0; i < 60; i++) { - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[1024 * 1024]); - producer.send(message); - } - - Thread.sleep(1000); - - String objectNameStr = broker.getBrokerObjectName().toString(); - objectNameStr += ",service=Health"; - ObjectName brokerName = assertRegisteredObjectName(objectNameStr); - HealthViewMBean health = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, HealthViewMBean.class, true); - List list = health.healthList(); - - for (HealthStatus status : list) { - LOG.info("Health status: {}", status); - } - - assertEquals(2, list.size()); - } - - protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { - ObjectName objectName = new ObjectName(name); - if (mbeanServer.isRegistered(objectName)) { - LOG.info("Bean Registered: " + objectName); - } - else { - fail("Could not find MBean!: " + objectName); - } - return objectName; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java deleted file mode 100644 index 82f1c4e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.jmx; - -import java.util.List; - -import javax.jms.ConnectionFactory; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.Test; -import org.slf4j.LoggerFactory; - -public class Log4JConfigTest extends EmbeddedBrokerTestSupport { - - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Log4JConfigTest.class); - - private static final String BROKER_LOGGER = "org.apache.activemq.broker.BrokerService"; - - protected MBeanServer mbeanServer; - protected String domain = "org.apache.activemq"; - - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://localhost:0"; - useTopic = false; - super.setUp(); - mbeanServer = broker.getManagementContext().getMBeanServer(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - @Override - protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setPersistent(true); - answer.setDeleteAllMessagesOnStartup(true); - answer.setUseJmx(true); - answer.setSchedulerSupport(true); - answer.addConnector(bindAddress); - return answer; - } - - @Test - public void testLog4JConfigViewExists() throws Exception { - String brokerObjectName = broker.getBrokerObjectName().toString(); - String log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName).toString(); - assertRegisteredObjectName(log4jConfigViewName); - } - - @Test - public void testLog4JConfigViewGetLoggers() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - List loggers = log4jConfigView.getLoggers(); - assertNotNull(loggers); - assertFalse(loggers.isEmpty()); - } - - @Test - public void testLog4JConfigViewGetLevel() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - String level = log4jConfigView.getLogLevel(BROKER_LOGGER); - assertNotNull(level); - assertFalse(level.isEmpty()); - } - - @Test - public void testLog4JConfigViewGetLevelUnknownLoggerName() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - // Non-existent loggers will return a name equal to the root level. - String level = log4jConfigView.getLogLevel("not.a.logger"); - assertNotNull(level); - assertFalse(level.isEmpty()); - assertEquals(Logger.getRootLogger().getLevel().toString(), level); - } - - @Test - public void testLog4JConfigViewSetLevel() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - String level = log4jConfigView.getLogLevel(BROKER_LOGGER); - assertNotNull(level); - assertFalse(level.isEmpty()); - - log4jConfigView.setLogLevel(BROKER_LOGGER, "WARN"); - level = log4jConfigView.getLogLevel(BROKER_LOGGER); - assertNotNull(level); - assertEquals("WARN", level); - - log4jConfigView.setLogLevel(BROKER_LOGGER, "INFO"); - level = log4jConfigView.getLogLevel(BROKER_LOGGER); - assertNotNull(level); - assertEquals("INFO", level); - } - - @Test - public void testLog4JConfigViewSetLevelNoChangeIfLevelIsBad() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - log4jConfigView.setLogLevel(BROKER_LOGGER, "INFO"); - String level = log4jConfigView.getLogLevel(BROKER_LOGGER); - assertNotNull(level); - assertEquals("INFO", level); - - log4jConfigView.setLogLevel(BROKER_LOGGER, "BAD"); - level = log4jConfigView.getLogLevel(BROKER_LOGGER); - assertNotNull(level); - assertEquals("INFO", level); - } - - @Test - public void testLog4JConfigViewGetRootLogLevel() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - String level = log4jConfigView.getRootLogLevel(); - assertNotNull(level); - assertFalse(level.isEmpty()); - - String currentRootLevel = Logger.getRootLogger().getLevel().toString(); - assertEquals(currentRootLevel, level); - } - - @Test - public void testLog4JConfigViewSetRootLevel() throws Throwable { - String brokerObjectName = broker.getBrokerObjectName().toString(); - ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName); - Log4JConfigViewMBean log4jConfigView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true); - - String currentRootLevel = Logger.getRootLogger().getLevel().toString(); - log4jConfigView.setRootLogLevel("WARN"); - currentRootLevel = Logger.getRootLogger().getLevel().toString(); - assertEquals("WARN", currentRootLevel); - log4jConfigView.setRootLogLevel("INFO"); - currentRootLevel = Logger.getRootLogger().getLevel().toString(); - assertEquals("INFO", currentRootLevel); - - Level level; - } - - protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { - ObjectName objectName = new ObjectName(name); - if (mbeanServer.isRegistered(objectName)) { - LOG.info("Bean Registered: " + objectName); - } - else { - fail("Could not find MBean!: " + objectName); - } - return objectName; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/491cca7d/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java deleted file mode 100644 index 5747efe..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.jmx; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class MBeanOperationTimeoutTest { - - private static final Logger LOG = LoggerFactory.getLogger(MBeanOperationTimeoutTest.class); - - private ActiveMQConnectionFactory connectionFactory; - private BrokerService broker; - private String connectionUri; - private static final String destinationName = "MBeanOperationTimeoutTestQ"; - private static final String moveToDestinationName = "MBeanOperationTimeoutTestQ.Moved"; - - protected MBeanServer mbeanServer; - protected String domain = "org.apache.activemq"; - - protected int messageCount = 50000; - - @Test(expected = TimeoutException.class) - public void testLongOperationTimesOut() throws Exception { - - sendMessages(messageCount); - LOG.info("Produced " + messageCount + " messages to the broker."); - - // Now get the QueueViewMBean and purge - String objectNameStr = broker.getBrokerObjectName().toString(); - objectNameStr += ",destinationType=Queue,destinationName=" + destinationName; - - ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr); - QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - long count = proxy.getQueueSize(); - assertEquals("Queue size", count, messageCount); - - LOG.info("Attempting to move one message, TimeoutException expected"); - proxy.moveMatchingMessagesTo(null, moveToDestinationName); - } - - private void sendMessages(int count) throws Exception { - Connection connection = connectionFactory.createConnection(); - try { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(destinationName); - MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < messageCount; i++) { - Message message = session.createMessage(); - message.setIntProperty("id", i); - producer.send(message); - } - session.commit(); - } - finally { - connection.close(); - } - } - - protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { - ObjectName objectName = new ObjectName(name); - if (mbeanServer.isRegistered(objectName)) { - LOG.info("Bean Registered: " + objectName); - } - else { - fail("Could not find MBean!: " + objectName); - } - return objectName; - } - - @Before - public void setUp() throws Exception { - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - mbeanServer = broker.getManagementContext().getMBeanServer(); - } - - @After - public void tearDown() throws Exception { - Thread.sleep(500); - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setMbeanInvocationTimeout(TimeUnit.SECONDS.toMillis(1)); - answer.setUseJmx(true); - answer.addConnector("vm://localhost"); - answer.setDeleteAllMessagesOnStartup(true); - return answer; - } -}