activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [45/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:16 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
new file mode 100644
index 0000000..908de0d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Exercises consumer-side handling of messages whose time-to-live elapses
+ * before they are received, both with the connection factory left as
+ * constructed and with {@code setConsumerExpiryCheckEnabled(false)}.
+ * <p>
+ * Every test runs against a fresh non-persistent broker reached through the
+ * VM transport.
+ */
+public class JmsMessageConsumerTest {
+
+    /** Time-to-live applied to every message sent by these tests. */
+    private static final long MESSAGE_TTL_MILLIS = TimeUnit.SECONDS.toMillis(2);
+
+    /** How long to wait so that the sent message is past its time-to-live. */
+    private static final long EXPIRY_WAIT_SECONDS = 4;
+
+    private BrokerService brokerService;
+    private String brokerURI;
+
+    @Rule public TestName name = new TestName();
+
+    /**
+     * Starts an in-memory broker (no persistence, no JMX) and records the VM
+     * transport URI used by the tests; {@code create=false} makes clients
+     * attach to this broker instead of creating their own.
+     */
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        brokerURI = "vm://localhost?create=false";
+    }
+
+    /** Stops the broker started by {@link #startBroker()}, if any. */
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * A synchronous {@code receive} must not return a message that expired
+     * while waiting to be consumed.
+     */
+    @Test
+    public void testSyncReceiveWithExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+
+        Connection connection = factory.createConnection();
+        // Close in finally so a failed assertion does not leak the connection
+        // into broker shutdown.
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(destination);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(MESSAGE_TTL_MILLIS);
+            connection.start();
+
+            producer.send(session.createTextMessage("test"));
+
+            // Allow message to expire in the prefetch buffer
+            TimeUnit.SECONDS.sleep(EXPIRY_WAIT_SECONDS);
+
+            assertNull(consumer.receive(1000));
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * With the consumer expiry check disabled on the factory, a synchronous
+     * {@code receive} must still return the message after its time-to-live.
+     */
+    @Test
+    public void testSyncReceiveWithIgnoreExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setConsumerExpiryCheckEnabled(false);
+
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(destination);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(MESSAGE_TTL_MILLIS);
+            connection.start();
+
+            producer.send(session.createTextMessage("test"));
+
+            // Allow message to expire in the prefetch buffer
+            TimeUnit.SECONDS.sleep(EXPIRY_WAIT_SECONDS);
+
+            assertNotNull(consumer.receive(1000));
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * A {@link MessageListener} must not be invoked for a message that
+     * expired before the connection was started.
+     */
+    @Test
+    public void testAsyncReceiveWithExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+
+        final CountDownLatch received = new CountDownLatch(1);
+
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    received.countDown();
+                }
+            });
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(MESSAGE_TTL_MILLIS);
+
+            producer.send(session.createTextMessage("test"));
+
+            // Allow message to expire in the prefetch buffer; delivery to the
+            // listener only begins once the connection is started below.
+            TimeUnit.SECONDS.sleep(EXPIRY_WAIT_SECONDS);
+            connection.start();
+
+            assertFalse(received.await(1, TimeUnit.SECONDS));
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * With the consumer expiry check disabled on the factory, a
+     * {@link MessageListener} must still be invoked for a message that is
+     * past its time-to-live.
+     */
+    @Test
+    public void testAsyncReceiveWithoutExpirationChecks() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setConsumerExpiryCheckEnabled(false);
+
+        final CountDownLatch received = new CountDownLatch(1);
+
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    received.countDown();
+                }
+            });
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(MESSAGE_TTL_MILLIS);
+
+            producer.send(session.createTextMessage("test"));
+
+            // Allow message to expire in the prefetch buffer; delivery to the
+            // listener only begins once the connection is started below.
+            TimeUnit.SECONDS.sleep(EXPIRY_WAIT_SECONDS);
+            connection.start();
+
+            assertTrue(received.await(5, TimeUnit.SECONDS));
+        } finally {
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
new file mode 100644
index 0000000..1d994b9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.Resource;
+
+/**
+ * Test case support that allows the easy management and connection of several
+ * brokers.
+ * <p>
+ * Brokers are registered by name in {@link #brokers}; each entry is a
+ * {@link BrokerItem} that also tracks the connections and consumers created
+ * against that broker so they can be torn down together.
+ */
+public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class);
+    public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
+
+    /**
+     * Milliseconds {@link #startAllBrokers()} sleeps after starting the
+     * brokers. NOTE: this is static and is overwritten by the bridging
+     * methods, so the value carries over between test instances.
+     */
+    public static int maxSetupTime = 5000;
+
+    /** Registered brokers keyed by broker name; created in {@link #setUp()}. */
+    protected Map<String, BrokerItem> brokers;
+    /** Destinations created through {@link #createDestination(String, boolean)}, keyed by name. */
+    protected Map<String, Destination> destinations;
+
+    /** Minimum length of generated message text; shorter text is padded with '*'. */
+    protected int messageSize = 1;
+
+    /** Delivery mode used by {@link #sendMessages(String, Destination, int, HashMap)}. */
+    protected boolean persistentDelivery = true;
+    /** Passed to each broker's aggregate {@link MessageIdList} when its item is created. */
+    protected boolean verbose;
+
+    /**
+     * Looks up a registered broker by name.
+     *
+     * @throws IllegalArgumentException if no broker with that name is registered
+     */
+    private BrokerItem requireBrokerItem(String brokerName) {
+        BrokerItem item = brokers.get(brokerName);
+        if (item == null) {
+            throw new IllegalArgumentException("Unknown broker: " + brokerName);
+        }
+        return item;
+    }
+
+    /**
+     * Bridges two registered brokers with {@code dynamicOnly=false},
+     * {@code networkTTL=1} and conduit subscriptions enabled.
+     */
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+        return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1, true);
+    }
+
+    /**
+     * Bridges two registered brokers with {@code networkTTL=1}, conduit
+     * subscriptions enabled and no failover.
+     *
+     * @throws IllegalArgumentException if either broker name is not registered
+     */
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly) throws Exception {
+        BrokerService localBroker = requireBrokerItem(localBrokerName).broker;
+        BrokerService remoteBroker = requireBrokerItem(remoteBrokerName).broker;
+
+        return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true, false);
+    }
+
+    /**
+     * Bridges two registered brokers without failover.
+     *
+     * @throws IllegalArgumentException if either broker name is not registered
+     */
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
+        BrokerService localBroker = requireBrokerItem(localBrokerName).broker;
+        BrokerService remoteBroker = requireBrokerItem(remoteBrokerName).broker;
+
+        return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit, false);
+    }
+
+    /**
+     * Adds a network connector on {@code localBroker} that points at the first
+     * transport connector of {@code remoteBroker}.
+     * <p>
+     * Override this method to specify how you want to bridge the two brokers.
+     * As a side effect {@link #maxSetupTime} is set to 2000.
+     *
+     * @param failover wrap the remote URI in a {@code failover:} transport
+     * @return the connector that was added to the local broker
+     * @throws Exception if the remote broker has no transport connectors
+     */
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        if (transportConnectors.isEmpty()) {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        String uri = failover
+                ? "static:(failover:(" + remoteURI + "))"
+                : "static:(" + remoteURI + ")";
+
+        NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+        connector.setName("to-" + remoteBroker.getBrokerName());
+        connector.setDynamicOnly(dynamicOnly);
+        connector.setNetworkTTL(networkTTL);
+        connector.setConduitSubscriptions(conduit);
+        localBroker.addNetworkConnector(connector);
+        maxSetupTime = 2000;
+        return connector;
+    }
+
+    /** Interconnects all registered brokers using multicast discovery in group "default". */
+    protected void bridgeAllBrokers() throws Exception {
+        bridgeAllBrokers("default", 1, false, false);
+    }
+
+    /** Same as the four-argument overload with {@code decreasePriority=false}. */
+    protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
+        bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
+    }
+
+    /**
+     * Interconnects all registered brokers through multicast discovery. A
+     * broker without any transport connector first gets one on
+     * {@link #AUTO_ASSIGN_TRANSPORT}. As a side effect {@link #maxSetupTime}
+     * is set to 8000.
+     */
+    protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
+        final String discoveryUri = "multicast://default?group=" + groupName;
+
+        for (BrokerItem item : brokers.values()) {
+            BrokerService broker = item.broker;
+            List<TransportConnector> transportConnectors = broker.getTransportConnectors();
+
+            if (transportConnectors.isEmpty()) {
+                broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
+                transportConnectors = broker.getTransportConnectors();
+            }
+
+            TransportConnector transport = transportConnectors.get(0);
+            transport.setDiscoveryUri(new URI(discoveryUri));
+            NetworkConnector nc = broker.addNetworkConnector(discoveryUri);
+            nc.setNetworkTTL(ttl);
+            nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
+            nc.setDecreaseNetworkConsumerPriority(decreasePriority);
+        }
+
+        // Multicasting may take longer to set up
+        maxSetupTime = 8000;
+    }
+
+    /**
+     * Waits, for every registered broker, until its first network connector
+     * has at least {@code min} active bridges. The per-broker result is not
+     * checked.
+     */
+    protected void waitForBridgeFormation(final int min) throws Exception {
+        for (BrokerItem brokerItem : brokers.values()) {
+            waitForBridgeFormation(brokerItem.broker, min, 0);
+        }
+    }
+
+    /** Same as the four-argument overload with a wait of twice {@code Wait.MAX_WAIT_MILLIS}. */
+    public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex) throws Exception {
+        return waitForBridgeFormation(broker, min, bridgeIndex, Wait.MAX_WAIT_MILLIS*2);
+    }
+
+    /**
+     * Waits until the network connector at {@code bridgeIndex} on
+     * {@code broker} has at least {@code min} active bridges that report a
+     * remote broker name.
+     *
+     * @param wait maximum time to wait, passed through to {@code Wait.waitFor}
+     * @return {@code false} immediately if the broker has no network
+     *         connectors, otherwise the result of the wait
+     */
+    public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex, long wait) throws Exception {
+        if (broker.getNetworkConnectors().isEmpty()) {
+            return false;
+        }
+
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                int activeCount = 0;
+                for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) {
+                    if (bridge.getRemoteBrokerName() != null) {
+                        LOG.info("found bridge[{}] to {} on broker :{}", bridge, bridge.getRemoteBrokerName(), broker.getBrokerName());
+                        activeCount++;
+                    }
+                }
+                return activeCount >= min;
+            }
+        }, wait);
+    }
+
+    /**
+     * Asserts that the topic region of the named broker reaches at least
+     * {@code count} subscriptions within the default wait.
+     *
+     * @throws IllegalArgumentException if the broker name is not registered
+     */
+    protected void waitForMinTopicRegionConsumerCount(final String name, final int count) throws Exception {
+        final BrokerService broker = requireBrokerItem(name).broker;
+        final TopicRegion topicRegion = (TopicRegion) ((RegionBroker) broker.getRegionBroker()).getTopicRegion();
+        assertTrue("found expected consumers in topic region of " + name, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("topic consumers: {}, {}", name, topicRegion.getSubscriptions());
+                return topicRegion.getSubscriptions().size() >= count;
+            }
+        }));
+    }
+
+    /**
+     * Timed wait for {@link #hasBridge(String, String)}.
+     *
+     * @see #hasBridge(String, String)
+     *
+     * @param localBrokerName
+     *            - the name of the broker on the "local" side of the bridge
+     * @param remoteBrokerName
+     *            - the name of the broker on the "remote" side of the bridge
+     * @param time
+     *            - the maximum time to wait for the bridge to be established
+     * @param units
+     *            - the units for {@code time}
+     * @throws InterruptedException
+     *             - if the calling thread is interrupted
+     * @throws TimeoutException
+     *             - if the bridge is not established within the time limit
+     * @throws Exception
+     *             - some other unknown error occurs
+     */
+    protected void waitForBridge(final String localBrokerName,
+            final String remoteBrokerName, long time, TimeUnit units)
+            throws InterruptedException, TimeoutException, Exception {
+        final long timeoutMillis = units.toMillis(time);
+        boolean established = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() {
+                return hasBridge(localBrokerName, remoteBrokerName);
+            }
+        }, timeoutMillis);
+
+        if (!established) {
+            throw new TimeoutException("Bridge not established from broker "
+                    + localBrokerName + " to " + remoteBrokerName + " within "
+                    + timeoutMillis + " milliseconds.");
+        }
+    }
+
+    /**
+     * Determines whether a bridge has been established between the specified
+     * brokers. Establishment means that connections have been created and
+     * broker info has been exchanged. Due to the asynchronous nature of the
+     * connections, there is still a possibility that the bridge may fail
+     * shortly after establishment.
+     *
+     * @param localBrokerName
+     *            - the name of the broker on the "local" side of the bridge
+     * @param remoteBrokerName
+     *            - the name of the broker on the "remote" side of the bridge
+     * @return {@code true} if one of the local broker's peer infos carries
+     *         {@code remoteBrokerName}
+     * @throws IllegalArgumentException
+     *             - if the local broker name is not registered
+     */
+    protected boolean hasBridge(String localBrokerName, String remoteBrokerName) {
+        final BrokerItem fromBroker = requireBrokerItem(localBrokerName);
+
+        for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker()
+                .getPeerBrokerInfos()) {
+            // Guard against a peer that has not reported its name yet.
+            String peerName = peerInfo.getBrokerName();
+            if (peerName != null && peerName.equals(remoteBrokerName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Waits for at least one active bridge on every registered broker. */
+    protected void waitForBridgeFormation() throws Exception {
+        waitForBridgeFormation(1);
+    }
+
+    /**
+     * Starts every registered broker, waits for each to report started, then
+     * sleeps {@link #maxSetupTime} milliseconds.
+     */
+    protected void startAllBrokers() throws Exception {
+        for (BrokerItem item : brokers.values()) {
+            BrokerService broker = item.broker;
+            broker.start();
+            broker.waitUntilStarted();
+        }
+
+        Thread.sleep(maxSetupTime);
+    }
+
+    /**
+     * Creates and registers a broker with the given name. Unlike
+     * {@link #createBroker(URI)}, this overload does not call
+     * {@link #configureBroker(BrokerService)}.
+     */
+    protected BrokerService createBroker(String brokerName) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        brokers.put(brokerName, new BrokerItem(broker));
+
+        return broker;
+    }
+
+    /**
+     * Creates a broker from a broker URI, passes it to
+     * {@link #configureBroker(BrokerService)} and registers it under its
+     * broker name.
+     */
+    protected BrokerService createBroker(URI brokerUri) throws Exception {
+        BrokerService broker = BrokerFactory.createBroker(brokerUri);
+        configureBroker(broker);
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+
+        return broker;
+    }
+
+    /** Hook for subclasses to adjust a URI-created broker; no-op by default. */
+    protected void configureBroker(BrokerService broker) {
+    }
+
+    /** Creates a broker from a Spring XML resource and registers it under its broker name. */
+    protected BrokerService createBroker(Resource configFile) throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
+        brokerFactory.afterPropertiesSet();
+
+        BrokerService broker = brokerFactory.getBroker();
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+
+        return broker;
+    }
+
+    /** @return the connection factory of the named broker, or {@code null} if it is not registered */
+    protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.factory : null;
+    }
+
+    /** @return a new tracked connection to the named broker, or {@code null} if it is not registered */
+    protected Connection createConnection(String brokerName) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.createConnection() : null;
+    }
+
+    /**
+     * Creates a consumer without a message listener on a new, started
+     * connection, for use with synchronous {@code receive} calls.
+     *
+     * @return the consumer, or {@code null} if the broker is not registered
+     */
+    protected MessageConsumer createSyncConsumer(String brokerName, Destination dest) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        if (brokerItem == null) {
+            return null;
+        }
+
+        Connection con = brokerItem.createConnection();
+        con.start();
+        Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return sess.createConsumer(dest);
+    }
+
+    /** Creates a listener-backed consumer with no latch and no selector. */
+    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
+        return createConsumer(brokerName, dest, null, null);
+    }
+
+    /** Creates a listener-backed consumer with the given selector and no latch. */
+    protected MessageConsumer createConsumer(String brokerName, Destination dest, String messageSelector) throws Exception {
+        return createConsumer(brokerName, dest, null, messageSelector);
+    }
+
+    /** Creates a listener-backed consumer with the given latch and no selector. */
+    protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
+        return createConsumer(brokerName, dest, latch, null);
+    }
+
+    /**
+     * Creates a consumer on the named broker whose listener records received
+     * messages (see {@link BrokerItem#createConsumerWithSession}).
+     *
+     * @return the consumer, or {@code null} if the broker is not registered
+     */
+    protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.createConsumer(dest, latch, messageSelector) : null;
+    }
+
+    /** @return a queue browser on the named broker, or {@code null} if it is not registered */
+    protected QueueBrowser createBrowser(String brokerName, Destination dest) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.createBrowser(dest) : null;
+    }
+
+    /** @return a durable subscriber on the named broker, or {@code null} if it is not registered */
+    protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.createDurableSubscriber(dest, name) : null;
+    }
+
+    /** @return the aggregate message list of the named broker, or {@code null} if it is not registered */
+    protected MessageIdList getBrokerMessages(String brokerName) {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.getAllMessages() : null;
+    }
+
+    /** @return the message list recorded for {@code consumer}, or {@code null} if the broker is not registered */
+    protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        return brokerItem != null ? brokerItem.getConsumerMessages(consumer) : null;
+    }
+
+    /**
+     * Asserts that at least {@code count} consumers are reported for
+     * {@code destination} on the named broker within {@code timeout}
+     * milliseconds, using consumer advisory events.
+     *
+     * @throws IllegalArgumentException if the broker name is not registered
+     */
+    protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception {
+        BrokerItem brokerItem = requireBrokerItem(brokerName);
+        Connection conn = brokerItem.createConnection();
+        conn.start();
+        ConsumerEventSource ces = new ConsumerEventSource(conn, destination);
+
+        try {
+            final AtomicInteger actualConnected = new AtomicInteger();
+            final CountDownLatch latch = new CountDownLatch(1);
+            ces.setConsumerListener(new ConsumerListener() {
+                @Override
+                public void onConsumerEvent(ConsumerEvent event) {
+                    // Stop updating once the target was reached so a later
+                    // drop in consumers does not fail the assertion below.
+                    if (actualConnected.get() < count) {
+                        actualConnected.set(event.getConsumerCount());
+                    }
+                    if (event.getConsumerCount() >= count) {
+                        latch.countDown();
+                    }
+                }
+            });
+            ces.start();
+
+            latch.await(timeout, TimeUnit.MILLISECONDS);
+            assertTrue("Expected at least " + count + " consumers to connect, but only " + actualConnected.get() + " connected within " + timeout + " ms", actualConnected.get() >= count);
+        } finally {
+            ces.stop();
+            conn.close();
+            brokerItem.connections.remove(conn);
+        }
+    }
+
+    /** Sends {@code count} text messages without extra properties. */
+    protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
+        sendMessages(brokerName, destination, count, null);
+    }
+
+    /**
+     * Sends {@code count} text messages to {@code destination} over a
+     * short-lived connection to the named broker, calling
+     * {@link #onSend(int, TextMessage)} after each send.
+     *
+     * @param properties object properties to set on every message; may be {@code null}
+     * @throws IllegalArgumentException if the broker name is not registered
+     */
+    protected void sendMessages(String brokerName, Destination destination, int count, HashMap<String, Object>properties) throws Exception {
+        BrokerItem brokerItem = requireBrokerItem(brokerName);
+
+        Connection conn = brokerItem.createConnection();
+        // Always close and untrack the connection, even if sending fails.
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer producer = brokerItem.createProducer(destination, sess);
+            producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+            for (int i = 0; i < count; i++) {
+                TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
+                if (properties != null) {
+                    for (Map.Entry<String, Object> property : properties.entrySet()) {
+                        msg.setObjectProperty(property.getKey(), property.getValue());
+                    }
+                }
+                producer.send(msg);
+                onSend(i, msg);
+            }
+
+            producer.close();
+            sess.close();
+        } finally {
+            conn.close();
+            brokerItem.connections.remove(conn);
+        }
+    }
+
+    /** Hook invoked after each message sent by {@code sendMessages}; no-op by default. */
+    protected void onSend(int i, TextMessage msg) {
+    }
+
+    /**
+     * Creates a text message whose body is {@code initText}, right-padded
+     * with '*' up to {@link #messageSize} characters when it is shorter.
+     */
+    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
+        TextMessage msg = session.createTextMessage();
+
+        if (initText.length() < messageSize) {
+            // Pad message text
+            char[] padding = new char[messageSize - initText.length()];
+            Arrays.fill(padding, '*');
+            msg.setText(initText + new String(padding));
+        } else {
+            // Do not pad message text
+            msg.setText(initText);
+        }
+
+        return msg;
+    }
+
+    /**
+     * Creates a topic or queue with the given name and records it in
+     * {@link #destinations}.
+     *
+     * @param topic {@code true} for a topic, {@code false} for a queue
+     */
+    protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException {
+        ActiveMQDestination dest;
+        if (topic) {
+            dest = new ActiveMQTopic(name);
+        } else {
+            dest = new ActiveMQQueue(name);
+        }
+        destinations.put(name, dest);
+        return dest;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        brokers = new HashMap<String, BrokerItem>();
+        destinations = new HashMap<String, Destination>();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Run the superclass teardown even when a broker fails to shut down.
+        try {
+            destroyAllBrokers();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Unregisters and destroys the named broker; does nothing if it is not registered. */
+    protected void destroyBroker(String brokerName) throws Exception {
+        BrokerItem brokerItem = brokers.remove(brokerName);
+
+        if (brokerItem != null) {
+            brokerItem.destroy();
+        }
+    }
+
+    /** Destroys every registered broker and empties the registry. */
+    protected void destroyAllBrokers() throws Exception {
+        for (BrokerItem brokerItem : brokers.values()) {
+            brokerItem.destroy();
+        }
+        brokers.clear();
+    }
+
+    /**
+     * Groups a broker with the connection factory, connections, consumers and
+     * received-message lists that belong to it.
+     */
+    public class BrokerItem {
+        public BrokerService broker;
+        public ActiveMQConnectionFactory factory;
+        /** Every connection handed out by {@link #createConnection()} that is still tracked. */
+        public List<Connection> connections;
+        /** Listener-backed consumers and the list each one records into. */
+        public Map<MessageConsumer, MessageIdList> consumers;
+        /** Parent list set on every per-consumer list created by this item. */
+        public MessageIdList allMessages = new MessageIdList();
+        /** Delivery mode applied by {@link #createProducer(Destination, Session)}. */
+        public boolean persistent;
+        private IdGenerator id;
+
+        /**
+         * Wraps {@code broker} with a connection factory on its VM connector
+         * URI and an id generator prefixed with the broker name.
+         */
+        public BrokerItem(BrokerService broker) throws Exception {
+            this.broker = broker;
+
+            factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+            factory.setConnectionIDPrefix(broker.getBrokerName());
+            consumers = Collections.synchronizedMap(new HashMap<MessageConsumer, MessageIdList>());
+            connections = Collections.synchronizedList(new ArrayList<Connection>());
+            allMessages.setVerbose(verbose);
+            id = new IdGenerator(broker.getBrokerName() + ":");
+        }
+
+        /** Creates a connection with a generated client id and tracks it for {@link #destroy()}. */
+        public Connection createConnection() throws Exception {
+            Connection conn = factory.createConnection();
+            conn.setClientID(id.generateId());
+
+            connections.add(conn);
+            return conn;
+        }
+
+        /** Creates a listener-backed consumer with no latch and no selector. */
+        public MessageConsumer createConsumer(Destination dest) throws Exception {
+            return createConsumer(dest, null, null);
+        }
+
+        /** Creates a listener-backed consumer with the given selector and no latch. */
+        public MessageConsumer createConsumer(Destination dest, String messageSelector) throws Exception {
+            return createConsumer(dest, null, messageSelector);
+        }
+
+        /** Creates a listener-backed consumer on a new, started connection and auto-ack session. */
+        public MessageConsumer createConsumer(Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return createConsumerWithSession(dest, s, latch, messageSelector);
+        }
+
+        /** Creates a listener-backed consumer on {@code sess} with no latch and no selector. */
+        public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
+            return createConsumerWithSession(dest, sess, null, null);
+        }
+
+        /**
+         * Creates a consumer on {@code sess} whose listener is a new
+         * {@link MessageIdList} parented to {@link #allMessages}, and records
+         * the pair in {@link #consumers}.
+         *
+         * @param latch passed to the list's {@code setCountDownLatch}; may be {@code null}
+         */
+        public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch, String messageSelector) throws Exception {
+            MessageConsumer client = sess.createConsumer(dest, messageSelector);
+            MessageIdList messageIdList = new MessageIdList();
+            messageIdList.setCountDownLatch(latch);
+            messageIdList.setParent(allMessages);
+            client.setMessageListener(messageIdList);
+            consumers.put(client, messageIdList);
+            return client;
+        }
+
+        /**
+         * Creates a queue browser on a new, started connection.
+         *
+         * @param dest must be a {@link Queue}
+         */
+        public QueueBrowser createBrowser(Destination dest) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return s.createBrowser((Queue)dest);
+        }
+
+        /** Creates a durable subscriber on a new, started connection and auto-ack session. */
+        public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return createDurableSubscriber(dest, s, name);
+        }
+
+        /**
+         * Creates a durable subscriber on {@code sess} whose listener is a new
+         * {@link MessageIdList} parented to {@link #allMessages}, and records
+         * the pair in {@link #consumers}.
+         */
+        public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception {
+            MessageConsumer client = sess.createDurableSubscriber(dest, name);
+            MessageIdList messageIdList = new MessageIdList();
+            messageIdList.setParent(allMessages);
+            client.setMessageListener(messageIdList);
+            consumers.put(client, messageIdList);
+
+            return client;
+        }
+
+        public MessageIdList getAllMessages() {
+            return allMessages;
+        }
+
+        /** @return the list recorded for {@code consumer}, or {@code null} if it was not created here */
+        public MessageIdList getConsumerMessages(MessageConsumer consumer) {
+            return consumers.get(consumer);
+        }
+
+        /** Creates a producer on a new, started connection and auto-ack session. */
+        public MessageProducer createProducer(Destination dest) throws Exception {
+            Connection c = createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return createProducer(dest, s);
+        }
+
+        /** Creates a producer on {@code sess} with the delivery mode selected by {@link #persistent}. */
+        public MessageProducer createProducer(Destination dest, Session sess) throws Exception {
+            MessageProducer client = sess.createProducer(dest);
+            client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            return client;
+        }
+
+        /**
+         * Closes every tracked connection (best effort), stops the broker,
+         * waits for it to stop and releases all references. The item must not
+         * be used afterwards.
+         */
+        public void destroy() throws Exception {
+            while (!connections.isEmpty()) {
+                Connection c = connections.remove(0);
+                try {
+                    c.close();
+                } catch (ConnectionClosedException ignored) {
+                    // Already closed; nothing left to release.
+                } catch (JMSException e) {
+                    // Best-effort cleanup: keep closing the remaining
+                    // connections and still stop the broker.
+                    LOG.debug("Ignoring failure while closing connection during broker teardown", e);
+                }
+            }
+
+            broker.stop();
+            broker.waitUntilStopped();
+            consumers.clear();
+
+            broker = null;
+            connections = null;
+            consumers = null;
+            factory = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
new file mode 100644
index 0000000..5eaab8d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test case support used to test multiple message consumers and message
+ * producers connecting to a single broker.
+ * 
+ * 
+ */
+public class JmsMultipleClientsTestSupport {
+
+    @Rule
+    public TestName testName = new TestName();
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsMultipleClientsTestSupport.class);
+
+    protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages
+                                                // received
+    protected int consumerCount = 1;
+    protected int producerCount = 1;
+
+    protected int messageSize = 1024;
+
+    protected boolean useConcurrentSend = true;
+    protected boolean autoFail = true;
+    protected boolean durable;
+    public boolean topic;
+    protected boolean persistent;
+
+    protected BrokerService broker;
+    protected Destination destination;
+    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
+    protected MessageIdList allMessagesList = new MessageIdList();
+
+    private AtomicInteger producerLock;
+
+    protected void startProducers(Destination dest, int msgCount) throws Exception {
+        startProducers(createConnectionFactory(), dest, msgCount);
+    }
+
+    protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception {
+        // Use concurrent send
+        if (useConcurrentSend) {
+            producerLock = new AtomicInteger(producerCount);
+
+            for (int i = 0; i < producerCount; i++) {
+                Thread t = new Thread(new Runnable() {
+                    public void run() {
+                        try {
+                            sendMessages(factory.createConnection(), dest, msgCount);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+
+                        synchronized (producerLock) {
+                            producerLock.decrementAndGet();
+                            producerLock.notifyAll();
+                        }
+                    }
+                });
+
+                t.start();
+            }
+
+            // Wait for all producers to finish sending
+            synchronized (producerLock) {
+                while (producerLock.get() != 0) {
+                    producerLock.wait(2000);
+                }
+            }
+
+            // Use serialized send
+        } else {
+            for (int i = 0; i < producerCount; i++) {
+                sendMessages(factory.createConnection(), dest, msgCount);
+            }
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages to {@code destination} over the given connection and
+     * closes the connection afterwards.
+     * <p>
+     * The connection is registered in {@code connections} first. Message bodies are the
+     * message index, padded by {@link #createTextMessage(Session, String)}. The delivery mode
+     * follows the {@code persistent} flag.
+     *
+     * @param connection  connection to send on; it is started and closed by this method
+     * @param destination destination the messages are sent to
+     * @param count       number of messages to send
+     * @throws Exception if starting the connection, creating the session/producer or sending fails
+     */
+    protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+        connections.add(connection);
+        try {
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+            for (int i = 0; i < count; i++) {
+                TextMessage msg = createTextMessage(session, "" + i);
+                producer.send(msg);
+            }
+
+            producer.close();
+            session.close();
+        } finally {
+            // Release the connection even when a send fails part-way through, instead of
+            // leaving it open until tearDown.
+            connection.close();
+        }
+    }
+
+    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
+        TextMessage msg = session.createTextMessage();
+
+        // Pad message text
+        if (initText.length() < messageSize) {
+            char[] data = new char[messageSize - initText.length()];
+            Arrays.fill(data, '*');
+            String str = new String(data);
+            msg.setText(initText + str);
+
+            // Do not pad message text
+        } else {
+            msg.setText(initText);
+        }
+
+        return msg;
+    }
+
+    protected void startConsumers(Destination dest) throws Exception {
+        startConsumers(createConnectionFactory(), dest);
+    }
+
+    protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
+        MessageConsumer consumer;
+        for (int i = 0; i < consumerCount; i++) {
+            if (durable && topic) {
+                consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
+            } else {
+                consumer = createMessageConsumer(factory.createConnection(), dest);
+            }
+            MessageIdList list = new MessageIdList();
+            list.setParent(allMessagesList);
+            consumer.setMessageListener(list);
+            consumers.put(consumer, list);
+        }
+    }
+
+    /**
+     * Registers {@code conn} in {@code connections}, creates a consumer for {@code dest} on a
+     * non-transacted, auto-acknowledge session and only then starts the connection.
+     *
+     * @param conn connection the consumer is created on
+     * @param dest destination to consume from
+     * @return the new consumer
+     * @throws Exception if the session or consumer cannot be created or the connection cannot start
+     */
+    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception {
+        connections.add(conn);
+
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer created = session.createConsumer(dest);
+
+        // Start only after the consumer exists.
+        conn.start();
+        return created;
+    }
+
+    /**
+     * Creates a durable topic subscriber on {@code conn}.
+     * <p>
+     * {@code name} is used both as the connection's client id and as the subscription name.
+     * The connection is registered in {@code connections} and started before the
+     * non-transacted, auto-acknowledge session is created.
+     *
+     * @param conn connection the subscriber is created on
+     * @param dest destination to subscribe to; cast to {@code javax.jms.Topic}
+     * @param name client id and subscription name
+     * @return the new durable subscriber
+     * @throws Exception if the client id cannot be set or the subscriber cannot be created
+     */
+    protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
+        conn.setClientID(name);
+        connections.add(conn);
+        conn.start();
+
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return session.createDurableSubscriber((javax.jms.Topic)dest, name);
+    }
+
+    /**
+     * Delegates to {@code allMessagesList.waitForMessagesToArrive(messageCount)}.
+     *
+     * @param messageCount number of messages passed on to the wait call
+     * @throws Exception declared for subclasses/callers; nothing visible here throws a checked exception
+     */
+    protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception {
+        allMessagesList.waitForMessagesToArrive(messageCount);
+    }
+
+    protected ActiveMQDestination createDestination() throws JMSException {
+        String name = "." + getClass().getName() + "." + getName();
+        // ensure not inadvertently composite because of combos
+        name = name.replace(' ','_');
+        name = name.replace(',','&');
+        if (topic) {
+            destination = new ActiveMQTopic("Topic" + name);
+            return (ActiveMQDestination)destination;
+        } else {
+            destination = new ActiveMQQueue("Queue" + name);
+            return (ActiveMQDestination)destination;
+        }
+    }
+
+    /**
+     * Creates the connection factory used by the default {@code startProducers} /
+     * {@code startConsumers} overloads.
+     *
+     * @return a new {@code ActiveMQConnectionFactory} for the URL {@code vm://localhost}
+     * @throws Exception declared so that overriding implementations may fail
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
+    }
+
+    /**
+     * Creates the broker via {@link #createBroker()} and starts it before each test.
+     *
+     * @throws Exception if the broker cannot be created or started
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+    }
+
+    /**
+     * Closes every tracked connection, stops the broker and discards the recorded messages and
+     * consumers after each test.
+     * <p>
+     * Closing connections is best effort: a failure is logged and the remaining connections are
+     * still closed. The message list and consumer map are cleaned up even when the broker was
+     * never created or fails to stop.
+     *
+     * @throws Exception if stopping the broker fails
+     */
+    @After
+    public void tearDown() throws Exception {
+        // Take a snapshot under the list's own lock: a Collections.synchronizedList must not be
+        // iterated without holding it, and connections may be closed slowly.
+        final List<Connection> toClose;
+        synchronized (connections) {
+            toClose = new ArrayList<Connection>(connections);
+            connections.clear();
+        }
+
+        for (Connection conn : toClose) {
+            try {
+                conn.close();
+            } catch (Throwable e) {
+                // Best effort only; one bad connection must not prevent the rest of the cleanup.
+                LOG.debug("Ignoring failure while closing connection " + conn, e);
+            }
+        }
+
+        try {
+            // The broker is null when setUp failed before it could be created.
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            allMessagesList.flushMessages();
+            consumers.clear();
+        }
+    }
+
+    /*
+     * Some helpful assertions for multiple consumers.
+     */
+    /**
+     * Asserts that the message list recorded for {@code consumer} holds at least
+     * {@code msgCount} messages.
+     *
+     * @param consumer consumer whose list in {@code consumers} is checked
+     * @param msgCount minimum number of messages expected
+     */
+    protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
+        consumers.get(consumer).assertAtLeastMessagesReceived(msgCount);
+    }
+
+    /**
+     * Asserts that the message list recorded for {@code consumer} holds at most
+     * {@code msgCount} messages.
+     *
+     * @param consumer consumer whose list in {@code consumers} is checked
+     * @param msgCount maximum number of messages expected
+     */
+    protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
+        consumers.get(consumer).assertAtMostMessagesReceived(msgCount);
+    }
+
+    /**
+     * Asserts, without waiting, that the message list recorded for {@code consumer} holds
+     * {@code msgCount} messages.
+     *
+     * @param consumer consumer whose list in {@code consumers} is checked
+     * @param msgCount number of messages expected
+     */
+    protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
+        consumers.get(consumer).assertMessagesReceivedNoWait(msgCount);
+    }
+
+    /**
+     * Applies {@link #assertConsumerReceivedAtLeastXMessages(MessageConsumer, int)} to every
+     * registered consumer.
+     *
+     * @param msgCount minimum number of messages expected per consumer
+     */
+    protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
+        for (MessageConsumer registered : consumers.keySet()) {
+            assertConsumerReceivedAtLeastXMessages(registered, msgCount);
+        }
+    }
+
+    /**
+     * Applies {@link #assertConsumerReceivedAtMostXMessages(MessageConsumer, int)} to every
+     * registered consumer.
+     *
+     * @param msgCount maximum number of messages expected per consumer
+     */
+    protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
+        for (MessageConsumer registered : consumers.keySet()) {
+            assertConsumerReceivedAtMostXMessages(registered, msgCount);
+        }
+    }
+
+    /**
+     * Applies {@link #assertConsumerReceivedXMessages(MessageConsumer, int)} to every
+     * registered consumer.
+     *
+     * @param msgCount number of messages expected per consumer
+     */
+    protected void assertEachConsumerReceivedXMessages(int msgCount) {
+        for (MessageConsumer registered : consumers.keySet()) {
+            assertConsumerReceivedXMessages(registered, msgCount);
+        }
+    }
+
+    /**
+     * Asserts that {@code msgCount} messages were received overall, checked two ways: against
+     * {@code allMessagesList} (without waiting) and against the sum of the per-consumer counts.
+     *
+     * @param msgCount total number of messages expected
+     */
+    protected void assertTotalMessagesReceived(int msgCount) {
+        allMessagesList.assertMessagesReceivedNoWait(msgCount);
+
+        // now lets count the individual messages received
+        int receivedAcrossConsumers = 0;
+        for (MessageIdList perConsumer : consumers.values()) {
+            receivedAcrossConsumers += perConsumer.getMessageCount();
+        }
+        assertEquals("Total of consumers message count", msgCount, receivedAcrossConsumers);
+    }
+
+
+    /**
+     * Returns the current test name; shorthand for {@code getName(false)}.
+     *
+     * @return the value of {@link #getName(boolean)} called with {@code false}
+     */
+    public String getName() {
+        return getName(false);
+    }
+
+    /**
+     * Returns the running test's method name, as reported by the {@code testName} rule, with
+     * every {@code '['} and {@code ']'} removed.
+     *
+     * @param original accepted but not consulted by this implementation
+     * @return the bracket-free test method name
+     */
+    public String getName(boolean original) {
+        return testName.getMethodName()
+                .replace("[", "")
+                .replace("]", "");
+    }
+
+    /**
+     * Asserts that the percent memory usage reported for {@code destination} on the broker is 0.
+     * <p>
+     * The check is performed once, immediately; this method itself does not wait or retry.
+     *
+     * @throws Exception if the destination cannot be looked up on the broker
+     */
+    public void assertDestinationMemoryUsageGoesToZero() throws Exception {
+        assertEquals("destination memory is back to 0", 0,
+                TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
+    }
+
+
+
+    /**
+     * Prints the stack trace of every live thread to {@code System.err}.
+     * <p>
+     * This is copied from AutoFailTestSupport. We may want to move it to someplace where more
+     * tests can use it.
+     *
+     * @param prefix text printed in front of each thread's header line
+     */
+    public static void dumpAllThreads(String prefix) {
+        for (Map.Entry<Thread, StackTraceElement[]> dump : Thread.getAllStackTraces().entrySet()) {
+            final Thread thread = dump.getKey();
+            System.err.println(prefix + " " + thread);
+
+            final StackTraceElement[] frames = dump.getValue();
+            for (int depth = 0; depth < frames.length; depth++) {
+                System.err.println("     " + frames[depth]);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
new file mode 100644
index 0000000..c063e24
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsQueueBrowserTest extends JmsTestSupport {
+
+    // Logger named after this test class so that its output is attributed correctly.
+    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserTest.class);
+    public boolean isUseCache = false;
+
+    /**
+     * Builds the JUnit 3 style suite for this class by delegating to {@code suite(Class)}.
+     *
+     * @return the suite produced for {@code JmsQueueBrowserTest}; presumably one entry per test
+     *         method and combination (the helper is defined outside this file section)
+     * @throws Exception if the suite cannot be built
+     */
+    public static Test suite() throws Exception {
+        return suite(JmsQueueBrowserTest.class);
+    }
+
+    /**
+     * Tests the queue browser. One message is consumed, the remaining two are browsed, and then a
+     * new consumer receives those same two messages: browsing must not remove messages from the queue.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testReceiveBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        Message[] outbound = new Message[]{session.createTextMessage("First Message"),
+                                           session.createTextMessage("Second Message"),
+                                           session.createTextMessage("Third Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        producer.send(outbound[2]);
+
+        // Get the first.
+        assertEquals(outbound[0], consumer.receive(1000));
+        // The consumer is closed before the browser is created.
+        consumer.close();
+
+        QueueBrowser browser = session.createBrowser(destination);
+        Enumeration<?> enumeration = browser.getEnumeration();
+
+        // browse the second
+        assertTrue("should have received the second message", enumeration.hasMoreElements());
+        assertEquals(outbound[1], enumeration.nextElement());
+
+        // browse the third.
+        assertTrue("Should have received the third message", enumeration.hasMoreElements());
+        assertEquals(outbound[2], enumeration.nextElement());
+
+        // There should be no more.
+        boolean tooMany = false;
+        while (enumeration.hasMoreElements()) {
+            // Log each unexpected message before failing, to make the failure diagnosable.
+            LOG.info("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText());
+            tooMany = true;
+        }
+        assertFalse(tooMany);
+        browser.close();
+
+        // Re-open the consumer.
+        consumer = session.createConsumer(destination);
+        // Receive the second.
+        assertEquals(outbound[1], consumer.receive(1000));
+        // Receive the third.
+        assertEquals(outbound[2], consumer.receive(1000));
+        consumer.close();
+    }
+
+    /**
+     * Registers {@code true} and {@code false} as combination values for the {@code isUseCache}
+     * field; presumably picked up by the combination test harness for
+     * {@code testBatchSendBrowseReceive} (the harness is not visible here).
+     */
+    public void initCombosForTestBatchSendBrowseReceive() {
+        addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    /**
+     * Sends a batch of ten messages and browses it, then sends the same batch again and checks
+     * that a new browser sees both batches in order. Finally all twenty messages are consumed.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testBatchSendBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        TextMessage[] outbound = new TextMessage[10];
+        for (int i = 0; i < outbound.length; i++) {
+            outbound[i] = session.createTextMessage(i + " Message");
+        }
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+        consumer.close();
+
+        for (int i = 0; i < outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        QueueBrowser browser = session.createBrowser(destination);
+        Enumeration<?> enumeration = browser.getEnumeration();
+
+        for (int i = 0; i < outbound.length; i++) {
+            assertTrue("should have a message to browse at i=" + i, enumeration.hasMoreElements());
+            assertEquals("i=" + i, outbound[i], enumeration.nextElement());
+        }
+        browser.close();
+
+        for (int i = 0; i < outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        // verify second batch is visible to browse
+        browser = session.createBrowser(destination);
+        enumeration = browser.getEnumeration();
+        for (int j = 0; j < 2; j++) {
+            for (int i = 0; i < outbound.length; i++) {
+                assertTrue("should have a message to browse at j=" + j + ", i=" + i, enumeration.hasMoreElements());
+                // Compare by text: the same message objects were sent twice.
+                assertEquals("j=" + j + ", i=" + i, outbound[i].getText(), ((TextMessage) enumeration.nextElement()).getText());
+            }
+        }
+        browser.close();
+
+        consumer = session.createConsumer(destination);
+        for (int i = 0; i < outbound.length * 2; i++) {
+            assertNotNull("Got message: " + i, consumer.receive(2000));
+        }
+        consumer.close();
+    }
+
+    /**
+     * Registers {@code true} and {@code false} as combination values for the {@code isUseCache}
+     * field; presumably picked up by the combination test harness for
+     * {@code testBatchSendJmxBrowseReceive} (the harness is not visible here).
+     */
+    public void initCombosForTestBatchSendJmxBrowseReceive() {
+        addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testBatchSendJmxBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        TextMessage[] outbound = new TextMessage[10];
+        for (int i=0; i<10; i++) {
+            outbound[i] = session.createTextMessage( i + " Message");
+        };
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+        consumer.close();
+
+        for (int i=0;i<outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+
+        LOG.info("Create QueueView MBean...");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        long concount = proxy.getConsumerCount();
+        LOG.info("Consumer Count :" + concount);
+        long messcount = proxy.getQueueSize();
+        LOG.info("current number of messages in the queue :" + messcount);
+
+        // lets browse
+        CompositeData[] compdatalist = proxy.browse();
+        if (compdatalist.length == 0) {
+            fail("There is no message in the queue:");
+        }
+        String[] messageIDs = new String[compdatalist.length];
+
+        for (int i = 0; i < compdatalist.length; i++) {
+            CompositeData cdata = compdatalist[i];
+
+            if (i == 0) {
+                LOG.info("Columns: " + cdata.getCompositeType().keySet());
+            }
+            messageIDs[i] = (String)cdata.get("JMSMessageID");
+            LOG.info("message " + i + " : " + cdata.values());
+        }
+
+        TabularData table = proxy.browseAsTable();
+        LOG.info("Found tabular data: " + table);
+        assertTrue("Table should not be empty!", table.size() > 0);
+
+        assertEquals("Queue size", outbound.length, proxy.getQueueSize());
+        assertEquals("Queue size", outbound.length, compdatalist.length);
+        assertEquals("Queue size", outbound.length, table.size());
+
+
+        LOG.info("Send another 10");
+        for (int i=0;i<outbound.length; i++) {
+            producer.send(outbound[i]);
+        }
+
+        LOG.info("Browse again");
+
+        messcount = proxy.getQueueSize();
+        LOG.info("current number of messages in the queue :" + messcount);
+
+        compdatalist = proxy.browse();
+        if (compdatalist.length == 0) {
+            fail("There is no message in the queue:");
+        }
+        messageIDs = new String[compdatalist.length];
+
+        for (int i = 0; i < compdatalist.length; i++) {
+            CompositeData cdata = compdatalist[i];
+
+            if (i == 0) {
+                LOG.info("Columns: " + cdata.getCompositeType().keySet());
+            }
+            messageIDs[i] = (String)cdata.get("JMSMessageID");
+            LOG.info("message " + i + " : " + cdata.values());
+        }
+
+        table = proxy.browseAsTable();
+        LOG.info("Found tabular data: " + table);
+        assertTrue("Table should not be empty!", table.size() > 0);
+
+        assertEquals("Queue size", outbound.length*2, proxy.getQueueSize());
+        assertEquals("Queue size", outbound.length*2, compdatalist.length);
+        assertEquals("Queue size", outbound.length * 2, table.size());
+
+        consumer = session.createConsumer(destination);
+        for (int i=0; i<outbound.length * 2; i++) {
+            assertNotNull("Got message: " + i, consumer.receive(2000));
+        }
+        consumer.close();
+    }
+
+    /**
+     * Sends a single message, browses it while a consumer is open on the same queue, and then
+     * checks that the consumer still receives that message.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+
+        connection.start();
+
+        // create consumer
+        MessageConsumer consumer = session.createConsumer(destination);
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+
+        // Three messages are created, but only the first one is sent below.
+        Message[] outbound = new Message[]{session.createTextMessage("First Message"),
+                                           session.createTextMessage("Second Message"),
+                                           session.createTextMessage("Third Message")};
+
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(outbound[0]);
+
+        // create browser first
+        QueueBrowser browser = session.createBrowser(destination);
+        Enumeration<?> enumeration = browser.getEnumeration();
+
+        // browse the first message
+        assertTrue("should have received the first message", enumeration.hasMoreElements());
+        assertEquals(outbound[0], enumeration.nextElement());
+
+        // Receive the first message.
+        assertEquals(outbound[0], consumer.receive(1000));
+        consumer.close();
+        browser.close();
+        producer.close();
+    }
+
+    /**
+     * Sends 4096 messages and checks that one browser enumeration sees every one of them.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testLargeNumberOfMessages() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        connection.start();
+
+        MessageProducer producer = session.createProducer(destination);
+
+        int numberOfMessages = 4096;
+
+        for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(session.createTextMessage("Message: "  + i));
+        }
+
+        QueueBrowser browser = session.createBrowser(destination);
+        Enumeration<?> enumeration = browser.getEnumeration();
+
+        assertTrue(enumeration.hasMoreElements());
+
+        int numberBrowsed = 0;
+
+        while (enumeration.hasMoreElements()) {
+            Message browsed = (Message) enumeration.nextElement();
+
+            // Guarded so that the message id is only looked up when it will actually be logged.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Browsed Message [{}]", browsed.getJMSMessageID());
+            }
+
+            numberBrowsed++;
+        }
+
+        // Report through the class logger, like the other tests here, rather than System.out.
+        LOG.info("Number browsed:  " + numberBrowsed);
+        assertEquals(numberOfMessages, numberBrowsed);
+        browser.close();
+        producer.close();
+    }
+
+    /**
+     * Sends 1000 messages, opens a browser on a second connection, consumes all messages with a
+     * client-acknowledge consumer on the first connection, and then checks that the browser
+     * returns messages with the same ids, in the same order, as the consumer received.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testQueueBrowserWith2Consumers() throws Exception {
+        final int numMessages = 1000;
+        connection.setAlwaysSyncSend(false);
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        // NOTE(review): the option key is spelled "prefetchSize" on the next line but
+        // "prefetchsize" on the one after — confirm whether the difference is intentional.
+        ActiveMQQueue destinationPrefetch10 = new ActiveMQQueue("TEST?jms.prefetchSize=10");
+        ActiveMQQueue destinationPrefetch1 = new ActiveMQQueue("TEST?jms.prefetchsize=1");
+        connection.start();
+
+        // Second connection, used only for the browser; tracked in 'connections'.
+        ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
+        connection2.start();
+        connections.add(connection2);
+        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destinationPrefetch10);
+
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+
+        for (int i=0; i<numMessages; i++) {
+            TextMessage message = session.createTextMessage("Message: " + i);
+            producer.send(message);
+        }
+
+        QueueBrowser browser = session2.createBrowser(destinationPrefetch1);
+        @SuppressWarnings("unchecked")
+        Enumeration<Message> browserView = browser.getEnumeration();
+
+        // Receive everything first; the messages are never acknowledged in this test.
+        List<Message> messages = new ArrayList<Message>();
+        for (int i = 0; i < numMessages; i++) {
+            Message m1 = consumer.receive(5000);
+            assertNotNull("m1 is null for index: " + i, m1);
+            messages.add(m1);
+        }
+
+        // Walk the browser and the received messages in lock step; 'i' ends up as the number browsed.
+        int i = 0;
+        for (; i < numMessages && browserView.hasMoreElements(); i++) {
+            Message m1 = messages.get(i);
+            Message m2 = browserView.nextElement();
+            assertNotNull("m2 is null for index: " + i, m2);
+            assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+        }
+
+        // currently browse max page size is ignored for a queue browser consumer
+        // only guarantee is a page size - but a snapshot of pagedinpending is
+        // used so it is most likely more
+        assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE);
+
+        assertFalse("nothing left in the browser", browserView.hasMoreElements());
+        assertNull("consumer finished", consumer.receiveNoWait());
+    }
+
+    /**
+     * Browses part of a three-message queue, closes the browser, and checks that a consumer
+     * opened before the browse still receives all three messages in order.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testBrowseClose() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+
+        connection.start();
+
+        TextMessage[] outbound = new TextMessage[]{session.createTextMessage("First Message"),
+                                           session.createTextMessage("Second Message"),
+                                           session.createTextMessage("Third Message")};
+
+        // create consumer
+        MessageConsumer consumer = session.createConsumer(destination);
+        // lets consume any outstanding messages from previous test runs
+        while (consumer.receive(1000) != null) {
+        }
+
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        producer.send(outbound[2]);
+
+        // create browser first
+        QueueBrowser browser = session.createBrowser(destination);
+        Enumeration<?> enumeration = browser.getEnumeration();
+
+        // browse some messages; the third is deliberately left unbrowsed before closing
+        assertEquals(outbound[0], enumeration.nextElement());
+        assertEquals(outbound[1], enumeration.nextElement());
+
+        browser.close();
+
+        // Receive all three messages in order.
+        for (int i = 0; i < outbound.length; i++) {
+            TextMessage msg = (TextMessage)consumer.receive(1000);
+            // Fail with a clear message rather than an NPE when the receive times out.
+            assertNotNull("Expected " + outbound[i].getText() + " but received nothing", msg);
+            assertEquals("Expected " + outbound[i].getText() + " but received " + msg.getText(), outbound[i], msg);
+        }
+
+        consumer.close();
+        producer.close();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = super.createBroker();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setUseCache(isUseCache);
+        policyEntry.setMaxBrowsePageSize(4096);
+        policyMap.setDefaultEntry(policyEntry);
+        brokerService.setDestinationPolicy(policyMap);
+        return brokerService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java
new file mode 100644
index 0000000..f381ec0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+import org.apache.activemq.util.Wait;
+
+
+/**
+ * Queue variant of {@link JmsTopicSendReceiveTest} in which the producer sends to the
+ * comma-separated subject {@code FOO.BAR.HUMBUG,FOO.BAR.HUMBUG2} and the messages are
+ * then expected on both {@code FOO.BAR.HUMBUG} and {@code FOO.BAR.HUMBUG2}.
+ */
+public class JmsQueueCompositeSendReceiveTest extends JmsTopicSendReceiveTest {
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+            .getLog(JmsQueueCompositeSendReceiveTest.class);
+
+    /**
+     * Sets a test to have a queue destination and non-persistent delivery mode.
+     *
+     * @throws Exception if the parent set-up fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // Both flags are assigned before delegating to the parent set-up.
+        topic = false;
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * Returns the consumer subject.
+     *
+     * @return String - consumer subject
+     * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
+     */
+    @Override
+    protected String getConsumerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+
+    /**
+     * Returns the producer subject: two queue names separated by a comma.
+     *
+     * @return String - producer subject
+     * @see org.apache.activemq.test.TestSupport#getProducerSubject()
+     */
+    @Override
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG,FOO.BAR.HUMBUG2";
+    }
+
+    /**
+     * Test if all the messages sent are being received.
+     * <p>
+     * Runs the inherited test first, then clears the collected messages, attaches this
+     * test as listener to a consumer on {@code FOO.BAR.HUMBUG2} and asserts again.
+     *
+     * @throws Exception if sending, consuming or an assertion fails
+     */
+    @Override
+    public void testSendReceive() throws Exception {
+        super.testSendReceive();
+        messages.clear();
+        Destination consumerDestination = consumeSession.createQueue("FOO.BAR.HUMBUG2");
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+        final MessageConsumer consumer;
+        if (durable) {
+            // NOTE(review): consumerDestination comes from createQueue(), so this cast to Topic
+            // presumably fails if 'durable' is ever true for this test - confirm.
+            LOG.info("Creating durable consumer");
+            consumer = consumeSession.createDurableSubscriber((Topic) consumerDestination, getName());
+        } else {
+            consumer = consumeSession.createConsumer(consumerDestination);
+        }
+        consumer.setMessageListener(this);
+
+        assertMessagesAreReceived();
+        LOG.info("" + data.length + " messages(s) received, closing down connections");
+    }
+
+    /**
+     * Sends every entry of {@code data} to the subject {@code TEST,TEST} and checks that the
+     * broker-side queue {@code TEST} ends up holding exactly {@code data.length} messages,
+     * then purges the queue and checks that it is empty.
+     *
+     * @throws Exception if sending, the broker lookup or an assertion fails
+     */
+    public void testDuplicate() throws Exception {
+        ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("TEST,TEST");
+        for (int i = 0; i < data.length; i++) {
+            Message message = createMessage(i);
+            configureMessage(message);
+            if (verbose) {
+                LOG.info("About to send a message: " + message + " with text: " + data[i]);
+            }
+            producer.send(queue, message);
+        }
+
+        // Kept on purpose: the destination lookup below presumably needs the broker to have
+        // already created the queue, which the polling in Wait.waitFor cannot cover.
+        Thread.sleep(200); // wait for messages to be queued
+
+        BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
+        final Queue dest = (Queue)((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue("TEST"));
+        assertTrue("all messages were received", Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                return data.length == dest.getDestinationStatistics().getMessages().getCount();
+            }}));
+
+        dest.purge();
+        assertEquals(0, dest.getDestinationStatistics().getMessages().getCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java
new file mode 100644
index 0000000..9282c0c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * Runs the tests inherited from {@link JmsTopicRequestReplyTest} with the
+ * {@code topic} flag cleared, i.e. against a queue.
+ */
+public class JmsQueueRequestReplyTest extends JmsTopicRequestReplyTest {
+
+    /**
+     * Set up the test with a queue.
+     * <p>
+     * The {@code topic} flag is cleared before delegating to the parent set-up.
+     *
+     * @throws Exception if the parent set-up fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
new file mode 100644
index 0000000..449edda
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+/**
+ * Runs the tests inherited from {@link JmsTopicSelectorTest} with the
+ * {@code topic} flag cleared (presumably so that they use a queue, as the
+ * class name suggests).
+ */
+public class JmsQueueSelectorTest extends JmsTopicSelectorTest {
+    /**
+     * Clears the {@code topic} flag and then delegates to the parent set-up.
+     *
+     * @throws Exception if the parent set-up fails
+     */
+    @Override
+    public void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java
new file mode 100644
index 0000000..73e3e24
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+/**
+ * Runs the tests inherited from {@link JmsTopicSendReceiveTest} with the
+ * {@code topic} flag cleared, i.e. against a queue.
+ */
+public class JmsQueueSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    /**
+     * Set up the test with a queue.
+     * <p>
+     * The {@code topic} flag is cleared before delegating to the parent set-up.
+     *
+     * @throws Exception if the parent set-up fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java
new file mode 100644
index 0000000..367aaeb
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Variant of {@link JmsQueueSendReceiveTwoConnectionsTest} in which the broker is started
+ * on a separate thread, after a delay, while the inherited set-up is already running.
+ * The connection factory uses a failover URL pointing at the broker's connector address.
+ */
+public class JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest extends JmsQueueSendReceiveTwoConnectionsTest {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.class);
+
+    /** Failures raised on the broker-starting thread; the first one is rethrown from {@link #tearDown()}. */
+    private final Queue<Exception> errors = new ConcurrentLinkedQueue<Exception>();
+    /** Delay in milliseconds before the broker is created. */
+    private int delayBeforeStartingBroker = 1000;
+    /** Assigned on the broker-starting thread and read on the test thread, hence volatile. */
+    private volatile BrokerService broker;
+
+    /**
+     * Sleeps for {@code delayBeforeStartingBroker} milliseconds, then creates and starts a
+     * non-persistent broker with a connector on {@code tcp://localhost:61616}.
+     * <p>
+     * Any exception is recorded in {@code errors} instead of being thrown. If the sleep is
+     * interrupted, the interrupt status is restored and the broker is not started.
+     */
+    public void startBroker() {
+        // Initialize the broker
+        LOG.info("Lets wait: " + delayBeforeStartingBroker + " millis  before creating the broker");
+        try {
+            Thread.sleep(delayBeforeStartingBroker);
+        } catch (InterruptedException e) {
+            // Do not swallow the interrupt: restore the flag, record the failure so that
+            // tearDown() reports it, and give up on starting the broker.
+            Thread.currentThread().interrupt();
+            LOG.info("Caught: " + e, e);
+            errors.add(e);
+            return;
+        }
+
+        LOG.info("Now starting the broker");
+        try {
+            // The field is assigned before start() so that tearDown() can stop a broker
+            // whose start-up failed part-way.
+            broker = new BrokerService();
+            broker.setPersistent(false);
+            broker.addConnector("tcp://localhost:61616");
+            broker.start();
+        } catch (Exception e) {
+            // Pass the exception as well so the stack trace reaches the log.
+            LOG.info("Caught: " + e, e);
+            errors.add(e);
+        }
+    }
+
+    /**
+     * Creates a connection factory with a failover URL for {@code tcp://localhost:61616}
+     * (at most 10 reconnect attempts, no exponential back-off, 200 ms initial delay).
+     * <p>
+     * NOTE(review): presumably overrides a factory hook of the parent test support - confirm
+     * before adding {@code @Override}.
+     *
+     * @return the failover connection factory
+     * @throws Exception declared for the benefit of overriding callers; not thrown here
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?maxReconnectAttempts=10&useExponentialBackOff=false&initialReconnectDelay=200");
+    }
+
+    /**
+     * Kicks off {@link #startBroker()} on a new thread and then runs the parent set-up,
+     * so the parent set-up begins before the broker exists.
+     *
+     * @throws Exception if the parent set-up fails
+     */
+    @Override
+    protected void setUp() throws Exception {
+        setAutoFail(true);
+        // now lets asynchronously start a broker
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                startBroker();
+            }
+        };
+        thread.start();
+
+        super.setUp();
+    }
+
+    /**
+     * Runs the parent tear-down, stops the broker if one was created, and rethrows the
+     * first failure recorded by the broker-starting thread.
+     *
+     * @throws Exception from the parent tear-down, from stopping the broker, or the first
+     *         recorded broker start-up failure
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // Stop the broker even when the parent tear-down throws, so the connector
+            // on port 61616 is not left open for later tests.
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+        if (!errors.isEmpty()) {
+            throw errors.remove();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java
new file mode 100644
index 0000000..f29cc09
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
+
+/**
+ * Runs the tests inherited from {@link JmsTopicSendReceiveWithTwoConnectionsTest}
+ * with the {@code topic} flag cleared, i.e. against a queue.
+ */
+public class JmsQueueSendReceiveTwoConnectionsTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
+
+    /**
+     * Set up the test with a queue and using two connections.
+     * <p>
+     * The {@code topic} flag is cleared before delegating to the parent set-up.
+     *
+     * @throws Exception if the parent set-up fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java
new file mode 100644
index 0000000..cb793d0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * Runs the tests inherited from {@link JmsQueueSendReceiveTest} with the
+ * {@code useSeparateSession} flag set.
+ */
+public class JmsQueueSendReceiveUsingTwoSessionsTest extends JmsQueueSendReceiveTest {
+
+    /**
+     * Set up the test using two sessions.
+     * <p>
+     * The {@code useSeparateSession} flag is set before delegating to the parent set-up.
+     *
+     * @throws Exception if the parent set-up fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        useSeparateSession = true;
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java
new file mode 100644
index 0000000..d92696e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+
+/**
+ * Variant of {@link JmsTopicSendReceiveTest} in which the producer sends to the
+ * comma-separated subject {@code queue://FOO.BAR.HUMBUG,topic://FOO.BAR.HUMBUG2} and the
+ * messages are expected both on the queue consumer set up by the parent and on a second
+ * consumer attached to the topic {@code FOO.BAR.HUMBUG2}.
+ */
+public class JmsQueueTopicCompositeSendReceiveTest extends JmsTopicSendReceiveTest {
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+            .getLog(JmsQueueTopicCompositeSendReceiveTest.class);
+    /** Topic {@code FOO.BAR.HUMBUG2}, created in {@link #setUp()}. */
+    Destination consumerDestination2;
+    /** Consumer on {@link #consumerDestination2}, created in {@link #setUp()}. */
+    MessageConsumer consumer2;
+
+    /**
+     * Sets a test to have a queue destination and non-persistent delivery mode, and
+     * creates the second consumer on the topic {@code FOO.BAR.HUMBUG2}.
+     *
+     * @throws Exception if the parent set-up or the consumer creation fails
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // Both flags are assigned before delegating to the parent set-up.
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        topic = false;
+        super.setUp();
+        // The topic consumer is created here, i.e. before any test method sends messages.
+        consumerDestination2 = consumeSession.createTopic("FOO.BAR.HUMBUG2");
+        LOG.info("Created  consumer destination: " + consumerDestination2 + " of type: " + consumerDestination2.getClass());
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            consumer2 = consumeSession.createDurableSubscriber((Topic) consumerDestination2, getName());
+        } else {
+            consumer2 = consumeSession.createConsumer(consumerDestination2);
+        }
+
+    }
+
+    /**
+     * Returns the consumer subject.
+     *
+     * @return String - consumer subject
+     * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
+     */
+    @Override
+    protected String getConsumerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+
+    /**
+     * Returns the producer subject: a queue name and a topic name separated by a comma.
+     *
+     * @return String - producer subject
+     * @see org.apache.activemq.test.TestSupport#getProducerSubject()
+     */
+    @Override
+    protected String getProducerSubject() {
+        return "queue://FOO.BAR.HUMBUG,topic://FOO.BAR.HUMBUG2";
+    }
+
+    /**
+     * Test if all the messages sent are being received.
+     * <p>
+     * Runs the inherited test first, then clears the collected messages, attaches this
+     * test as listener to {@link #consumer2} and asserts again.
+     *
+     * @throws Exception if sending, consuming or an assertion fails
+     */
+    @Override
+    public void testSendReceive() throws Exception {
+        super.testSendReceive();
+        messages.clear();
+        consumer2.setMessageListener(this);
+        assertMessagesAreReceived();
+        LOG.info("" + data.length + " messages(s) received, closing down connections");
+    }
+}


Mime
View raw message