activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [40/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:11 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
new file mode 100644
index 0000000..d4cecab
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.spring.SpringConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
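+/**
+ * Tests JMS consumers that run with a prefetch size of zero, configured either through the
+ * {@code consumer.prefetchSize=0} destination option or through a broker destination policy.
+ */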
+public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZeroPrefetchConsumerTest.class);
+
+    protected Connection connection;
+    protected Queue queue;
+    protected Queue brokerZeroQueue = new ActiveMQQueue("brokerZeroConfig");
+
+    /**
+     * Verifies that a consumer on the zero-prefetch test queue rejects an asynchronous
+     * {@link MessageListener}: {@code setMessageListener} must throw a {@link JMSException}.
+     */
+    public void testCannotUseMessageListener() throws Exception {
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer pullConsumer = jmsSession.createConsumer(queue);
+        final MessageListener rejectedListener = new SpringConsumer();
+
+        try {
+            pullConsumer.setMessageListener(rejectedListener);
+            fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch");
+        } catch (JMSException expected) {
+            LOG.info("Received expected exception : " + expected);
+        }
+    }
+
+    /**
+     * Sends one message and pulls it back with a blocking receive, then checks that both a
+     * short timed receive and {@code receiveNoWait()} return {@code null} on the drained queue.
+     */
+    public void testPullConsumerWorks() throws Exception {
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        jmsSession.createProducer(queue).send(jmsSession.createTextMessage("Hello World!"));
+
+        final MessageConsumer pullConsumer = jmsSession.createConsumer(queue);
+        final Message first = pullConsumer.receive(5000);
+        assertNotNull("Should have received a message!", first);
+
+        // the queue is empty now: each receive variant must return at all, and return null
+        final Message afterTimedReceive = pullConsumer.receive(1);
+        assertNull("Should have not received a message!", afterTimedReceive);
+        final Message afterNoWait = pullConsumer.receiveNoWait();
+        assertNull("Should have not received a message!", afterNoWait);
+    }
+
+    /** Runs the idle-consumer scenario on a non-transacted session. */
+    public void testIdleConsumer() throws Exception {
+        doTestIdleConsumer(false);
+    }
+
+    /** Runs the idle-consumer scenario on a transacted session. */
+    public void testIdleConsumerTranscated() throws Exception {
+        doTestIdleConsumer(true);
+    }
+
+    /**
+     * Sends two messages, then receives both through one consumer while a second consumer on
+     * the same queue is created but never calls receive.
+     *
+     * @param transacted whether the session is transacted; when {@code true} the session is
+     *                   committed after the sends and after each successful receive
+     */
+    private void doTestIdleConsumer(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+        if (transacted) {
+            session.commit();
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        // second consumer on the same queue that is deliberately left idle
+        session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        // receive() returns null on timeout; fail with a clear message rather than an NPE
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        // this call would return null if prefetchSize > 0
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg2", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    /** Runs the receive-receive-commit scenario on a non-transacted session. */
+    public void testRecvRecvCommit() throws Exception {
+        doTestRecvRecvCommit(false);
+    }
+
+    /** Runs the receive-receive-commit scenario on a transacted session. */
+    public void testRecvRecvCommitTranscated() throws Exception {
+        doTestRecvRecvCommit(true);
+    }
+
+    /**
+     * Sends two messages and pulls both with {@code receiveNoWait()} back to back, committing
+     * only once after the second receive when the session is transacted.
+     *
+     * @param transacted whether the session is transacted
+     */
+    private void doTestRecvRecvCommit(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+        if (transacted) {
+            session.commit();
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer.receiveNoWait();
+        // a missing message must fail the assertion instead of raising an NPE on getText()
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg2", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    /**
+     * Sends two messages and checks that two consumers on the same queue each pull exactly one
+     * of them, in send order, after which the queue is empty.
+     */
+    public void testTwoConsumers() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+
+        // now lets receive it
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer1.receiveNoWait();
+        // a missing message must fail the assertion instead of raising an NPE on getText()
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+        answer = (TextMessage)consumer2.receiveNoWait();
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg2", answer.getText());
+
+        answer = (TextMessage)consumer2.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    // https://issues.apache.org/activemq/browse/AMQ-2567
+    /** Runs the many-message, two-consumer scenario on a transacted session. */
+    public void testManyMessageConsumer() throws Exception {
+        doTestManyMessageConsumer(true);
+    }
+
+    /** Runs the many-message, two-consumer scenario on a non-transacted session. */
+    public void testManyMessageConsumerNoTransaction() throws Exception {
+        doTestManyMessageConsumer(false);
+    }
+
+    /**
+     * Sends Msg1..Msg8 and drains them in send order while switching between two consumers on
+     * the same queue. Msg6 is received without an intervening commit; every other receive is
+     * followed by a commit when the session is transacted.
+     *
+     * @param transacted whether the session is transacted
+     */
+    private void doTestManyMessageConsumer(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 1; i <= 8; i++) {
+            producer.send(session.createTextMessage("Msg" + i));
+        }
+        if (transacted) {
+            session.commit();
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        MessageConsumer consumer2  = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        // receive() returns null on timeout; fail with a clear message rather than an NPE
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg2", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg3", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        // this call would return null if prefetchSize > 0
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg4", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer
+        // this call should return the next message (Msg5) still left on the queue
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg5", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        // Back on the first consumer
+        // this call should return the next message (Msg6) still left on the queue
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg6", answer.getText());
+        // read one more message without commit
+        // this call should return the next message (Msg7) still left on the queue
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg7", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer
+        // this call should return the last message (Msg8) still left on the queue
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg8", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    /** Runs the receive-with-interleaved-send scenario on a transacted session. */
+    public void testManyMessageConsumerWithSend() throws Exception {
+        doTestManyMessageConsumerWithSend(true);
+    }
+
+    /**
+     * Enables message priority support on the connection, then runs the
+     * receive-with-interleaved-send scenario on a transacted session.
+     */
+    public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception {
+        ((ActiveMQConnection)connection).setMessagePrioritySupported(true);
+        doTestManyMessageConsumerWithSend(true);
+    }
+
+    /** Runs the receive-with-interleaved-send scenario on a non-transacted session. */
+    public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
+        doTestManyMessageConsumerWithSend(false);
+    }
+
+    /**
+     * Sends Msg1..Msg8, drains them in send order across two consumers on the same queue, and
+     * sends a ninth message (Msg9) part-way through while Msg4 and Msg5 have been received but
+     * not yet committed. All nine messages must arrive in order and the queue must end empty.
+     *
+     * @param transacted whether the session is transacted
+     */
+    private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 1; i <= 8; i++) {
+            producer.send(session.createTextMessage("Msg" + i));
+        }
+        if (transacted) {
+            session.commit();
+        }
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        MessageConsumer consumer2  = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        // receive() returns null on timeout; fail with a clear message rather than an NPE
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg2", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg3", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        // Now using other consumer take 2
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg4", answer.getText());
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg5", answer.getText());
+
+        // ensure prefetch extension ok by sending another that could get dispatched
+        producer.send(session.createTextMessage("Msg9"));
+        if (transacted) {
+            session.commit();
+        }
+
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg6", answer.getText());
+        // read one more message without commit
+        // and using other consumer
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg7", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg8", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+
+        answer = (TextMessage)consumer.receive(5000);
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg9", answer.getText());
+        if (transacted) {
+            session.commit();
+        }
+        answer = (TextMessage)consumer.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    // https://issues.apache.org/jira/browse/AMQ-4224
+    /**
+     * Sends one message to the queue whose zero prefetch is set by the broker destination
+     * policy (not by a destination option) and checks that a blocking receive returns it.
+     */
+    public void testBrokerZeroPrefetchConfig() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(brokerZeroQueue);
+        producer.send(session.createTextMessage("Msg1"));
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
+
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        // receive() returns null on timeout; fail with a clear message rather than an NPE
+        assertNotNull("Should have received a message!", answer);
+        assertEquals("Should have received a message!", "Msg1", answer.getText());
+    }
+
+    // https://issues.apache.org/jira/browse/AMQ-4234
+    // https://issues.apache.org/jira/browse/AMQ-4235
+    /**
+     * Checks that the broker-configured zero prefetch is visible on both the client consumer
+     * and the broker-side subscription, and that it is still zero on both after a
+     * {@link ConsumerControl} command requesting a prefetch of 1000 has been sent.
+     */
+    public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception {
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final ActiveMQMessageConsumer zeroConsumer =
+                (ActiveMQMessageConsumer) jmsSession.createConsumer(brokerZeroQueue);
+        assertEquals("broker config prefetch in effect", 0, zeroConsumer.info.getCurrentPrefetchSize());
+
+        // verify sub view broker
+        final Subscription brokerSub = broker.getRegionBroker()
+                .getDestinationMap()
+                .get(ActiveMQDestination.transform(brokerZeroQueue))
+                .getConsumers()
+                .get(0);
+        assertEquals("broker sub prefetch is correct", 0, brokerSub.getConsumerInfo().getCurrentPrefetchSize());
+
+        // manipulate Prefetch (like failover and stomp)
+        final ConsumerControl prefetchChange = new ConsumerControl();
+        prefetchChange.setConsumerId(zeroConsumer.info.getConsumerId());
+        prefetchChange.setDestination(ActiveMQDestination.transform(brokerZeroQueue));
+        prefetchChange.setPrefetch(1000); // default for a q
+
+        final Object response = ((ActiveMQConnection) connection).getTransport().request(prefetchChange);
+        assertFalse("good request", response instanceof ExceptionResponse);
+        assertEquals("broker config prefetch in effect", 0, zeroConsumer.info.getCurrentPrefetchSize());
+        assertEquals("broker sub prefetch is correct", 0, brokerSub.getConsumerInfo().getCurrentPrefetchSize());
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = super.createBroker();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
+        zeroPrefetchPolicy.setQueuePrefetch(0);
+        policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
+        brokerService.setDestinationPolicy(policyMap);
+        return brokerService;
+    }
+
+    /**
+     * Sets the bind address to {@code tcp://localhost:0} before delegating to the parent
+     * set-up, then opens and starts a connection and creates the zero-prefetch test queue.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // port 0 -- presumably lets the OS choose a free port; startBroker() reads back the URI
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        queue = createQueue();
+    }
+
+    /**
+     * Starts the broker via the parent implementation, then replaces {@code bindAddress} with
+     * the connect URI reported by the broker's first transport connector.
+     */
+    @Override
+    protected void startBroker() throws Exception {
+        super.startBroker();
+        bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString();
+    }
+
+    /**
+     * Closes the test connection and then runs the parent tear-down. The parent tear-down runs
+     * even when closing the connection throws, so a failing close does not skip it.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates the test queue: the destination string from {@code getDestinationString()} with
+     * the {@code consumer.prefetchSize=0} destination option appended.
+     */
+    protected Queue createQueue() {
+        return new ActiveMQQueue(getDestinationString() + "?consumer.prefetchSize=0");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
new file mode 100644
index 0000000..5e20f79
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
+public class AdvisoryTempDestinationTests extends TestCase {
+
+    protected static final int MESSAGE_COUNT = 2000;
+    protected BrokerService broker;
+    protected Connection connection;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+    protected int topicCount;
+
+
+    /**
+     * Sends {@code MESSAGE_COUNT} 1 KiB messages to a temporary queue whose consumer has a
+     * no-op message listener attached, and expects that no slow-consumer advisory arrives
+     * within one second.
+     */
+    public void testNoSlowConsumerAdvisory() throws Exception {
+        final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue tempQueue = consumerSession.createTemporaryQueue();
+        final MessageConsumer listeningConsumer = consumerSession.createConsumer(tempQueue);
+        listeningConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        });
+
+        final Topic slowConsumerTopic =
+                AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) tempQueue);
+        final Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer advisoryConsumer = advisorySession.createConsumer(slowConsumerTopic);
+
+        // start throwing messages at the consumer
+        final MessageProducer producer = advisorySession.createProducer(tempQueue);
+        for (int remaining = MESSAGE_COUNT; remaining > 0; remaining--) {
+            final BytesMessage payload = advisorySession.createBytesMessage();
+            payload.writeBytes(new byte[1024]);
+            producer.send(payload);
+        }
+
+        assertNull(advisoryConsumer.receive(1000));
+    }
+
+    /**
+     * Sends {@code MESSAGE_COUNT} 1 KiB messages to a temporary queue whose consumer never
+     * receives, and expects a slow-consumer advisory to arrive within one second.
+     */
+    public void testSlowConsumerAdvisory() throws Exception {
+        final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue tempQueue = consumerSession.createTemporaryQueue();
+        final MessageConsumer stalledConsumer = consumerSession.createConsumer(tempQueue);
+        assertNotNull(stalledConsumer);
+
+        final Topic slowConsumerTopic =
+                AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) tempQueue);
+        final Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer advisoryConsumer = advisorySession.createConsumer(slowConsumerTopic);
+
+        // start throwing messages at the consumer
+        final MessageProducer producer = advisorySession.createProducer(tempQueue);
+        for (int remaining = MESSAGE_COUNT; remaining > 0; remaining--) {
+            final BytesMessage payload = advisorySession.createBytesMessage();
+            payload.writeBytes(new byte[1024]);
+            producer.send(payload);
+        }
+
+        assertNotNull(advisoryConsumer.receive(1000));
+    }
+
+    /**
+     * Sends a single 1 KiB message to a temporary queue that has a consumer and expects a
+     * message-delivered advisory for that queue within one second.
+     */
+    public void testMessageDeliveryAdvisory() throws Exception {
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();
+        final MessageConsumer queueConsumer = jmsSession.createConsumer(tempQueue);
+        assertNotNull(queueConsumer);
+
+        final Topic deliveredTopic =
+                AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) tempQueue);
+        final MessageConsumer advisoryConsumer = jmsSession.createConsumer(deliveredTopic);
+
+        //start throwing messages at the consumer
+        final MessageProducer producer = jmsSession.createProducer(tempQueue);
+        final BytesMessage payload = jmsSession.createBytesMessage();
+        payload.writeBytes(new byte[1024]);
+        producer.send(payload);
+
+        assertNotNull(advisoryConsumer.receive(1000));
+    }
+
+    /**
+     * Sends one message to a temporary queue, consumes it, and expects a message-consumed
+     * advisory whose data structure is a message carrying the JMS message ID of the message
+     * that was sent.
+     */
+    public void testTempMessageConsumedAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue queue = s.createTemporaryQueue();
+        MessageConsumer consumer = s.createConsumer(queue);
+
+        Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+
+        BytesMessage m = s.createBytesMessage();
+        m.writeBytes(new byte[1024]);
+        producer.send(m);
+        String id = m.getJMSMessageID();
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+
+        msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+        String originalId = payload.getJMSMessageID();
+        // expected value first: the ID of the sent message is the reference
+        assertEquals(id, originalId);
+    }
+
+    /**
+     * Sends {@code MESSAGE_COUNT} 1 KiB messages with a time-to-live of 1 ms to a queue named
+     * after this class and expects an expired-message advisory within five seconds.
+     */
+    public void testMessageExpiredAdvisory() throws Exception {
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue expiringQueue = jmsSession.createQueue(getClass().getName());
+        final MessageConsumer queueConsumer = jmsSession.createConsumer(expiringQueue);
+        assertNotNull(queueConsumer);
+
+        final Topic expiredTopic =
+                AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) expiringQueue);
+        final MessageConsumer advisoryConsumer = jmsSession.createConsumer(expiredTopic);
+
+        //start throwing messages at the consumer
+        final MessageProducer producer = jmsSession.createProducer(expiringQueue);
+        producer.setTimeToLive(1);
+        for (int remaining = MESSAGE_COUNT; remaining > 0; remaining--) {
+            final BytesMessage payload = jmsSession.createBytesMessage();
+            payload.writeBytes(new byte[1024]);
+            producer.send(payload);
+        }
+
+        assertNotNull(advisoryConsumer.receive(5000));
+    }
+
+    /**
+     * Creates (and thereby starts) the broker if this instance has none yet, then opens and
+     * starts a connection from {@code createConnectionFactory()} before the parent set-up runs.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        ConnectionFactory factory = createConnectionFactory();
+        connection = factory.createConnection();
+        connection.start();
+        super.setUp();
+    }
+
+    /**
+     * Runs the parent tear-down, closes the connection and stops the broker. The broker is
+     * stopped even when an earlier step throws, so a failing close does not leave it running.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /** Returns a connection factory pointing at {@code ActiveMQConnection.DEFAULT_BROKER_URL}. */
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        return new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Configures a non-persistent broker with advisory-enabled policy entries for temporary
+     * queues and temporary topics (both sharing a pending-message limit of 10), a connector on
+     * {@code bindAddress}, and deletion of all messages on startup.
+     *
+     * @param answer the broker to configure
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+
+        final ConstantPendingMessageLimitStrategy pendingLimit = new ConstantPendingMessageLimitStrategy();
+        pendingLimit.setLimit(10);
+
+        final PolicyEntry forTempQueues = createPolicyEntry(pendingLimit);
+        forTempQueues.setTempQueue(true);
+        final PolicyEntry forTempTopics = createPolicyEntry(pendingLimit);
+        forTempTopics.setTempTopic(true);
+
+        final List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        entries.add(forTempQueues);
+        entries.add(forTempTopics);
+
+        final PolicyMap policies = new PolicyMap();
+        policies.setPolicyEntries(entries);
+
+        answer.setDestinationPolicy(policies);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+
+    private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
+        PolicyEntry policy = new PolicyEntry();
+        policy.setAdvisoryForFastProducers(true);
+        policy.setAdvisoryForConsumed(true);
+        policy.setAdvisoryForDelivery(true);
+        policy.setAdvisoryForDiscardingMessages(true);
+        policy.setAdvisoryForSlowConsumers(true);
+        policy.setAdvisoryWhenFull(true);
+        policy.setProducerFlowControl(false);
+        policy.setPendingMessageLimitStrategy(strategy);
+
+        return policy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
new file mode 100644
index 0000000..4bb9053
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
+/**
+ * Checks that advisory messages (slow consumer, delivered, consumed, expired,
+ * discarded) are published for a destination when the corresponding advisories
+ * are enabled in the broker's default destination policy.
+ *
+ * Each test runs against a broker created by {@link #createBroker()} and a
+ * connection opened in {@link #setUp()}.
+ */
+public class AdvisoryTests extends TestCase {
+    protected static final int MESSAGE_COUNT = 2000;
+
+    /** Payload size, in bytes, of the messages produced by the tests. */
+    private static final int PAYLOAD_SIZE = 1024;
+
+    protected BrokerService broker;
+    protected Connection connection;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+    protected int topicCount;
+
+
+    /**
+     * A queue consumer with a (no-op) message listener keeps up with the
+     * producer, so no slow-consumer advisory is expected within one second.
+     */
+    public void testNoSlowConsumerAdvisory() throws Exception {
+        Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = consumerSession.createQueue(getClass().getName());
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                // Intentionally empty: the message only needs to be consumed.
+            }
+        });
+        Topic advisoryTopic = AdvisorySupport
+                .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
+        // The advisory consumer and the producer share a second session.
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = session.createProducer(queue);
+        sendMessages(session, producer, MESSAGE_COUNT);
+
+        Message msg = advisoryConsumer.receive(1000);
+        assertNull(msg);
+    }
+
+    /**
+     * A queue consumer that never receives anything should trigger a
+     * slow-consumer advisory once the producer has sent its messages.
+     */
+    public void testSlowConsumerAdvisory() throws Exception {
+        Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = consumerSession.createQueue(getClass().getName());
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        assertNotNull(consumer);
+
+        Topic advisoryTopic = AdvisorySupport
+                .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = session.createProducer(queue);
+        sendMessages(session, producer, MESSAGE_COUNT);
+
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+
+    /** Sending one message to a queue with a consumer should yield a delivered advisory. */
+    public void testMessageDeliveryAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+        assertNotNull(consumer);
+
+        Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        sendMessages(s, producer, 1);
+
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+
+    /**
+     * After a message has been received, the consumed advisory should wrap a
+     * message whose JMS message id equals the id of the message that was sent.
+     */
+    public void testMessageConsumedAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+
+        Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+
+        BytesMessage m = s.createBytesMessage();
+        m.writeBytes(new byte[PAYLOAD_SIZE]);
+        producer.send(m);
+        String id = m.getJMSMessageID();
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+
+        msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+        String originalId = payload.getJMSMessageID();
+        // Expected value first, so a failure report reads the right way round.
+        assertEquals(id, originalId);
+    }
+
+    /**
+     * Messages sent with a time-to-live of one millisecond should produce an
+     * expired-message advisory within two seconds.
+     */
+    public void testMessageExpiredAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+        assertNotNull(consumer);
+
+        Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        producer.setTimeToLive(1);
+        sendMessages(s, producer, MESSAGE_COUNT);
+
+        Message msg = advisoryConsumer.receive(2000);
+        assertNotNull(msg);
+    }
+
+    /**
+     * Sends twice the default topic prefetch to a topic consumer that never
+     * receives, and expects a discarded-message advisory.
+     *
+     * The method name starts with {@code x} rather than {@code test}, so the
+     * JUnit 3 runner does not pick it up; it is effectively disabled.
+     */
+    public void xtestMessageDiscardedAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = s.createTopic(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(topic);
+        assertNotNull(consumer);
+
+        Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(topic);
+        int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
+        for (int i = 0; i < count; i++) {
+            // Empty messages: unlike the other tests no payload is written.
+            BytesMessage m = s.createBytesMessage();
+            producer.send(m);
+        }
+
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+
+    /**
+     * Creates the broker if this instance does not have one yet, then opens
+     * and starts the connection used by the tests.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        ConnectionFactory factory = createConnectionFactory();
+        connection = factory.createConnection();
+        connection.start();
+        super.setUp();
+    }
+
+    /**
+     * Closes the connection and stops the broker.
+     *
+     * The broker is stopped in a {@code finally} block so that a failure while
+     * closing the connection cannot leave a running broker behind for the
+     * next test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * @return a connection factory pointing at
+     *         {@code ActiveMQConnection.DEFAULT_BROKER_URL}
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    /**
+     * Creates, configures (via {@link #configureBroker(BrokerService)}) and
+     * starts a new broker.
+     *
+     * @return the started broker
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Configures the broker under test: persistence off, a default destination
+     * policy with the advisories enabled, producer flow control disabled and a
+     * constant pending-message limit of 10, plus a connector on
+     * {@link #bindAddress}.
+     *
+     * @param answer the broker to configure
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setAdvisoryForFastProducers(true);
+        policy.setAdvisoryForConsumed(true);
+        policy.setAdvisoryForDelivery(true);
+        policy.setAdvisoryForDiscardingMessages(true);
+        policy.setAdvisoryForSlowConsumers(true);
+        policy.setAdvisoryWhenFull(true);
+        policy.setProducerFlowControl(false);
+        ConstantPendingMessageLimitStrategy strategy  = new ConstantPendingMessageLimitStrategy();
+        strategy.setLimit(10);
+        policy.setPendingMessageLimitStrategy(strategy);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        answer.setDestinationPolicy(pMap);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+
+    /**
+     * Sends {@code count} bytes messages, each carrying a zero-filled payload
+     * of {@link #PAYLOAD_SIZE} bytes.
+     *
+     * @param session  session used to create the messages
+     * @param producer producer used to send them
+     * @param count    number of messages to send
+     */
+    private static void sendMessages(Session session, MessageProducer producer, int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            BytesMessage m = session.createBytesMessage();
+            m.writeBytes(new byte[PAYLOAD_SIZE]);
+            producer.send(m);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
new file mode 100644
index 0000000..2c5f9cd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * 
+ */
+public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ConsumerListenerTest.class);
+
+    protected Session consumerSession1;
+    protected Session consumerSession2;
+    protected int consumerCounter;
+    protected ConsumerEventSource consumerEventSource;
+    protected BlockingQueue<ConsumerEvent> eventQueue = new ArrayBlockingQueue<ConsumerEvent>(1000);
+    private Connection connection;
+
+    public void testConsumerEvents() throws Exception {
+        consumerEventSource.start();
+
+        consumerSession1 = createConsumer();
+        assertConsumerEvent(1, true);
+
+        consumerSession2 = createConsumer();
+        assertConsumerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertConsumerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertConsumerEvent(0, false);
+    }
+
+    public void testListenWhileAlreadyConsumersActive() throws Exception {
+        consumerSession1 = createConsumer();
+        consumerSession2 = createConsumer();
+
+        consumerEventSource.start();
+        assertConsumerEvent(2, true);
+        assertConsumerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertConsumerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertConsumerEvent(0, false);
+    }
+
+    public void testConsumerEventsOnTemporaryDestination() throws Exception {
+
+        Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+        Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
+        consumerEventSource = new ConsumerEventSource(connection, dest);
+        consumerEventSource.setConsumerListener(this);
+        consumerEventSource.start();
+        MessageConsumer consumer = s.createConsumer(dest);
+        assertConsumerEvent(1,true);
+        consumer.close();
+        assertConsumerEvent(0,false);
+    }
+
+    public void onConsumerEvent(ConsumerEvent event) {
+        eventQueue.add(event);
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        consumerEventSource = new ConsumerEventSource(connection, destination);
+        consumerEventSource.setConsumerListener(this);
+    }
+
+    protected void tearDown() throws Exception {
+        if (consumerEventSource != null) {
+            consumerEventSource.stop();
+        }
+        if (consumerSession2 != null) {
+            consumerSession2.close();
+        }
+        if (consumerSession1 != null) {
+            consumerSession1.close();
+        }
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+        ConsumerEvent event = waitForConsumerEvent();
+        assertEquals("Consumer count", count, event.getConsumerCount());
+        assertEquals("started", started, event.isStarted());
+    }
+
+    protected Session createConsumer() throws JMSException {
+        final String consumerText = "Consumer: " + (++consumerCounter);
+        LOG.info("Creating consumer: " + consumerText + " on destination: " + destination);
+
+        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = answer.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                LOG.info("Received message by: " + consumerText + " message: " + message);
+            }
+        });
+        return answer;
+    }
+
+    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
+        ConsumerEvent answer = eventQueue.poll(100000, TimeUnit.MILLISECONDS);
+        assertTrue("Should have received a consumer event!", answer != null);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java
new file mode 100644
index 0000000..01dc443
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * Verifies that the connection's {@link DestinationSource} knows the
+ * destinations the broker was started with and reports destinations that are
+ * created later by a consumer or a producer.
+ *
+ * The test class itself is the {@link DestinationListener}; added destinations
+ * are collected in {@link #newDestinations}.
+ */
+public class DestinationListenerTest extends EmbeddedBrokerTestSupport implements DestinationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(DestinationListenerTest.class);
+    protected ActiveMQConnection connection;
+    protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
+    protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
+    // NOTE(review): filled from onDestinationEvent() and read by the test
+    // methods; if the callback runs on another thread this plain ArrayList is
+    // not a safe hand-off - confirm.
+    protected List<ActiveMQDestination> newDestinations = new ArrayList<ActiveMQDestination>();
+
+    /** The destinations configured in {@link #createBroker()} should be listed. */
+    public void testDestiationSourceHasInitialDestinations() throws Exception {
+        // Fixed wait before querying the destination source.
+        Thread.sleep(1000);
+
+        DestinationSource destinationSource = connection.getDestinationSource();
+        Set<ActiveMQQueue> queues = destinationSource.getQueues();
+        Set<ActiveMQTopic> topics = destinationSource.getTopics();
+
+        LOG.info("Queues: {}", queues);
+        LOG.info("Topics: {}", topics);
+
+        assertFalse("The queues should not be empty!", queues.isEmpty());
+        assertFalse("The topics should not be empty!", topics.isEmpty());
+
+        assertTrue("queues contains initial queue: " + queues, queues.contains(sampleQueue));
+        assertTrue("topics contains initial topic: " + topics, topics.contains(sampleTopic));
+    }
+
+    /** Creating a consumer on an unknown queue should report that queue as added. */
+    public void testConsumerForcesNotificationOfNewDestination() throws Exception {
+        // now lets cause a destination to be created
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue newQueue = new ActiveMQQueue("Test.Cheese");
+        session.createConsumer(newQueue);
+
+        // Fixed wait for the destination event to arrive.
+        Thread.sleep(3000);
+
+        assertThat(newQueue, isIn(newDestinations));
+
+        LOG.info("New destinations are: {}", newDestinations);
+    }
+
+    /** Sending a message to an unknown queue should report that queue as added. */
+    public void testProducerForcesNotificationOfNewDestination() throws Exception {
+        // now lets cause a destination to be created
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue newQueue = new ActiveMQQueue("Test.Beer");
+        MessageProducer producer = session.createProducer(newQueue);
+        TextMessage message = session.createTextMessage("<hello>world</hello>");
+        producer.send(message);
+
+        // Fixed wait for the destination event to arrive.
+        Thread.sleep(3000);
+
+        assertThat(newQueue, isIn(newDestinations));
+
+        LOG.info("New destinations are: {}", newDestinations);
+    }
+
+    /**
+     * Listener callback: records added destinations and forgets removed ones.
+     *
+     * @param event the add or remove event reported by the destination source
+     */
+    @Override
+    public void onDestinationEvent(DestinationEvent event) {
+        ActiveMQDestination destination = event.getDestination();
+        if (event.isAddOperation()) {
+            LOG.info("Added:   {}", destination);
+            newDestinations.add(destination);
+        }
+        else {
+            LOG.info("Removed: {}", destination);
+            newDestinations.remove(destination);
+        }
+    }
+
+    /**
+     * Opens and starts the connection and registers this test as listener on
+     * its destination source.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = (ActiveMQConnection) createConnection();
+        connection.start();
+        connection.getDestinationSource().setDestinationListener(this);
+    }
+
+    /**
+     * @return the broker from the superclass, additionally configured with
+     *         {@link #sampleQueue} and {@link #sampleTopic} as destinations
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.setDestinations(new ActiveMQDestination[]{
+                sampleQueue,
+                sampleTopic
+        });
+        return broker;
+    }
+
+    /**
+     * Closes the connection; {@code super.tearDown()} runs even when closing
+     * the connection fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
new file mode 100644
index 0000000..dfa1b5e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ */
+public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerListenerTest.class);
+
+    protected Session consumerSession1;
+    protected Session consumerSession2;
+    protected int consumerCounter;
+    protected ProducerEventSource producerEventSource;
+    protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<ProducerEvent>(1000);
+    private Connection connection;
+
+    public void testProducerEvents() throws Exception {
+        producerEventSource.start();
+
+        consumerSession1 = createProducer();
+        assertProducerEvent(1, true);
+
+        consumerSession2 = createProducer();
+        assertProducerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertProducerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertProducerEvent(0, false);
+    }
+
+    public void testListenWhileAlreadyConsumersActive() throws Exception {
+        consumerSession1 = createProducer();
+        consumerSession2 = createProducer();
+
+        producerEventSource.start();
+        assertProducerEvent(2, true);
+        assertProducerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertProducerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertProducerEvent(0, false);
+    }
+
+    public void testConsumerEventsOnTemporaryDestination() throws Exception {
+
+        Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+        Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
+        producerEventSource = new ProducerEventSource(connection, dest);
+        producerEventSource.setProducerListener(this);
+        producerEventSource.start();
+        MessageProducer producer = s.createProducer(dest);
+        assertProducerEvent(1, true);
+        producer.close();
+        assertProducerEvent(0, false);
+    }
+
+
+
+    @Override
+    public void onProducerEvent(ProducerEvent event) {
+        eventQueue.add(event);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        producerEventSource = new ProducerEventSource(connection, destination);
+        producerEventSource.setProducerListener(this);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (producerEventSource != null) {
+            producerEventSource.stop();
+        }
+        if (consumerSession2 != null) {
+            consumerSession2.close();
+        }
+        if (consumerSession1 != null) {
+            consumerSession1.close();
+        }
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected void assertProducerEvent(int count, boolean started) throws InterruptedException {
+        ProducerEvent event = waitForProducerEvent();
+        assertEquals("Producer count", count, event.getProducerCount());
+        assertEquals("started", started, event.isStarted());
+    }
+
+    protected Session createProducer() throws JMSException {
+        final String consumerText = "Consumer: " + (++consumerCounter);
+        LOG.info("Creating consumer: " + consumerText + " on destination: " + destination);
+
+        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = answer.createProducer(destination);
+        assertNotNull(producer);
+
+        return answer;
+    }
+
+    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
+        ProducerEvent answer = eventQueue.poll(100000, TimeUnit.MILLISECONDS);
+        assertTrue("Should have received a consumer event!", answer != null);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
new file mode 100644
index 0000000..123c778
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that deleting a temporary topic or queue also removes the consumer
+ * advisory topic that belongs to it from the broker's regions.
+ *
+ * The test class itself is the {@link ConsumerListener}; received events are
+ * put on {@link #eventQueue} and taken off again by the assertions.
+ */
+public class TempDestDeleteTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
+    private static final Logger LOG = LoggerFactory.getLogger(TempDestDeleteTest.class);
+
+    /** Maximum time, in milliseconds, to wait for a single consumer event. */
+    private static final long EVENT_TIMEOUT_MILLIS = 1000;
+
+    protected int consumerCounter;
+    protected ConsumerEventSource topicConsumerEventSource;
+    protected BlockingQueue<ConsumerEvent> eventQueue = new ArrayBlockingQueue<ConsumerEvent>(1000);
+
+    private ConsumerEventSource queueConsumerEventSource;
+    private Connection connection;
+    private Session session;
+    private ActiveMQTempTopic tempTopic;
+    private ActiveMQTempQueue tempQueue;
+
+    /**
+     * The consumer advisory topic of the temporary topic must exist while a
+     * consumer is attached and must be gone after the temporary topic is deleted.
+     */
+    public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
+        topicConsumerEventSource.start();
+
+        MessageConsumer consumer = createConsumer(tempTopic);
+        assertConsumerEvent(1, true);
+
+        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
+        assertTrue(destinationExists(advisoryTopic));
+
+        consumer.close();
+
+        // Once we delete the topic, the advisory topic for the destination
+        // should also be deleted.
+        tempTopic.delete();
+
+        assertFalse(destinationExists(advisoryTopic));
+    }
+
+    /**
+     * The consumer advisory topic of the temporary queue must exist while a
+     * consumer is attached and must be gone after the temporary queue is deleted.
+     */
+    public void testDeleteTempQueueDeletesAvisoryTopics() throws Exception {
+        queueConsumerEventSource.start();
+
+        MessageConsumer consumer = createConsumer(tempQueue);
+        assertConsumerEvent(1, true);
+
+        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempQueue);
+        assertTrue(destinationExists(advisoryTopic));
+
+        consumer.close();
+
+        // Once we delete the queue, the advisory topic for the destination
+        // should also be deleted.
+        tempQueue.delete();
+
+        assertFalse(destinationExists(advisoryTopic));
+    }
+
+    /**
+     * Looks the destination up in the topic, queue, temp-topic and temp-queue
+     * regions of the broker, in that order.
+     *
+     * @param dest the destination to look for
+     * @return {@code true} if any of the four destination maps contains it as a key
+     */
+    private boolean destinationExists(Destination dest) throws Exception {
+        RegionBroker rb = (RegionBroker)broker.getBroker().getAdaptor(RegionBroker.class);
+        return rb.getTopicRegion().getDestinationMap().containsKey(dest)
+               || rb.getQueueRegion().getDestinationMap().containsKey(dest)
+               || rb.getTempTopicRegion().getDestinationMap().containsKey(dest)
+               || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
+    }
+
+    /**
+     * Listener callback: stores the event for the assertions to pick up.
+     *
+     * @param event the event reported by one of the event sources
+     */
+    @Override
+    public void onConsumerEvent(ConsumerEvent event) {
+        eventQueue.add(event);
+    }
+
+    /**
+     * Opens the connection and session, creates one temporary topic and one
+     * temporary queue, and prepares (but does not start) an event source for each.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempTopic = (ActiveMQTempTopic)session.createTemporaryTopic();
+        topicConsumerEventSource = new ConsumerEventSource(connection, tempTopic);
+        topicConsumerEventSource.setConsumerListener(this);
+
+        tempQueue = (ActiveMQTempQueue)session.createTemporaryQueue();
+        queueConsumerEventSource = new ConsumerEventSource(connection, tempQueue);
+        queueConsumerEventSource.setConsumerListener(this);
+    }
+
+    /** Closes the connection, then delegates to the superclass. */
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Waits for the next event and checks its consumer count and start flag.
+     *
+     * @param count   expected consumer count carried by the event
+     * @param started expected value of {@code event.isStarted()}
+     */
+    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+        ConsumerEvent event = waitForConsumerEvent();
+        assertEquals("Consumer count", count, event.getConsumerCount());
+        assertEquals("started", started, event.isStarted());
+    }
+
+    /**
+     * Creates a logging consumer on the given destination using the shared session.
+     *
+     * @param dest the destination to consume from
+     * @return the new consumer
+     */
+    protected MessageConsumer createConsumer(Destination dest) throws JMSException {
+        final String consumerText = "Consumer: " + (++consumerCounter);
+        LOG.info("Creating consumer: {} on destination: {}", consumerText, dest);
+
+        MessageConsumer consumer = session.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Received message by: {} message: {}", consumerText, message);
+            }
+        });
+        return consumer;
+    }
+
+    /**
+     * Blocks until an event arrives, failing the test if none arrives within
+     * {@link #EVENT_TIMEOUT_MILLIS}.
+     *
+     * @return the received event, never {@code null}
+     */
+    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
+        ConsumerEvent answer = eventQueue.poll(EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        assertNotNull("Should have received a consumer event!", answer);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
new file mode 100644
index 0000000..cab4e59
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates and deletes a large number of temporary queues or topics on a single
+ * session and then checks that the advisory broker no longer tracks any of them.
+ */
+public class TempDestLoadTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TempDestLoadTest.class);
+
+    /** Number of temporary destinations each test creates and deletes. */
+    private static final int MESSAGE_COUNT = 2000;
+
+    protected int consumerCounter;
+    private Connection connection;
+    private Session session;
+
+    /**
+     * Creates a temporary queue with one consumer and one producer, closes both and
+     * deletes the queue, {@link #MESSAGE_COUNT} times, then verifies the broker state.
+     */
+    public void testLoadTempAdvisoryQueues() throws Exception {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            TemporaryQueue tempQueue = session.createTemporaryQueue();
+            MessageConsumer consumer = session.createConsumer(tempQueue);
+            MessageProducer producer = session.createProducer(tempQueue);
+            consumer.close();
+            producer.close();
+            tempQueue.delete();
+        }
+
+        assertTempDestinationsReleased();
+    }
+
+    /**
+     * Creates a temporary topic with one consumer and one producer, closes both and
+     * deletes the topic, {@link #MESSAGE_COUNT} times, then verifies the broker state.
+     */
+    public void testLoadTempAdvisoryTopics() throws Exception {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            TemporaryTopic tempTopic = session.createTemporaryTopic();
+            MessageConsumer consumer = session.createConsumer(tempTopic);
+            MessageProducer producer = session.createProducer(tempTopic);
+            consumer.close();
+            producer.close();
+            tempTopic.delete();
+        }
+
+        assertTempDestinationsReleased();
+    }
+
+    /**
+     * Asserts that the advisory broker tracks no destinations, consumers or producers
+     * and that the region broker still holds more than two destinations.
+     *
+     * @throws Exception if the broker adaptors cannot be obtained
+     */
+    private void assertTempDestinationsReleased() throws Exception {
+        AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class);
+
+        // assertEquals reports the actual size on failure, unlike assertTrue(size == 0)
+        assertEquals("advisory destinations", 0, ab.getAdvisoryDestinations().size());
+        assertEquals("advisory consumers", 0, ab.getAdvisoryConsumers().size());
+        assertEquals("advisory producers", 0, ab.getAdvisoryProducers().size());
+
+        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
+
+        for (Destination dest : rb.getDestinationMap().values()) {
+            LOG.debug("Destination: {}", dest);
+        }
+
+        // there should be at least 2 destinations - advisories -
+        // 1 for the connection + 1 generic ones
+        // NOTE(review): the message says "at least 2" but the check requires more
+        // than 2; left unchanged - confirm which one is intended.
+        assertTrue("Should be at least 2 destinations", rb.getDestinationMap().size() > 2);
+    }
+
+    /**
+     * Runs the parent set-up, then opens and starts a connection and creates a
+     * non-transacted, auto-acknowledge session on it.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Closes the connection when one was opened and then runs the parent tear-down.
+     * The parent tear-down sits in a {@code finally} block so that it still runs
+     * when closing the connection throws.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
new file mode 100644
index 0000000..9bf8ed1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
+
+    protected Connection serverConnection;
+    protected Session serverSession;
+    protected Connection clientConnection;
+    protected Session clientSession;
+    protected Destination serverDestination;
+    protected int messagesToSend = 10;
+    protected boolean deleteTempQueue = true;
+    protected boolean serverTransactional = false;
+    protected boolean clientTransactional = false;
+    protected int numConsumers = 1;
+    protected int numProducers = 1;
+
+    public void testConcurrentProducerRequestReply() throws Exception {
+        numProducers = 10;
+        testLoadRequestReply();
+    }
+
+    public void testLoadRequestReply() throws Exception {
+        for (int i = 0; i < numConsumers; i++) {
+            serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message msg) {
+                    try {
+                        Destination replyTo = msg.getJMSReplyTo();
+                        MessageProducer producer = serverSession.createProducer(replyTo);
+                        producer.send(replyTo, msg);
+                        if (serverTransactional) {
+                            serverSession.commit();
+                        }
+                        producer.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        class Producer extends Thread {
+            private final int numToSend;
+
+            public Producer(int numToSend) {
+                this.numToSend = numToSend;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    Session session = clientConnection.createSession(clientTransactional, clientTransactional ?
+                        Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(serverDestination);
+
+                    for (int i = 0; i < numToSend; i++) {
+                        TemporaryQueue replyTo = session.createTemporaryQueue();
+                        MessageConsumer consumer = session.createConsumer(replyTo);
+                        Message msg = session.createMessage();
+                        msg.setJMSReplyTo(replyTo);
+                        producer.send(msg);
+                        if (clientTransactional) {
+                            session.commit();
+                        }
+                        consumer.receive();
+                        if (clientTransactional) {
+                            session.commit();
+                        }
+                        consumer.close();
+                        if (deleteTempQueue) {
+                            replyTo.delete();
+                        } else {
+                            // temp queue will be cleaned up on clientConnection.close
+                        }
+                    }
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        Vector<Thread> threads = new Vector<Thread>(numProducers);
+        for (int i = 0; i < numProducers; i++) {
+            threads.add(new Producer(messagesToSend / numProducers));
+        }
+        startAndJoinThreads(threads);
+
+        clientSession.close();
+        serverSession.close();
+        clientConnection.close();
+        serverConnection.close();
+
+        AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class);
+
+        // The server destination will be left
+        assertTrue(ab.getAdvisoryDestinations().size() == 1);
+
+        assertTrue("should be zero but is " + ab.getAdvisoryConsumers().size(), ab.getAdvisoryConsumers().size() == 0);
+        assertTrue("should be zero but is " + ab.getAdvisoryProducers().size(), ab.getAdvisoryProducers().size() == 0);
+
+        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
+
+        assertTrue(rb.getDestinationMap().size() >= 6);
+    }
+
+    private void startAndJoinThreads(Vector<Thread> threads) throws Exception {
+        for (Thread thread : threads) {
+            thread.start();
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        serverConnection = createConnection();
+        serverConnection.start();
+        serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        clientConnection = createConnection();
+        clientConnection.start();
+        clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        serverDestination = createDestination();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        serverTransactional = clientTransactional = false;
+        numConsumers = numProducers = 1;
+        messagesToSend = 2000;
+    }
+
+    @Override
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue(getClass().getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java
new file mode 100644
index 0000000..4040569
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * Checks that a {@code jms.blobTransferPolicy.*} option given on the connection
+ * URI is visible on the factory's {@link BlobTransferPolicy}.
+ */
+public class BlobTransferPolicyUriTest extends TestCase {
+
+    /**
+     * Builds a factory from a URI that sets {@code defaultUploadUrl} and expects
+     * both the default upload URL and the upload URL of the policy to report it.
+     */
+    public void testBlobTransferPolicyIsConfiguredViaUri() throws Exception {
+        final String brokerUri = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=http://foo.com";
+        final String expectedUrl = "http://foo.com";
+
+        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri);
+        final BlobTransferPolicy configured = connectionFactory.getBlobTransferPolicy();
+
+        assertEquals(expectedUrl, configured.getDefaultUploadUrl());
+        assertEquals(expectedUrl, configured.getUploadUrl());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
new file mode 100644
index 0000000..0875a5b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Exercises {@link FTPBlobDownloadStrategy}: a successful download followed by a
+ * delete, and the error reported for wrong credentials and for a wrong port.
+ */
+public class FTPBlobDownloadStrategyTest extends FTPTestSupport {
+
+    /** Number of filler characters written after the greeting in the test file. */
+    final int FILE_SIZE = Short.MAX_VALUE * 10;
+
+    /**
+     * Writes a file into the FTP home directory, downloads it through the strategy,
+     * compares the content and then checks that {@code deleteFile} removes it.
+     *
+     * @throws Exception on any I/O or JMS failure; it is propagated so that the
+     *                   test report shows the real cause
+     */
+    public void testDownload() throws Exception {
+        setConnection();
+
+        final String greeting = "hello world";
+
+        // create file
+        File uploadFile = new File(ftpHomeDirFile, "test.txt");
+        FileWriter wrt = new FileWriter(uploadFile);
+        try {
+            wrt.write(greeting);
+            for (int ix = 0; ix < FILE_SIZE; ++ix) {
+                wrt.write("a");
+            }
+        } finally {
+            // close even when a write fails so the file handle is not leaked
+            wrt.close();
+        }
+
+        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+        BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
+        message.setURL(new URL(ftpUrl + "test.txt"));
+
+        StringBuilder sb = new StringBuilder(greeting.length() + FILE_SIZE);
+        InputStream stream = strategy.getInputStream(message);
+        try {
+            int i = stream.read();
+            while (i != -1) {
+                sb.append((char) i);
+                i = stream.read();
+            }
+        } finally {
+            stream.close();
+        }
+
+        String downloaded = sb.toString();
+        assertEquals(greeting, downloaded.substring(0, greeting.length()));
+        assertEquals(FILE_SIZE, downloaded.substring(greeting.length()).length());
+
+        assertTrue(uploadFile.exists());
+        strategy.deleteFile(message);
+        assertFalse(uploadFile.exists());
+    }
+
+    /**
+     * Expects a {@link JMSException} with the authentication error message when the
+     * user name in the URL is wrong.
+     */
+    public void testWrongAuthentification() throws MalformedURLException {
+        assertDownloadFails(
+            "ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/",
+            "Cant Authentificate to FTP-Server");
+    }
+
+    /**
+     * Expects a {@link JMSException} with the connection error message when the URL
+     * points at port 422 (presumably a port on which no FTP server listens).
+     */
+    public void testWrongFTPPort() throws MalformedURLException {
+        assertDownloadFails(
+            "ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/",
+            "Problem connecting the FTP-server");
+    }
+
+    /**
+     * Asserts that opening a download stream for {@code url} fails with a
+     * {@link JMSException} carrying exactly {@code expectedMessage}.
+     *
+     * @param url             the FTP URL to set on the blob message
+     * @param expectedMessage the exact message the {@link JMSException} must carry
+     * @throws MalformedURLException declared for the callers' signatures; a malformed
+     *                               URL is in fact reported as a test failure below
+     */
+    private void assertDownloadFails(String url, String expectedMessage) throws MalformedURLException {
+        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+        BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy());
+        try {
+            message.setURL(new URL(url));
+            strategy.getInputStream(message);
+        } catch (JMSException e) {
+            assertEquals("Wrong Exception", expectedMessage, e.getMessage());
+            return;
+        } catch (Exception e) {
+            fail("Wrong Exception " + e);
+            return;
+        }
+
+        fail("Expect Exception");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
new file mode 100644
index 0000000..4aecc09
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Sends a blob message backed by a local file, receives it again, compares the
+ * content read from the received message and checks that the uploaded file can
+ * be deleted through the message.
+ */
+public class FTPBlobTest extends FTPTestSupport {
+
+    /**
+     * Round-trips one blob message and verifies its name, its content and the
+     * presence (before delete) and absence (after delete) of the uploaded file.
+     *
+     * @throws Exception on any I/O or JMS failure
+     */
+    public void testBlobFile() throws Exception {
+        setConnection();
+        // first create Message
+        File file = File.createTempFile("amq-data-file-", ".dat");
+        // do not leave the temp file behind after the test JVM exits
+        file.deleteOnExit();
+        // lets write some data
+        String content = "hello world " + System.currentTimeMillis();
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        try {
+            writer.append(content);
+        } finally {
+            writer.close();
+        }
+
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(
+                false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        BlobMessage message = session.createBlobMessage(file);
+        message.setName("fileName");
+
+        producer.send(message);
+        Thread.sleep(1000);
+
+        // check message send
+        Message msg = consumer.receive(1000);
+        assertTrue("Should have received an ActiveMQBlobMessage", msg instanceof ActiveMQBlobMessage);
+        ActiveMQBlobMessage received = (ActiveMQBlobMessage) msg;
+
+        assertEquals("name is correct", "fileName", received.getName());
+        StringBuilder b = new StringBuilder();
+        InputStream input = received.getInputStream();
+        try {
+            int i = input.read();
+            while (i != -1) {
+                b.append((char) i);
+                i = input.read();
+            }
+        } finally {
+            // close even when reading fails
+            input.close();
+        }
+        // the uploaded file is looked up under the message id with ':' replaced by '_'
+        File uploaded = new File(ftpHomeDirFile, msg.getJMSMessageID().replace(":", "_"));
+        assertEquals(content, b.toString());
+        assertTrue(uploaded.exists());
+        received.deleteFile();
+        assertFalse(uploaded.exists());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
new file mode 100644
index 0000000..cac9a0a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+
+/**
+ * Exercises the upload that happens when a blob message is sent: once with the
+ * default credentials and once as user {@code guest}, for which the upload is
+ * expected to be rejected.
+ */
+public class FTPBlobUploadStrategyTest extends FTPTestSupport {
+
+    /**
+     * Sends a blob message and expects its URL to point at {@code ID_testmessage}
+     * under the FTP URL and the file to exist in the FTP home directory.
+     */
+    public void testFileUpload() throws Exception {
+        setConnection();
+        ActiveMQBlobMessage message = createTestBlobMessage(createTempDataFile());
+
+        message.onSend();
+
+        assertEquals(ftpUrl + "ID_testmessage", message.getURL().toString());
+        File uploaded = new File(ftpHomeDirFile, "ID_testmessage");
+        assertTrue("File doesn't exists", uploaded.exists());
+    }
+
+    /**
+     * Connects as {@code guest} and expects sending a blob message to fail with a
+     * {@link JMSException}.
+     */
+    public void testWriteDenied() throws Exception {
+        userNamePass = "guest";
+        setConnection();
+        ActiveMQBlobMessage message = createTestBlobMessage(createTempDataFile());
+
+        try {
+            message.onSend();
+            fail("Should have failed with permission denied exception!");
+        } catch (JMSException expected) {
+            // expected: the upload must be rejected for this user
+        }
+    }
+
+    /**
+     * Creates a temporary file containing {@code "hello world"}. The file is
+     * registered for deletion when the JVM exits so test runs do not pile up
+     * temp files.
+     *
+     * @return the file that was written
+     * @throws Exception if the file cannot be created or written
+     */
+    private File createTempDataFile() throws Exception {
+        File file = File.createTempFile("amq-data-file-", ".dat");
+        file.deleteOnExit();
+        // lets write some data
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        try {
+            writer.append("hello world");
+        } finally {
+            writer.close();
+        }
+        return file;
+    }
+
+    /**
+     * Creates a blob message for {@code file} on a fresh auto-acknowledge session,
+     * with copy-on-send disabled on the connection and the message id fixed to
+     * {@code "testmessage"}.
+     *
+     * @param file the file the blob message refers to
+     * @return the prepared blob message, not yet sent
+     * @throws Exception if the session or the message cannot be created
+     */
+    private ActiveMQBlobMessage createTestBlobMessage(File file) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ((ActiveMQConnection) connection).setCopyMessageOnSend(false);
+
+        ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession) session).createBlobMessage(file);
+        message.setJMSMessageID("testmessage");
+        return message;
+    }
+
+}


Mime
View raw message