activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r634277 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ test/java/org/apache/activemq/advisory/ test/java/org/apache/activemq/camel/component/
Date Thu, 06 Mar 2008 14:35:27 GMT
Author: jstrachan
Date: Thu Mar  6 06:35:26 2008
New Revision: 634277

URL: http://svn.apache.org/viewvc?rev=634277&view=rev
Log:
added support for http://issues.apache.org/activemq/browse/AMQ-1199 so that its easy to browse
the available queues & topics in a broker

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationEvent.java
      - copied, changed from r634256, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationListener.java
      - copied, changed from r634256, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationSource.java
      - copied, changed from r634256, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java
      - copied, changed from r634256, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java
      - copied, changed from r634256, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=634277&r1=634276&r2=634277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Mar  6 06:35:26 2008
@@ -94,6 +94,7 @@
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.advisory.DestinationSource;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -178,6 +179,7 @@
     private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     private long timeCreated;
     private ConnectionAudit connectionAudit = new ConnectionAudit();
+    private DestinationSource destinationSource;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -554,6 +556,10 @@
                 if (!closed.get()) {
                     closing.set(true);
 
+                    if (destinationSource != null) {
+                        destinationSource.stop();
+                        destinationSource = null;
+                    }
                     if (advisoryConsumer != null) {
                         advisoryConsumer.dispose();
                         advisoryConsumer = null;
@@ -908,6 +914,21 @@
         this.stats.setEnabled(statsEnabled);
     }
 
+    /**
+     * Returns the {@link DestinationSource} object which can be used to listen to destinations
+     * being created or destroyed or to enquire about the current destinations available
on the broker
+     *
+     * @return a lazily created destination source
+     * @throws JMSException
+     */
+    public DestinationSource getDestinationSource() throws JMSException {
+        if (destinationSource == null) {
+            destinationSource = new DestinationSource(this);
+            destinationSource.start();
+        }
+        return destinationSource;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -2079,4 +2100,5 @@
     protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
         connectionAudit.rollbackDuplicate(dispatcher, message);
     }
+
 }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationEvent.java
(from r634256, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationEvent.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationEvent.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java&r1=634256&r2=634277&rev=634277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationEvent.java
Thu Mar  6 06:35:26 2008
@@ -21,49 +21,40 @@
 import javax.jms.Destination;
 
 import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ActiveMQDestination;
 
 /**
- * An event when the number of consumers on a given destination changes.
- * 
+ * An event caused when a destination is created or deleted
+ *
  * @version $Revision$
  */
-public abstract class ConsumerEvent extends EventObject {
+public class DestinationEvent extends EventObject {
     private static final long serialVersionUID = 2442156576867593780L;
-    private final Destination destination;
-    private final ConsumerId consumerId;
-    private final int consumerCount;
+    private DestinationInfo destinationInfo;
 
-    public ConsumerEvent(ConsumerEventSource source, Destination destination, ConsumerId
consumerId, int consumerCount) {
+    public DestinationEvent(DestinationSource source, DestinationInfo destinationInfo) {
         super(source);
-        this.destination = destination;
-        this.consumerId = consumerId;
-        this.consumerCount = consumerCount;
+        this.destinationInfo = destinationInfo;
     }
 
-    public ConsumerEventSource getAdvisor() {
-        return (ConsumerEventSource) getSource();
+    public ActiveMQDestination getDestination() {
+        return getDestinationInfo().getDestination();
     }
 
-    public Destination getDestination() {
-        return destination;
+    public boolean isAddOperation() {
+        return getDestinationInfo().isAddOperation();
     }
 
-    /**
-     * Returns the current number of consumers active at the time this advisory was sent.
-     * 
-     * Note that this is not the number of consumers active when the consumer started consuming.
-     * It is usually more vital to know how many consumers there are now - rather than historically
-     * how many there were when a consumer started. So if you create a {@link ConsumerListener}
-     * after many consumers have started, you will receive a ConsumerEvent for each consumer.
However the
-     * {@link #getConsumerCount()} method will always return the current active consumer
count on each event.
-     */
-    public int getConsumerCount() {
-        return consumerCount;
+    public long getTimeout() {
+        return getDestinationInfo().getTimeout();
     }
 
-    public ConsumerId getConsumerId() {
-        return consumerId;
+    public boolean isRemoveOperation() {
+        return getDestinationInfo().isRemoveOperation();
     }
 
-    public abstract boolean isStarted();
-}
+    public DestinationInfo getDestinationInfo() {
+        return destinationInfo;
+    }
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationListener.java
(from r634256, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerListener.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationListener.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationListener.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerListener.java&r1=634256&r2=634277&rev=634277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerListener.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationListener.java
Thu Mar  6 06:35:26 2008
@@ -17,11 +17,10 @@
 package org.apache.activemq.advisory;
 
 /**
- * Listen to the changes in the number of active consumers available for a given destination.
- * 
+ * Listen to the changes in destinations being created or destroyed
+ *
  * @version $Revision$
  */
-public interface ConsumerListener {
-
-    void onConsumerEvent(ConsumerEvent event);
-}
+public interface DestinationListener {
+    void onDestinationEvent(DestinationEvent event);
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationSource.java
(from r634256, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationSource.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationSource.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java&r1=634256&r2=634277&rev=634277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/DestinationSource.java
Thu Mar  6 06:35:26 2008
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.advisory;
 
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
-import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -30,49 +30,93 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * An object which can be used to listen to the number of active consumers
- * available on a given destination.
- * 
+ * A helper class which keeps track of the Destinations available in a broker and allows
you to listen to them
+ * being created or deleted.
+ *
  * @version $Revision$
  */
-public class ConsumerEventSource implements Service, MessageListener {
+public class DestinationSource implements MessageListener {
     private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
-
-    private final Connection connection;
-    private final ActiveMQDestination destination;
-    private ConsumerListener listener;
     private AtomicBoolean started = new AtomicBoolean(false);
-    private AtomicInteger consumerCount = new AtomicInteger();
+    private final Connection connection;
     private Session session;
-    private MessageConsumer consumer;
+    private MessageConsumer queueConsumer;
+    private MessageConsumer topicConsumer;
+    private MessageConsumer tempTopicConsumer;
+    private MessageConsumer tempQueueConsumer;
+    private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
+    private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
+    private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
+    private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
+    private DestinationListener listener;
 
-    public ConsumerEventSource(Connection connection, Destination destination) throws JMSException
{
+    public DestinationSource(Connection connection) throws JMSException {
         this.connection = connection;
-        this.destination = ActiveMQDestination.transform(destination);
     }
 
-    public void setConsumerListener(ConsumerListener listener) {
+    public DestinationListener getListener() {
+        return listener;
+    }
+
+    public void setConsumerListener(DestinationListener listener) {
         this.listener = listener;
     }
 
-    public void start() throws Exception {
+    /**
+     * Returns the current queues available on the broker
+     */
+    public Set<ActiveMQQueue> getQueues() {
+        return queues;
+    }
+
+    /**
+     * Returns the current topics on the broker
+     */
+    public Set<ActiveMQTopic> getTopics() {
+        return topics;
+    }
+
+    /**
+     * Returns the current temporary topics available on the broker
+     */
+    public Set<ActiveMQTempQueue> getTemporaryQueues() {
+        return temporaryQueues;
+    }
+
+    /**
+     * Returns the current temporary queues available on the broker
+     */
+    public Set<ActiveMQTempTopic> getTemporaryTopics() {
+        return temporaryTopics;
+    }
+
+    public void start() throws JMSException {
         if (started.compareAndSet(false, true)) {
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
-            consumer = session.createConsumer(advisoryTopic);
-            consumer.setMessageListener(this);
+            queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+            queueConsumer.setMessageListener(this);
+
+            topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
+            topicConsumer.setMessageListener(this);
+
+            tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
+            tempQueueConsumer.setMessageListener(this);
+
+            tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
+            tempTopicConsumer.setMessageListener(this);
         }
     }
 
-    public void stop() throws Exception {
+    public void stop() throws JMSException {
         if (started.compareAndSet(true, false)) {
             if (session != null) {
                 session.close();
@@ -82,51 +126,68 @@
 
     public void onMessage(Message message) {
         if (message instanceof ActiveMQMessage) {
-            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
             Object command = activeMessage.getDataStructure();
-            int count = 0;
-            if (command instanceof ConsumerInfo) {
-                count = consumerCount.incrementAndGet();
-                count = extractConsumerCountFromMessage(message, count);
-                fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command,
count));
-            } else if (command instanceof RemoveInfo) {
-                RemoveInfo removeInfo = (RemoveInfo)command;
-                if (removeInfo.isConsumerRemove()) {
-                    count = consumerCount.decrementAndGet();
-                    count = extractConsumerCountFromMessage(message, count);
-                    fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(),
count));
-                }
-            } else {
-                LOG.warn("Unknown command: " + command);
+            if (command instanceof DestinationInfo) {
+                DestinationInfo destinationInfo = (DestinationInfo) command;
+                DestinationEvent event = new DestinationEvent(this, destinationInfo);
+                fireDestinationEvent(event);
+            }
+            else {
+                LOG.warn("Unknown dataStructure: " + command);
             }
-        } else {
+        }
+        else {
             LOG.warn("Unknown message type: " + message + ". Message ignored");
         }
     }
 
-    /**
-     * Lets rely by default on the broker telling us what the consumer count is
-     * as it can ensure that we are up to date at all times and have not
-     * received messages out of order etc.
-     */
-    protected int extractConsumerCountFromMessage(Message message, int count) {
-        try {
-            Object value = message.getObjectProperty("consumerCount");
-            if (value instanceof Number) {
-                Number n = (Number)value;
-                return n.intValue();
-            }
-            LOG.warn("No consumerCount header available on the message: " + message);
-        } catch (Exception e) {
-            LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason:
" + e, e);
+    protected void fireDestinationEvent(DestinationEvent event) {
+        if (listener != null) {
+            listener.onDestinationEvent(event);
         }
-        return count;
-    }
 
-    protected void fireConsumerEvent(ConsumerEvent event) {
-        if (listener != null) {
-            listener.onConsumerEvent(event);
+        // now lets update the data structures
+        ActiveMQDestination destination = event.getDestination();
+        boolean add = event.isAddOperation();
+        if (destination instanceof ActiveMQQueue) {
+            ActiveMQQueue queue = (ActiveMQQueue) destination;
+            if (add) {
+                queues.add(queue);
+            }
+            else {
+                queues.remove(queue);
+            }
+        }
+        else if (destination instanceof ActiveMQTopic) {
+            ActiveMQTopic topic = (ActiveMQTopic) destination;
+            if (add) {
+                topics.add(topic);
+            }
+            else {
+                topics.remove(topic);
+            }
+        }
+        else if (destination instanceof ActiveMQTempQueue) {
+            ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
+            if (add) {
+                temporaryQueues.add(queue);
+            }
+            else {
+                temporaryQueues.remove(queue);
+            }
+        }
+        else if (destination instanceof ActiveMQTempTopic) {
+            ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
+            if (add) {
+                temporaryTopics.add(topic);
+            }
+            else {
+                temporaryTopics.remove(topic);
+            }
+        }
+        else {
+            LOG.warn("Unknown destination type: " + destination);
         }
     }
-
-}
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java
(from r634256, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java&r1=634256&r2=634277&rev=634277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java
Thu Mar  6 06:35:26 2008
@@ -16,124 +16,62 @@
  */
 package org.apache.activemq.advisory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.ReconnectWithSameClientIDTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * 
  * @version $Revision$
  */
-public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener
{
-    private static final Log LOG = LogFactory.getLog(ConsumerListenerTest.class);
-
-    protected Session consumerSession1;
-    protected Session consumerSession2;
-    protected int consumerCounter;
-    protected ConsumerEventSource consumerEventSource;
-    protected BlockingQueue<ConsumerEvent> eventQueue = new ArrayBlockingQueue<ConsumerEvent>(1000);
-    private Connection connection;
-
-    public void testConsumerEvents() throws Exception {
-        consumerEventSource.start();
-
-        consumerSession1 = createConsumer();
-        assertConsumerEvent(1, true);
-
-        consumerSession2 = createConsumer();
-        assertConsumerEvent(2, true);
-
-        consumerSession1.close();
-        consumerSession1 = null;
-        assertConsumerEvent(1, false);
-
-        consumerSession2.close();
-        consumerSession2 = null;
-        assertConsumerEvent(0, false);
-    }
-
-    public void testListenWhileAlreadyConsumersActive() throws Exception {
-        consumerSession1 = createConsumer();
-        consumerSession2 = createConsumer();
-
-        consumerEventSource.start();
-        assertConsumerEvent(2, true);
-        assertConsumerEvent(2, true);
-
-        consumerSession1.close();
-        consumerSession1 = null;
-        assertConsumerEvent(1, false);
-
-        consumerSession2.close();
-        consumerSession2 = null;
-        assertConsumerEvent(0, false);
+public class DestinationListenerTest extends EmbeddedBrokerTestSupport implements DestinationListener
{
+    private static final Log LOG = LogFactory.getLog(DestinationListenerTest.class);
+    protected ActiveMQConnection connection;
+    protected DestinationSource destinationSource;
+
+    public void testDestiationSource() throws Exception {
+        Thread.sleep(1000);
+        System.out.println("Queues: " + destinationSource.getQueues());
+        System.out.println("Topics: " + destinationSource.getTopics());
     }
 
-    public void onConsumerEvent(ConsumerEvent event) {
-        eventQueue.add(event);
+    public void onDestinationEvent(DestinationEvent event) {
+        ActiveMQDestination destination = event.getDestination();
+        if (event.isAddOperation()) {
+            System.out.println("Added:   " + destination);
+        }
+        else {
+            System.out.println("Removed: " + destination);
+        }
     }
 
     protected void setUp() throws Exception {
         super.setUp();
 
-        connection = createConnection();
+        connection = (ActiveMQConnection) createConnection();
         connection.start();
-        consumerEventSource = new ConsumerEventSource(connection, destination);
-        consumerEventSource.setConsumerListener(this);
+
+        destinationSource = connection.getDestinationSource();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.setDestinations(new ActiveMQDestination[]{
+                new ActiveMQQueue("foo.bar"),
+                new ActiveMQTopic("cheese")
+        });
+        return broker;
     }
 
     protected void tearDown() throws Exception {
-        if (consumerEventSource != null) {
-            consumerEventSource.stop();
-        }
-        if (consumerSession2 != null) {
-            consumerSession2.close();
-        }
-        if (consumerSession1 != null) {
-            consumerSession1.close();
-        }
         if (connection != null) {
             connection.close();
         }
         super.tearDown();
     }
-
-    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException
{
-        ConsumerEvent event = waitForConsumerEvent();
-        assertEquals("Consumer count", count, event.getConsumerCount());
-        assertEquals("started", started, event.isStarted());
-    }
-
-    protected Session createConsumer() throws JMSException {
-        final String consumerText = "Consumer: " + (++consumerCounter);
-        LOG.info("Creating consumer: " + consumerText + " on destination: " + destination);
-
-        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = answer.createConsumer(destination);
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message message) {
-                LOG.info("Received message by: " + consumerText + " message: " + message);
-            }
-        });
-        return answer;
-    }
-
-    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
-        ConsumerEvent answer = eventQueue.poll(100000, TimeUnit.MILLISECONDS);
-        assertTrue("Should have received a consumer event!", answer != null);
-        return answer;
-    }
-
-}
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java
(from r634256, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java&r1=634256&r2=634277&rev=634277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java
Thu Mar  6 06:35:26 2008
@@ -19,46 +19,53 @@
 import java.util.List;
 
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.AssertionClause;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.jms.JmsMessage;
 
 /**
  * @version $Revision$
  */
-public class JournalRouteTest extends ContextTestSupport {
+public class AdvisoryConsumerExample extends ContextTestSupport {
 
-    public void testSimpleJournalRoute() throws Exception {
+    public void testWorks() throws Exception {
+        // lets create a new queue
+        template.sendBody("activemq:NewQueue." + System.currentTimeMillis(), "<hello>world!</hello>");
 
-        byte[] payload = "Hello World".getBytes();
-        
-        
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:out", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-        
-        AssertionClause firstMessageExpectations = resultEndpoint.message(0);
-        firstMessageExpectations.header("journal").isEqualTo("activemq.journal:target/test.a");
-        firstMessageExpectations.header("location").isNotNull();
-        firstMessageExpectations.body().isInstanceOf(ByteSequence.class);
-
-        template.sendBody("direct:in", payload);
-
-        resultEndpoint.assertIsSatisfied();
-
-        List<Exchange> list = resultEndpoint.getReceivedExchanges();
-        Exchange exchange = list.get(0);
-        ByteSequence body = (ByteSequence)exchange.getIn().getBody();
-        body.compact(); // trims the byte array to the actual size.
-        assertEquals("body", new String(payload), new String(body.data));
+        Thread.sleep(10000);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("direct:in").to("activemq.journal:target/test.a");
-                from("activemq.journal:target/test.a").to("mock:out");
+                // lets force the creation of a queue up front
+                from("activemq:InitialQueue").to("log:Messages");
+
+                from("activemq:topic:ActiveMQ.Advisory.Queue?cacheLevelName=CACHE_CONSUMER").process(new
Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        Message in = exchange.getIn();
+                        if (in instanceof JmsMessage) {
+                            JmsMessage jmsMessage = (JmsMessage) in;
+                            javax.jms.Message value = jmsMessage.getJmsMessage();
+                            if (value instanceof ActiveMQMessage) {
+                                ActiveMQMessage activeMQMessage = (ActiveMQMessage) value;
+                                DataStructure structure = activeMQMessage.getDataStructure();
+                                if (structure instanceof DestinationInfo) {
+                                    DestinationInfo destinationInfo = (DestinationInfo) structure;
+                                    System.out.println("Received: " + destinationInfo);
+                                }
+                            }
+                        }
+                    }
+                });
             }
         };
     }



Mime
View raw message