activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r634686 - in /activemq/trunk: ./ activemq-core/src/main/java/org/apache/activemq/camel/component/ activemq-core/src/test/java/org/apache/activemq/camel/component/
Date Fri, 07 Mar 2008 14:17:02 GMT
Author: jstrachan
Date: Fri Mar  7 06:17:01 2008
New Revision: 634686

URL: http://svn.apache.org/viewvc?rev=634686&view=rev
Log:
added support to be able to auto-expose ActiveMQ Queues into a CamelContext so that they are
browsable by any Camel based tooling

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java
  (contents, props changed)
      - copied, changed from r634630, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java?rev=634686&r1=634685&r2=634686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
Fri Mar  7 06:17:01 2008
@@ -26,6 +26,9 @@
  * @version $Revision$
  */
 public class ActiveMQComponent extends JmsComponent {
+    private boolean exposeAllQueues;
+    private CamelEndpointLoader endpointLoader;
+
     /**
      * Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ
Component</a>
      *
@@ -68,6 +71,38 @@
         getConfiguration().setBrokerURL(brokerURL);
     }
 
+    public boolean isExposeAllQueues() {
+        return exposeAllQueues;
+    }
+
+    /**
+     * If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated
into the CamelContext
+     * so that they can be easily browsed by any Camel tooling. This option is disabled by
default.
+     *
+     * @param exposeAllQueues
+     */
+    public void setExposeAllQueues(boolean exposeAllQueues) {
+        this.exposeAllQueues = exposeAllQueues;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (isExposeAllQueues()) {
+            endpointLoader = new CamelEndpointLoader(getCamelContext());
+            endpointLoader.afterPropertiesSet();
+        }
+    }
+
+
+    @Override
+    protected void doStop() throws Exception {
+        if (endpointLoader != null) {
+            endpointLoader.destroy();
+            endpointLoader = null;
+        }
+        super.doStop();
+    }
 
     @Override
     protected JmsConfiguration createConfiguration() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java?rev=634686&r1=634685&r2=634686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java
Fri Mar  7 06:17:01 2008
@@ -31,18 +31,19 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
-import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.camel.component.jms.JmsQueueEndpoint;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 
 /**
  * A helper bean which populates a {@link CamelContext} with ActiveMQ Queue endpoints
- * 
+ *
  * @version $Revision: 1.1 $
  */
-public class CamelEndpointLoader implements InitializingBean, CamelContextAware {
+public class CamelEndpointLoader implements InitializingBean, DisposableBean, CamelContextAware
{
     private static final transient Log LOG = LogFactory.getLog(CamelEndpointLoader.class);
     private CamelContext camelContext;
     private ActiveMQConnection connection;
@@ -56,40 +57,6 @@
         this.camelContext = camelContext;
     }
 
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    public ActiveMQConnection getConnection() {
-        return connection;
-    }
-
-    public ConnectionFactory getConnectionFactory() {
-        if (connectionFactory == null) {
-            connectionFactory = getComponent().getConfiguration().createConnectionFactory();
-        }
-        return connectionFactory;
-    }
-
-    public void setConnectionFactory(ConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
-    }
-
-    public ActiveMQComponent getComponent() {
-        if (component == null) {
-            component = camelContext.getComponent("activemq", ActiveMQComponent.class);
-        }
-        return component;
-    }
-
-    public void setComponent(ActiveMQComponent component) {
-        this.component = component;
-    }
-
     public void afterPropertiesSet() throws Exception {
         ObjectHelper.notNull(camelContext, "camelContext");
         if (connection == null) {
@@ -101,6 +68,7 @@
                 throw new IllegalArgumentException("Created JMS Connection is not an ActiveMQConnection:
" + value);
             }
         }
+        connection.start();
         DestinationSource source = connection.getDestinationSource();
         source.setDestinationListener(new DestinationListener() {
             public void onDestinationEvent(DestinationEvent event) {
@@ -128,10 +96,56 @@
         }
     }
 
+    public void destroy() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public ActiveMQConnection getConnection() {
+        return connection;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        if (connectionFactory == null) {
+            connectionFactory = getComponent().getConfiguration().createConnectionFactory();
+        }
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public ActiveMQComponent getComponent() {
+        if (component == null) {
+            component = camelContext.getComponent("activemq", ActiveMQComponent.class);
+        }
+        return component;
+    }
+
+    public void setComponent(ActiveMQComponent component) {
+        this.component = component;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
     protected void addQueue(ActiveMQQueue queue) throws Exception {
         String queueUri = getQueueUri(queue);
         ActiveMQComponent jmsComponent = getComponent();
-        Endpoint endpoint = new JmsEndpoint(queueUri, jmsComponent, queue.getPhysicalName(),
false, jmsComponent.getConfiguration());
+        Endpoint endpoint = new JmsQueueEndpoint(queueUri, jmsComponent, queue.getPhysicalName(),
jmsComponent.getConfiguration());
         camelContext.addSingletonEndpoint(queueUri, endpoint);
     }
 

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java
(from r634630, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java&r1=634630&r2=634686&rev=634686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AdvisoryConsumerExample.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java
Fri Mar  7 06:17:01 2008
@@ -18,55 +18,72 @@
 
 import java.util.List;
 
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Message;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.AssertionClause;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.jms.JmsMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
+ * Shows that we can see the queues inside ActiveMQ via Camel
+ * by enabling the {@link ActiveMQComponent#setExposeAllQueues(boolean)} flag
+ *
  * @version $Revision$
  */
-public class AdvisoryConsumerExample extends ContextTestSupport {
+public class AutoExposeQueuesInCamelTest extends EmbeddedBrokerTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(AutoExposeQueuesInCamelTest.class);
+
+    protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
+    protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
+
+    protected CamelContext camelContext = new DefaultCamelContext();
+    protected CamelTemplate template;
 
     public void testWorks() throws Exception {
-        // lets create a new queue
-        template.sendBody("activemq:NewQueue." + System.currentTimeMillis(), "<hello>world!</hello>");
+        Thread.sleep(2000);
+        LOG.debug("Looking for endpoints...");
+        List<BrowsableEndpoint> endpoints = CamelContextHelper.getSingletonEndpoints(camelContext,
BrowsableEndpoint.class);
+        for (BrowsableEndpoint endpoint : endpoints) {
+            LOG.debug("Endpoint: " + endpoint);
+        }
+        assertEquals("Should have found an endpoint: "+ endpoints, 1, endpoints.size());
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        // lets configure the ActiveMQ component for Camel
+        ActiveMQComponent component = new ActiveMQComponent();
+        component.setBrokerURL(bindAddress);
+        component.setExposeAllQueues(true);
 
-        Thread.sleep(10000);
+        camelContext.addComponent("activemq", component);
+        camelContext.start();
     }
 
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                // lets force the creation of a queue up front
-                from("activemq:InitialQueue").to("log:Messages");
-
-                from("activemq:topic:ActiveMQ.Advisory.Queue?cacheLevelName=CACHE_CONSUMER").process(new
Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        Message in = exchange.getIn();
-                        if (in instanceof JmsMessage) {
-                            JmsMessage jmsMessage = (JmsMessage) in;
-                            javax.jms.Message value = jmsMessage.getJmsMessage();
-                            if (value instanceof ActiveMQMessage) {
-                                ActiveMQMessage activeMQMessage = (ActiveMQMessage) value;
-                                DataStructure structure = activeMQMessage.getDataStructure();
-                                if (structure instanceof DestinationInfo) {
-                                    DestinationInfo destinationInfo = (DestinationInfo) structure;
-                                    System.out.println("Received: " + destinationInfo);
-                                }
-                            }
-                        }
-                    }
-                });
-            }
-        };
+    @Override
+    protected void tearDown() throws Exception {
+        camelContext.stop();
+        super.tearDown();
     }
+
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.setDestinations(new ActiveMQDestination[]{
+                sampleQueue,
+                sampleTopic
+        });
+        return broker;
+    }
+
 }

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=634686&r1=634685&r2=634686&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Fri Mar  7 06:17:01 2008
@@ -37,7 +37,11 @@
     <aopalliance-version>1.0</aopalliance-version>
     <axion-version>1.0-M3-dev</axion-version>
     <axis-version>1.2-RC1</axis-version>
-    <camel-version>1.2.0</camel-version>
+    <!-- TODO switch to 1.3.0 when its released
+      needed for changes in the org.apache.activemq.camel.component package to automatically
make
+      ActiveMQ queues available in Camel as endpoints and make them browsable
+      -->
+    <camel-version>1.3-SNAPSHOT</camel-version>
     <cglib-version>2.0</cglib-version>
     <commons-beanutils-version>1.6.1</commons-beanutils-version>
     <commons-collections-version>3.1</commons-collections-version>



Mime
View raw message