activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [48/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:19 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
new file mode 100644
index 0000000..4b89851
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.management.JMSConnectionStatsImpl;
+import org.apache.activemq.transport.failover.FailoverTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionFactoryTest.class);
+    long txGenerator = System.currentTimeMillis();
+    private ActiveMQConnection connection;
+    private BrokerService broker;
+
+    @Override
+    public void tearDown() throws Exception {
+        // Try our best to close any previously opend connection.
+        try {
+            connection.close();
+        } catch (Throwable ignore) {
+        }
+        // Try our best to stop any previously started broker.
+        try {
+            broker.stop();
+        } catch (Throwable ignore) {
+        }
+    }
+
+    public void testCopy() throws URISyntaxException, JMSException {
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?");
+        ActiveMQConnectionFactory copy = cf.copy();
+        assertTrue("Should be an ActiveMQXAConnectionFactory", copy instanceof ActiveMQXAConnectionFactory);
+    }
+
+    public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(
+                                                                         "vm://localhost?jms.useAsyncSend=true");
+        assertTrue(cf.isUseAsyncSend());
+        // the broker url have been adjusted.
+        assertEquals("vm://localhost", cf.getBrokerURL());
+
+        cf = new ActiveMQXAConnectionFactory("vm://localhost?jms.useAsyncSend=false");
+        assertFalse(cf.isUseAsyncSend());
+        // the broker url have been adjusted.
+        assertEquals("vm://localhost", cf.getBrokerURL());
+
+        cf = new ActiveMQXAConnectionFactory("vm:(broker:()/localhost)?jms.useAsyncSend=true");
+        assertTrue(cf.isUseAsyncSend());
+        // the broker url have been adjusted.
+        assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL());
+
+        cf = new ActiveMQXAConnectionFactory(
+                "vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=10&" +
+                               "jms.redeliveryPolicy.initialRedeliveryDelay=10000&" +
+                               "jms.redeliveryPolicy.redeliveryDelay=10000&" +
+                               "jms.redeliveryPolicy.useExponentialBackOff=true&" +
+                               "jms.redeliveryPolicy.backOffMultiplier=2");
+        assertEquals(10, cf.getRedeliveryPolicy().getMaximumRedeliveries());
+        assertEquals(10000, cf.getRedeliveryPolicy().getInitialRedeliveryDelay());
+        assertEquals(10000, cf.getRedeliveryPolicy().getRedeliveryDelay());
+        assertEquals(true, cf.getRedeliveryPolicy().isUseExponentialBackOff());
+        assertEquals(2.0, cf.getRedeliveryPolicy().getBackOffMultiplier(), 0.1);
+
+        // the broker url have been adjusted.
+        assertEquals("vm://localhost", cf.getBrokerURL());
+    }
+
+    public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException {
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://myBroker?broker.persistent=false");
+        // Make sure the broker is not created until the connection is
+        // instantiated.
+        assertNull(BrokerRegistry.getInstance().lookup("myBroker"));
+        connection = (ActiveMQConnection) cf.createConnection();
+        // This should create the connection.
+        assertNotNull(connection);
+        // Verify the broker was created.
+        assertNotNull(BrokerRegistry.getInstance().lookup("myBroker"));
+        connection.close();
+        // Verify the broker was destroyed.
+        assertNull(BrokerRegistry.getInstance().lookup("myBroker"));
+
+        connection.close();
+    }
+
+    public void testGetBrokerName() throws URISyntaxException, JMSException {
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        connection = (ActiveMQConnection)cf.createConnection();
+        connection.start();
+
+        String brokerName = connection.getBrokerName();
+        LOG.info("Got broker name: " + brokerName);
+
+        assertNotNull("No broker name available!", brokerName);
+        connection.close();
+    }
+
+    public void testCreateTcpConnectionUsingAllocatedPort() throws Exception {
+        assertCreateConnection("tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true");
+    }
+
+    public void testCreateTcpConnectionUsingKnownPort() throws Exception {
+        assertCreateConnection("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true");
+    }
+
+    public void testIsSameRM() throws URISyntaxException, JMSException, XAException {
+
+        XAConnection connection1 = null;
+        XAConnection connection2 = null;
+        try {
+            ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+            connection1 = (XAConnection)cf1.createConnection();
+            XASession session1 = connection1.createXASession();
+            XAResource resource1 = session1.getXAResource();
+
+            ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+            connection2 = (XAConnection)cf2.createConnection();
+            XASession session2 = connection2.createXASession();
+            XAResource resource2 = session2.getXAResource();
+
+            assertTrue(resource1.isSameRM(resource2));
+            session1.close();
+            session2.close();
+        } finally {
+            if (connection1 != null) {
+                try {
+                    connection1.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            if (connection2 != null) {
+                try {
+                    connection2.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException {
+
+        XAConnection connection1 = null;
+        XAConnection connection2 = null;
+        try {
+            ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true");
+            connection1 = (XAConnection)cf1.createConnection();
+            XASession session1 = connection1.createXASession();
+            XAResource resource1 = session1.getXAResource();
+
+            ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+            connection2 = (XAConnection)cf2.createConnection();
+            XASession session2 = connection2.createXASession();
+            XAResource resource2 = session2.getXAResource();
+
+            assertFalse(resource1.isSameRM(resource2));
+
+            // ensure identity is preserved
+            XASession session1a = connection1.createXASession();
+            assertTrue(resource1.isSameRM(session1a.getXAResource()));
+            session1.close();
+            session2.close();
+        } finally {
+            if (connection1 != null) {
+                try {
+                    connection1.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            if (connection2 != null) {
+                try {
+                    connection2.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    public void testVanilaTransactionalProduceReceive() throws Exception {
+
+        XAConnection connection1 = null;
+        try {
+            ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+            connection1 = (XAConnection)cf1.createConnection();
+            connection1.start();
+            XASession session = connection1.createXASession();
+            XAResource resource = session.getXAResource();
+            Destination dest = new ActiveMQQueue(getName());
+
+            // publish a message
+            Xid tid = createXid();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            MessageProducer producer = session.createProducer(dest);
+            ActiveMQTextMessage message  = new ActiveMQTextMessage();
+            message.setText(getName());
+            producer.send(message);
+            resource.end(tid, XAResource.TMSUCCESS);
+            resource.commit(tid, true);
+            session.close();
+
+            session = connection1.createXASession();
+            MessageConsumer consumer = session.createConsumer(dest);
+            tid = createXid();
+            resource = session.getXAResource();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+            assertNotNull(receivedMessage);
+            assertEquals(getName(), receivedMessage.getText());
+            resource.end(tid, XAResource.TMSUCCESS);
+            resource.commit(tid, true);
+            session.close();
+
+        } finally {
+            if (connection1 != null) {
+                try {
+                    connection1.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    public void testConsumerCloseTransactionalSendReceive() throws Exception {
+
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        XAConnection connection1 = (XAConnection)cf1.createConnection();
+        connection1.start();
+        XASession session = connection1.createXASession();
+        XAResource resource = session.getXAResource();
+        Destination dest = new ActiveMQQueue(getName());
+
+        // publish a message
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+        producer.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        session.close();
+
+        session = connection1.createXASession();
+        MessageConsumer consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        consumer.close();
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+
+        session = connection1.createXASession();
+        consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        assertNull(consumer.receive(1000));
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+
+    }
+
+    public void testSessionCloseTransactionalSendReceive() throws Exception {
+
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        XAConnection connection1 = (XAConnection)cf1.createConnection();
+        connection1.start();
+        XASession session = connection1.createXASession();
+        XAResource resource = session.getXAResource();
+        Destination dest = new ActiveMQQueue(getName());
+
+        // publish a message
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+        session.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+
+
+        session = connection1.createXASession();
+        MessageConsumer consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        session.close();
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+
+        session = connection1.createXASession();
+        consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        assertNull(consumer.receive(1000));
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+    }
+
+
+    public void testReadonlyNoLeak() throws Exception {
+        final String brokerName = "readOnlyNoLeak";
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
+        broker.setPersistent(false);
+        broker.start();
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
+        cf1.setStatsEnabled(true);
+        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection();
+        xaConnection.start();
+        XASession session = xaConnection.createXASession();
+        XAResource resource = session.getXAResource();
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        session.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+
+        assertTransactionGoneFromBroker(tid);
+        assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
+        assertSessionGone(xaConnection, session);
+        assertTransactionGoneFromFailoverState(xaConnection, tid);
+
+        // two phase
+        session = xaConnection.createXASession();
+        resource = session.getXAResource();
+        tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        session.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));
+
+        // no need for a commit on read only
+        assertTransactionGoneFromBroker(tid);
+        assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
+        assertSessionGone(xaConnection, session);
+        assertTransactionGoneFromFailoverState(xaConnection, tid);
+
+        xaConnection.close();
+        broker.stop();
+
+    }
+
+    public void testCloseSendConnection() throws Exception {
+        String brokerName = "closeSend";
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
+        broker.start();
+        broker.waitUntilStarted();
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+        XAConnection connection = (XAConnection)cf.createConnection();
+        connection.start();
+        XASession session = connection.createXASession();
+        XAResource resource = session.getXAResource();
+        Destination dest = new ActiveMQQueue(getName());
+
+        // publish a message
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+
+        connection.close();
+
+        assertTransactionGoneFromBroker(tid);
+
+        broker.stop();
+    }
+
+    public void testExceptionAfterClose() throws Exception {
+
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        XAConnection connection1 = (XAConnection)cf1.createConnection();
+        connection1.start();
+
+        XASession session = connection1.createXASession();
+        session.close();
+        try {
+            session.commit();
+            fail("expect exception after close");
+        } catch (javax.jms.IllegalStateException expected) {}
+
+        try {
+            session.rollback();
+            fail("expect exception after close");
+        } catch (javax.jms.IllegalStateException expected) {}
+
+        try {
+            session.getTransacted();
+            fail("expect exception after close");
+        } catch (javax.jms.IllegalStateException expected) {}
+    }
+
+    public void testRollbackXaErrorCode() throws Exception {
+        String brokerName = "rollbackErrorCode";
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
+        broker.start();
+        broker.waitUntilStarted();
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+        XAConnection connection = (XAConnection)cf.createConnection();
+        connection.start();
+        XASession session = connection.createXASession();
+        XAResource resource = session.getXAResource();
+
+        Xid tid = createXid();
+        try {
+            resource.rollback(tid);
+            fail("Expected xa exception on no tx");
+        } catch (XAException expected) {
+            LOG.info("got expected xa", expected);
+            assertEquals("no tx", XAException.XAER_NOTA, expected.errorCode);
+        }
+        connection.close();
+        broker.stop();
+    }
+
+    private void assertTransactionGoneFromFailoverState(
+            ActiveMQXAConnection connection1, Xid tid) throws Exception {
+
+        FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
+        TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
+        assertNull("transaction should not exist in the state tracker",
+                transport.getStateTracker().processCommitTransactionOnePhase(info));
+    }
+
+    private void assertSessionGone(ActiveMQXAConnection connection1,
+            XASession session) {
+        JMSConnectionStatsImpl stats = (JMSConnectionStatsImpl)connection1.getStats();
+        // should be no dangling sessions maintained by the transaction
+        assertEquals("should be no sessions", 0, stats.getSessions().length);
+    }
+
+    private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception {
+        BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
+        CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections();
+        for (TransportConnection connection: connections) {
+            if (connection.getConnectionId().equals(clientId)) {
+                try {
+                    connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
+                    fail("did not get expected excepton on missing transaction, it must be still there in error!");
+                } catch (IllegalStateException expectedOnNoTransaction) {
+                }
+            }
+        }
+    }
+
+    private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
+        BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
+        TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+        try {
+            transactionBroker.getTransaction(null, new XATransactionId(tid), false);
+            fail("expected exception on tx not found");
+        } catch (XAException expectedOnNotFound) {
+        }
+    }
+
+    protected void assertCreateConnection(String uri) throws Exception {
+        // Start up a broker with a tcp connector.
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector(uri);
+        broker.start();
+
+        URI temp = new URI(uri);
+        // URI connectURI = connector.getServer().getConnectURI();
+        // TODO this sometimes fails when using the actual local host name
+        URI currentURI = new URI(connector.getPublishableConnectString());
+
+        // sometimes the actual host name doesn't work in this test case
+        // e.g. on OS X so lets use the original details but just use the actual
+        // port
+        URI connectURI = new URI(temp.getScheme(), temp.getUserInfo(), temp.getHost(), currentURI.getPort(),
+                                 temp.getPath(), temp.getQuery(), temp.getFragment());
+
+        LOG.info("connection URI is: " + connectURI);
+
+        // This should create the connection.
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(connectURI);
+        Connection connection = cf.createConnection();
+
+        assertXAConnection(connection);
+
+        assertNotNull(connection);
+        connection.close();
+
+        connection = cf.createXAConnection();
+
+        assertXAConnection(connection);
+
+        assertNotNull(connection);
+    }
+
+    private void assertXAConnection(Connection connection) {
+        assertTrue("Should be an XAConnection", connection instanceof XAConnection);
+        assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection);
+        assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection);
+    }
+
+    public Xid createXid() throws IOException {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java
new file mode 100644
index 0000000..eafe359
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.transport.TransportFactory;
+
+public class ClientTestSupport extends TestCase {
+
+    protected BrokerService broker;
+    protected long idGenerator;
+
+    private ActiveMQConnectionFactory connFactory;
+    private final String brokerURL = "vm://localhost?broker.persistent=false";
+
+    @Override
+    public void setUp() throws Exception {
+        final AtomicBoolean connected = new AtomicBoolean(false);
+        TransportConnector connector;
+
+        // Start up a broker with a tcp connector.
+        try {
+            broker = BrokerFactory.createBroker(new URI(this.brokerURL));
+            broker.getBrokerName();
+            connector = new TransportConnector(TransportFactory.bind(new URI(this.brokerURL))) {
+                // Hook into the connector so we can assert that the server
+                // accepted a connection.
+                @Override
+                protected org.apache.activemq.broker.Connection createConnection(org.apache.activemq.transport.Transport transport) throws IOException {
+                    connected.set(true);
+                    return super.createConnection(transport);
+                }
+            };
+            broker.addConnector(connector);
+            broker.start();
+
+        } catch (IOException e) {
+            throw new JMSException("Error creating broker " + e);
+        } catch (URISyntaxException e) {
+            throw new JMSException("Error creating broker " + e);
+        }
+
+        URI connectURI;
+        connectURI = connector.getServer().getConnectURI();
+
+        // This should create the connection.
+        connFactory = new ActiveMQConnectionFactory(connectURI);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    public ActiveMQConnectionFactory getConnectionFactory() throws JMSException {
+        if (this.connFactory == null) {
+            throw new JMSException("ActiveMQConnectionFactory is null ");
+        }
+        return this.connFactory;
+    }
+
+    // Helper Classes
+    protected ConnectionInfo createConnectionInfo() throws Exception {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setClientId(info.getConnectionId().getValue());
+        return info;
+    }
+
+    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+        return info;
+    }
+
+    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+        info.setBrowser(false);
+        info.setDestination(destination);
+        info.setPrefetchSize(1000);
+        info.setDispatchAsync(false);
+        return info;
+    }
+
+    protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+        return consumerInfo.createRemoveCommand();
+    }
+
+    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(ackType);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setDestination(msg.getDestination());
+        ack.setLastMessageId(msg.getMessageId());
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+    protected Message receiveMessage(StubConnection connection, int maxWait) throws InterruptedException {
+        while (true) {
+            Object o = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS);
+
+            if (o == null) {
+                return null;
+            }
+
+            if (o instanceof MessageDispatch) {
+                MessageDispatch dispatch = (MessageDispatch)o;
+                return dispatch.getMessage();
+            }
+        }
+    }
+
+    protected Broker getBroker() throws Exception {
+        return this.broker != null ? this.broker.getBroker() : null;
+    }
+
+    public static void removeMessageStore() {
+        if (System.getProperty("activemq.store.dir") != null) {
+            recursiveDelete(new File(System.getProperty("activemq.store.dir")));
+        }
+        if (System.getProperty("derby.system.home") != null) {
+            recursiveDelete(new File(System.getProperty("derby.system.home")));
+        }
+    }
+
+    public static void recursiveDelete(File f) {
+        if (f.isDirectory()) {
+            File[] files = f.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                recursiveDelete(files[i]);
+            }
+        }
+        f.delete();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
new file mode 100644
index 0000000..a11505c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.security.ProtectionDomain;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Poor mans way of getting JUnit to run a test case through a few different
+ * combinations of options. Usage: If you have a test case called testFoo what
+ * you want to run through a few combinations, of of values for the attributes
+ * age and color, you would something like: <code>
+ *    public void initCombosForTestFoo() {
+ *        addCombinationValues( "age", new Object[]{ new Integer(21), new Integer(30) } );
+ *        addCombinationValues( "color", new Object[]{"blue", "green"} );
+ *    }
+ * </code>
+ * The testFoo test case would be run for each possible combination of age and
+ * color that you setup in the initCombosForTestFoo method. Before each
+ * combination is run, the age and color fields of the test class are set to one
+ * of the values defined. This is done before the normal setUp method is called.
+ * If you want the test combinations to show up as separate test runs in the
+ * JUnit reports, add a suite method to your test case similar to: <code>
+ *     public static Test suite() {
+ *         return suite(FooTest.class);
+ *     }
+ * </code>
+ *
+ *
+ */
+public abstract class CombinationTestSupport extends AutoFailTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CombinationTestSupport.class);
+
+    private final HashMap<String, ComboOption> comboOptions = new HashMap<String, ComboOption>();
+    private boolean combosEvaluated;
+    private Map<String, Object> options;
+    protected File basedir;
+
+    static protected File basedir(Class<?> clazz) {
+        try {
+            ProtectionDomain protectionDomain = clazz.getProtectionDomain();
+            return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
+        } catch (IOException e) {
+            return new File(".");
+        }
+    }
+
+    static class ComboOption {
+        final String attribute;
+        final LinkedHashSet<Object> values = new LinkedHashSet<Object>();
+
+        public ComboOption(String attribute, Collection<Object> options) {
+            this.attribute = attribute;
+            this.values.addAll(options);
+        }
+    }
+
+    public CombinationTestSupport() {
+        basedir = basedir(getClass());
+    }
+    public void addCombinationValues(String attribute, Object[] options) {
+        ComboOption co = this.comboOptions.get(attribute);
+        if (co == null) {
+            this.comboOptions.put(attribute, new ComboOption(attribute, Arrays.asList(options)));
+        } else {
+            co.values.addAll(Arrays.asList(options));
+        }
+    }
+
+    @Override
+    public void runBare() throws Throwable {
+        if (combosEvaluated) {
+            super.runBare();
+        } else {
+            CombinationTestSupport[] combinations = getCombinations();
+            for (int i = 0; i < combinations.length; i++) {
+                CombinationTestSupport test = combinations[i];
+                if (getName() == null || getName().equals(test.getName())) {
+                    test.runBare();
+                }
+            }
+        }
+    }
+
+    private void setOptions(Map<String, Object> options) throws NoSuchFieldException, IllegalAccessException {
+        this.options = options;
+        for (Iterator<String> iterator = options.keySet().iterator(); iterator.hasNext();) {
+            String attribute = iterator.next();
+            Object value = options.get(attribute);
+            try {
+                Field field = getClass().getField(attribute);
+                field.set(this, value);
+            } catch (Throwable e) {
+                try {
+                    boolean found = false;
+                    String setterName = "set" + attribute.substring(0, 1).toUpperCase() +
+                                        attribute.substring(1);
+                    for(Method method : getClass().getMethods()) {
+                        if (method.getName().equals(setterName)) {
+                            method.invoke(this, value);
+                            found = true;
+                            break;
+                        }
+                    }
+
+                    if (!found) {
+                        throw new NoSuchMethodError("No setter found for field: " + attribute);
+                    }
+
+                } catch(Throwable ex) {
+                    LOG.info("Could not set field '" + attribute + "' to value '" + value +
+                             "', make sure the field exists and is public or has a setter.");
+                }
+            }
+        }
+    }
+
+    private CombinationTestSupport[] getCombinations() {
+        try {
+            Method method = getClass().getMethod("initCombos", (Class[])null);
+            method.invoke(this, (Object[])null);
+        } catch (Throwable e) {
+        }
+
+        String name = getName().split(" ")[0];
+        String comboSetupMethodName = "initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
+        try {
+            Method method = getClass().getMethod(comboSetupMethodName, (Class[])null);
+            method.invoke(this, (Object[])null);
+        } catch (Throwable e) {
+        }
+
+        try {
+            ArrayList<HashMap<String, Object>> expandedOptions = new ArrayList<HashMap<String, Object>>();
+            expandCombinations(new ArrayList<ComboOption>(comboOptions.values()), expandedOptions);
+
+            if (expandedOptions.isEmpty()) {
+                combosEvaluated = true;
+                return new CombinationTestSupport[] {this};
+            } else {
+
+                ArrayList<CombinationTestSupport> result = new ArrayList<CombinationTestSupport>();
+                // Run the test case for each possible combination
+                for (Iterator<HashMap<String, Object>> iter = expandedOptions.iterator(); iter.hasNext();) {
+                    CombinationTestSupport combo = (CombinationTestSupport)TestSuite.createTest(getClass(), name);
+                    combo.combosEvaluated = true;
+                    combo.setOptions(iter.next());
+                    result.add(combo);
+                }
+
+                CombinationTestSupport rc[] = new CombinationTestSupport[result.size()];
+                result.toArray(rc);
+                return rc;
+            }
+        } catch (Throwable e) {
+            combosEvaluated = true;
+            return new CombinationTestSupport[] {this};
+        }
+
+    }
+
+    private void expandCombinations(List<ComboOption> optionsLeft, List<HashMap<String, Object>> expandedCombos) {
+        if (!optionsLeft.isEmpty()) {
+            HashMap<String, Object> map;
+            if (comboOptions.size() == optionsLeft.size()) {
+                map = new HashMap<String, Object>();
+                expandedCombos.add(map);
+            } else {
+                map = expandedCombos.get(expandedCombos.size() - 1);
+            }
+
+            LinkedList<ComboOption> l = new LinkedList<ComboOption>(optionsLeft);
+            ComboOption comboOption = l.removeLast();
+            int i = 0;
+            if (comboOption.values.isEmpty() && !l.isEmpty()) {
+                expandCombinations(l, expandedCombos);
+            } else {
+                for (Iterator<Object> iter = comboOption.values.iterator(); iter.hasNext();) {
+                    Object value = iter.next();
+                    if (i != 0) {
+                        map = new HashMap<String, Object>(map);
+                        expandedCombos.add(map);
+                    }
+                    map.put(comboOption.attribute, value);
+                    expandCombinations(l, expandedCombos);
+                    i++;
+                }
+            }
+        }
+    }
+
+    public static Test suite(Class<? extends CombinationTestSupport> clazz) {
+        TestSuite suite = new TestSuite();
+
+        ArrayList<String> names = new ArrayList<String>();
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            String name = methods[i].getName();
+            if (names.contains(name) || !isPublicTestMethod(methods[i])) {
+                continue;
+            }
+            names.add(name);
+            Test test = TestSuite.createTest(clazz, name);
+            if (test instanceof CombinationTestSupport) {
+                CombinationTestSupport[] combinations = ((CombinationTestSupport)test).getCombinations();
+                for (int j = 0; j < combinations.length; j++) {
+                    suite.addTest(combinations[j]);
+                }
+            } else {
+                suite.addTest(test);
+            }
+        }
+        return suite;
+    }
+
+    private static boolean isPublicTestMethod(Method m) {
+        return isTestMethod(m) && Modifier.isPublic(m.getModifiers());
+    }
+
+    private static boolean isTestMethod(Method m) {
+        String name = m.getName();
+        Class<?>[] parameters = m.getParameterTypes();
+        Class<?> returnType = m.getReturnType();
+        return parameters.length == 0 && name.startsWith("test") && returnType.equals(Void.TYPE);
+    }
+
+    @Override
+    public String getName() {
+        return getName(false);
+    }
+
+    public String getName(boolean original) {
+        if (options != null && !original) {
+            return super.getName() + " " + options;
+        }
+        return super.getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
new file mode 100644
index 0000000..f7a64ac
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ */
+public class ConnectionCleanupTest extends TestCase {
+
+    private ActiveMQConnection connection;
+
+    protected void setUp() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        connection = (ActiveMQConnection)factory.createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        connection.close();
+    }
+
+    /**
+     * @throws JMSException
+     */
+    public void testChangeClientID() throws JMSException {
+
+        connection.setClientID("test");
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        try {
+            connection.setClientID("test");
+            // fail("Should have received JMSException");
+        } catch (JMSException e) {
+        }
+
+        connection.cleanup();
+        connection.setClientID("test");
+
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        try {
+            connection.setClientID("test");
+            // fail("Should have received JMSException");
+        } catch (JMSException e) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java
new file mode 100644
index 0000000..ae15671
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ */
+public class ConnectionCloseMultipleTimesConcurrentTest extends TestCase {
+
+    private ActiveMQConnection connection;
+    private ExecutorService executor;
+    private int size = 200;
+
+    protected void setUp() throws Exception {
+        executor = Executors.newFixedThreadPool(20);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        connection = (ActiveMQConnection)factory.createConnection();
+        connection.start();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection.isStarted()) {
+            connection.stop();
+        }
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * @throws javax.jms.JMSException
+     */
+    public void testCloseMultipleTimes() throws Exception {
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        assertTrue(connection.isStarted());
+        assertFalse(connection.isClosed());
+
+        final CountDownLatch latch = new CountDownLatch(size);
+
+        for (int i = 0; i < size; i++) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        connection.close();
+
+                        assertFalse(connection.isStarted());
+                        assertTrue(connection.isClosed());
+
+                        latch.countDown();
+                    } catch (JMSException e) {
+                        // ignore
+                    }
+                }
+            });
+        }
+
+        boolean zero = latch.await(20, TimeUnit.SECONDS);
+        assertTrue("Should complete all", zero);
+
+        // should not fail calling again
+        connection.close();
+
+        assertFalse(connection.isStarted());
+        assertTrue(connection.isClosed());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java
new file mode 100644
index 0000000..3e78d73
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ */
+public class ConnectionCloseMultipleTimesTest extends TestCase {
+
+    private ActiveMQConnection connection;
+
+    protected void setUp() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        connection = (ActiveMQConnection)factory.createConnection();
+        connection.start();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection.isStarted()) {
+            connection.stop();
+        }
+    }
+
+    /**
+     * @throws javax.jms.JMSException
+     */
+    public void testCloseMultipleTimes() throws JMSException {
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        assertTrue(connection.isStarted());
+        assertFalse(connection.isClosed());
+
+        connection.close();
+
+        assertFalse(connection.isStarted());
+        assertTrue(connection.isClosed());
+
+        // should not fail calling again
+        connection.close();
+
+        assertFalse(connection.isStarted());
+        assertTrue(connection.isClosed());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
new file mode 100644
index 0000000..b34fe7f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * 
+ */
+public class ConsumerReceiveWithTimeoutTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Test to check if consumer thread wakes up inside a receive(timeout) after
+     * a message is dispatched to the consumer
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
+
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue queue = session.createQueue("test");
+
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    // wait for 10 seconds to allow consumer.receive to be run
+                    // first
+                    Thread.sleep(10000);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.send(session.createTextMessage("Hello"));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        t.start();
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(60000);
+        assertNotNull(msg);
+        session.close();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
new file mode 100644
index 0000000..773e871
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class CreateConsumerButDontStartConnectionWarningTest extends JmsQueueSendReceiveTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(CreateConsumerButDontStartConnectionWarningTest.class);
+
+    @Override
+    protected void startConnection() throws JMSException {
+        // don't start the connection
+    }
+
+    @Override
+    protected void assertMessagesAreReceived() throws JMSException {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            LOG.warn("Caught: " + e, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java
new file mode 100644
index 0000000..aa39cc1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+
+/**
+ * A base class for a test case which creates an embedded broker and uses a connection and session
+ *
+ * 
+ */
+public abstract class EmbeddedBrokerAndConnectionTestSupport extends EmbeddedBrokerTestSupport {
+    protected Connection connection;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
new file mode 100644
index 0000000..b049e96
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.springframework.jms.core.JmsTemplate;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+/**
+ * A useful base class which creates and closes an embedded broker
+ *
+ *
+ */
+public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
+
+    protected BrokerService broker;
+    // protected String bindAddress = "tcp://localhost:61616";
+    protected String bindAddress = "vm://localhost";
+    protected ConnectionFactory connectionFactory;
+    protected boolean useTopic;
+    protected ActiveMQDestination destination;
+    protected JmsTemplate template;
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        startBroker();
+
+        connectionFactory = createConnectionFactory();
+
+        destination = createDestination();
+
+        template = createJmsTemplate();
+        template.setDefaultDestination(destination);
+        template.setPubSubDomain(useTopic);
+        template.afterPropertiesSet();
+    }
+
+    protected void tearDown() throws Exception {
+        if (broker != null) {
+            try {
+                broker.stop();
+            } catch (Exception e) {
+            }
+        }
+    }
+
+    /**
+     * Factory method to create a new {@link JmsTemplate}
+     *
+     * @return a newly created JmsTemplate
+     */
+    protected JmsTemplate createJmsTemplate() {
+        return new JmsTemplate(connectionFactory);
+    }
+
+    /**
+     * Factory method to create a new {@link Destination}
+     *
+     * @return newly created Destinaiton
+     */
+    protected ActiveMQDestination createDestination() {
+        return createDestination(getDestinationString());
+    }
+
+    /**
+     * Factory method to create the destination in either the queue or topic
+     * space based on the value of the {@link #useTopic} field
+     */
+    protected ActiveMQDestination createDestination(String subject) {
+        if (useTopic) {
+            return new ActiveMQTopic(subject);
+        } else {
+            return new ActiveMQQueue(subject);
+        }
+    }
+
+    /**
+     * Returns the name of the destination used in this test case
+     */
+    protected String getDestinationString() {
+        return getClass().getName() + "." + getName();
+    }
+
+    /**
+     * Factory method to create a new {@link ConnectionFactory} instance
+     *
+     * @return a newly created connection factory
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(bindAddress);
+    }
+
+    /**
+     * Factory method to create a new broker
+     *
+     * @throws Exception
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+    protected void startBroker() throws Exception {
+        broker.start();
+    }
+
+    /**
+     * @return whether or not persistence should be used
+     */
+    protected boolean isPersistent() {
+        return false;
+    }
+
+    /**
+     * Factory method to create a new connection
+     */
+    protected Connection createConnection() throws Exception {
+        return connectionFactory.createConnection();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
new file mode 100644
index 0000000..7aa22e3
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ExclusiveConsumerStartupDestinationTest extends EmbeddedBrokerTestSupport{
+
+    private static final String VM_BROKER_URL = "vm://localhost";
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        PolicyMap map = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setAllConsumersExclusiveByDefault(true);
+        map.setDefaultEntry(entry);
+        answer.setDestinationPolicy(map);
+        return answer;
+    }
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/exclusive-consumer-startup-destination.xml";
+    }
+
+    private Connection createConnection(final boolean start) throws JMSException {
+        ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
+        Connection conn = cf.createConnection();
+        if (start) {
+            conn.start();
+        }
+        return conn;
+    }
+
+    public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
+        Connection conn = createConnection(true);
+
+        Session exclusiveSession = null;
+        Session fallbackSession = null;
+        Session senderSession = null;
+
+        try {
+
+            exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1");
+            MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+            ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
+            MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+            ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
+
+            MessageProducer producer = senderSession.createProducer(senderQueue);
+
+            Message msg = senderSession.createTextMessage("test");
+            producer.send(msg);
+            // TODO need two send a 2nd message - bug AMQ-1024
+            // producer.send(msg);
+            Thread.sleep(100);
+
+            // Verify exclusive consumer receives the message.
+            assertNotNull(exclusiveConsumer.receive(100));
+            assertNull(fallbackConsumer.receive(100));
+        } finally {
+            fallbackSession.close();
+            senderSession.close();
+            conn.close();
+        }
+    }
+
+    public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException,
+        InterruptedException {
+        Connection conn = createConnection(true);
+
+        Session exclusiveSession1 = null;
+        Session exclusiveSession2 = null;
+        Session fallbackSession = null;
+        Session senderSession = null;
+
+        try {
+
+            exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // This creates the exclusive consumer first which avoids AMQ-1024
+            // bug.
+            ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2");
+            MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
+            MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
+
+            ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
+            MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+            ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
+
+            MessageProducer producer = senderSession.createProducer(senderQueue);
+
+            Message msg = senderSession.createTextMessage("test");
+            producer.send(msg);
+            Thread.sleep(100);
+
+            // Verify exclusive consumer receives the message.
+            assertNotNull(exclusiveConsumer1.receive(100));
+            assertNull(exclusiveConsumer2.receive(100));
+            assertNull(fallbackConsumer.receive(100));
+
+            // Close the exclusive consumer to verify the non-exclusive consumer
+            // takes over
+            exclusiveConsumer1.close();
+
+            producer.send(msg);
+            producer.send(msg);
+
+            assertNotNull("Should have received a message", exclusiveConsumer2.receive(100));
+            assertNull("Should not have received a message", fallbackConsumer.receive(100));
+
+        } finally {
+            fallbackSession.close();
+            senderSession.close();
+            conn.close();
+        }
+
+    }
+
+    public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
+        Connection conn = createConnection(true);
+
+        Session exclusiveSession = null;
+        Session fallbackSession = null;
+        Session senderSession = null;
+
+        try {
+
+            exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // This creates the exclusive consumer first which avoids AMQ-1024
+            // bug.
+            ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3");
+            MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+            ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
+            MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+
+            ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
+
+            MessageProducer producer = senderSession.createProducer(senderQueue);
+
+            Message msg = senderSession.createTextMessage("test");
+            producer.send(msg);
+            Thread.sleep(100);
+
+            // Verify exclusive consumer receives the message.
+            assertNotNull(exclusiveConsumer.receive(100));
+            assertNull(fallbackConsumer.receive(100));
+
+            // Close the exclusive consumer to verify the non-exclusive consumer
+            // takes over
+            exclusiveConsumer.close();
+
+            producer.send(msg);
+
+            assertNotNull(fallbackConsumer.receive(100));
+
+        } finally {
+            fallbackSession.close();
+            senderSession.close();
+            conn.close();
+        }
+    }
+}


Mime
View raw message