activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r522838 [2/2] - in /activemq/camel/trunk: camel-core/ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/component/pojo/ camel-core/src/main/java/org/apach...
Date Tue, 27 Mar 2007 09:30:57 GMT
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Mar 27 02:30:52 2007
@@ -20,8 +20,10 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
 import static org.apache.camel.util.ObjectHelper.iterator;
 import static org.apache.camel.util.ObjectHelper.notNull;
+import org.apache.camel.util.ServiceHelper;
 
 import java.util.Iterator;
 
@@ -31,12 +33,12 @@
  *
  * @version $Revision$
  */
-public class Splitter<E extends Exchange> implements Processor<E> {
-    private final Processor<E> destination;
+public class Splitter<E extends Exchange> extends ServiceSupport implements Processor<E> {
+    private final Processor<E> processor;
     private final Expression<E> expression;
 
     public Splitter(Processor<E> destination, Expression<E> expression) {
-        this.destination = destination;
+        this.processor = destination;
         this.expression = expression;
         notNull(destination, "destination");
         notNull(expression, "expression");
@@ -44,7 +46,7 @@
 
     @Override
     public String toString() {
-        return "Splitter[on: " + expression + " to: " + destination + "]";
+        return "Splitter[on: " + expression + " to: " + processor + "]";
     }
 
     public void onExchange(E exchange) {
@@ -54,7 +56,15 @@
             Object part = iter.next();
             E newExchange = (E) exchange.copy();
             newExchange.getIn().setBody(part);
-            destination.onExchange(newExchange);
+            processor.onExchange(newExchange);
         }
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(processor);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
     }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.HashMap;
+
+/**
+ * @version $Revision$
+ */
+public class ProducerCache<E extends Exchange> extends ServiceSupport {
+
+    private Map<String, Producer<E>> producers = new HashMap<String, Producer<E>>();
+
+    public synchronized Producer<E> getProducer(Endpoint<E> endpoint) {
+        String key = endpoint.getEndpointUri();
+        Producer<E> answer = producers.get(key);
+        if (answer == null) {
+            try {
+                answer = endpoint.createProducer();
+            }
+            catch (Exception e) {
+                throw new FailedToCreateProducerException(endpoint, e);
+            }
+            producers.put(key, answer);
+            // TODO auto-start?
+        }
+        return answer;
+    }
+
+    /**
+     * Sends the exchange to the given endpoint
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param exchange the exchange to send
+     */
+    public void send(Endpoint<E> endpoint, E exchange) {
+        Producer<E> producer = getProducer(endpoint);
+        producer.onExchange(exchange);
+    }
+
+    /**
+     * Sends an exchange to an endpoint using a supplied {@link Processor} to populate the exchange
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param processor the transformer used to populate the new exchange
+     */
+    public void send(Endpoint<E> endpoint, Processor<E> processor) {
+        Producer<E> producer = getProducer(endpoint);
+        E exchange = producer.createExchange();
+
+        // lets populate using the processor callback
+        processor.onExchange(exchange);
+
+        // now lets dispatch
+        producer.onExchange(exchange);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(producers.values());
+    }
+
+    protected void doStart() throws Exception {
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+
+/**
+ * A collection of helper methods for working with {@link Service} objects
+ *
+ * @version $Revision$
+ */
+public class ServiceHelper {
+    private static final transient Log log = LogFactory.getLog(ServiceHelper.class);
+
+
+    /**
+     * Starts all of the given services
+     */
+    public static void startServices(Object... services) throws Exception {
+        for (Object value : services) {
+            if (value instanceof Service) {
+                Service service = (Service) value;
+                service.start();
+            }
+        }
+    }
+
+    /**
+     * Starts all of the given services
+     */
+    public static void startServices(Collection services) throws Exception {
+        for (Object value : services) {
+            if (value instanceof Service) {
+                Service service = (Service) value;
+                service.start();
+            }
+        }
+    }
+
+
+    /**
+     * Stops all of the given services, throwing the first exception caught
+     */
+    public static void stopServices(Object... services) throws Exception {
+        Exception firstException = null;
+        for (Object value : services) {
+            if (value instanceof Service) {
+                Service service = (Service) value;
+                try {
+                    service.stop();
+                }
+                catch (Exception e) {
+                    log.debug("Caught exception shutting down: " + e, e);
+                    if (firstException == null) {
+                        firstException = e;
+                    }
+                }
+            }
+        }
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+    /**
+     * Stops all of the given services, throwing the first exception caught
+     */
+    public static void stopServices(Collection services) throws Exception {
+        Exception firstException = null;
+        for (Object value : services) {
+            if (value instanceof Service) {
+                Service service = (Service) value;
+                try {
+                    service.stop();
+                }
+                catch (Exception e) {
+                    log.debug("Caught exception shutting down: " + e, e);
+                    if (firstException == null) {
+                        firstException = e;
+                    }
+                }
+            }
+        }
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java Tue Mar 27 02:30:52 2007
@@ -49,7 +49,7 @@
         	}
         };
         // lets add some routes
-        container.setRoutes(new RouteBuilder() {
+        container.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("pojo:default:hello").intercept(tracingInterceptor).target().to("pojo:default:bye");
                 
@@ -59,6 +59,8 @@
 
         
         container.activateEndpoints();
+
+        /* TODO
         
         // now lets fire in a message
         PojoEndpoint endpoint = (PojoEndpoint) container.resolveEndpoint("pojo:default:hello");
@@ -76,6 +78,7 @@
         } catch (IllegalStateException expected) {
 			// since bye is not active.
 		}
+		*/
 
         container.deactivateEndpoints();
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 
@@ -34,13 +35,13 @@
 public class QueueRouteTest extends TestCase {
 
 	
-    public void testJmsRoute() throws Exception {
+    public void testSedaQueue() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
 
         CamelContext container = new DefaultCamelContext();
 
         // lets add some routes
-        container.setRoutes(new RouteBuilder() {
+        container.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("queue:test.a").to("queue:test.b");
                 from("queue:test.b").process(new Processor<Exchange>() {
@@ -59,7 +60,9 @@
         Endpoint<Exchange> endpoint = container.resolveEndpoint("queue:test.a");
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setHeader("cheese", 123);
-        endpoint.onExchange(exchange);
+
+        Producer<Exchange> producer = endpoint.createProducer();
+        producer.onExchange(exchange);
 
         // now lets sleep for a while
         boolean received = latch.await(5, TimeUnit.SECONDS);

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java Tue Mar 27 02:30:52 2007
@@ -19,9 +19,17 @@
 
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.CamelContext;
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.endpoint.ServerRegistry;
+import org.apache.cxf.bus.CXFBusFactory;
 
 import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URI;
@@ -44,7 +52,7 @@
     public synchronized CxfEndpoint createEndpoint(String uri, String[] urlParts) throws IOException, URISyntaxException {
         CxfEndpoint endpoint = map.get(uri);
         if (endpoint == null) {
-            String remainingUrl = uri.substring("mina:".length());
+            String remainingUrl = uri.substring("cxf:".length());
             URI u = new URI(remainingUrl);
 
             String protocol = u.getScheme();
@@ -54,4 +62,21 @@
         return endpoint;
     }
 
+    /*
+    protected void foo() {
+       Bus bus = CXFBusFactory.getDefaultBus();
+       ServerRegistry serverRegistry = bus.getExtension(ServerRegistry.class);
+       List<Server> servers = serverRegistry.getServers();
+
+       Server targetServer = null;
+       for (Server server : servers) {
+           targetServer = server;
+           EndpointInfo info = server.getEndpoint().getEndpointInfo();
+           String address = info.getAddress();
+
+           Message message = new MessageImpl();
+           server.getMessageObserver().onMessage(message);
+       }
+    }
+    */
 }

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Tue Mar 27 02:30:52 2007
@@ -19,7 +19,11 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultConsumer;
 
 /**
  * The endpoint in the service engine
@@ -33,8 +37,16 @@
         super(uri, camelContext);
     }
 
-    public void onExchange(CxfExchange cxfExchange) {
-        // TODO send into CXF
+    public Producer<CxfExchange> createProducer() throws Exception {
+        return startService(new DefaultProducer<CxfExchange>(this) {
+            public void onExchange(CxfExchange exchange) {
+                // TODO send into CXF
+            }
+        });
+    }
+
+    public Consumer<CxfExchange> createConsumer(Processor<CxfExchange> processor) throws Exception {
+        return startService(new DefaultConsumer<CxfExchange>(this, processor) {});
     }
 
     public CxfExchange createExchange() {

Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java Tue Mar 27 02:30:52 2007
@@ -17,6 +17,9 @@
  */
 package org.apache.camel.component.http;
 
+import org.apache.camel.Producer;
+import org.apache.camel.util.ProducerCache;
+
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -28,6 +31,7 @@
  */
 public class CamelServlet extends HttpServlet {
     private HttpEndpoint endpoint;
+    private ProducerCache<HttpExchange> producerCache = new ProducerCache<HttpExchange>();
 
     public CamelServlet() {
     }
@@ -44,7 +48,7 @@
         }
 
         HttpExchange exchange = endpoint.createExchange(request, response);
-        endpoint.onExchange(exchange);
+        producerCache.send(endpoint, exchange);
 
         // HC: The getBinding() interesting because it illustrates the impedance miss-match between
         // HTTP's stream oriented protocol, and Camels more message oriented protocol exchanges.

Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java Tue Mar 27 02:30:52 2007
@@ -18,8 +18,12 @@
 package org.apache.camel.component.http;
 
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -37,16 +41,17 @@
         super(uri, camelContext);
     }
 
-    public void onExchange(HttpExchange exchange) {
-        Processor<HttpExchange> processor = getInboundProcessor();
-        if (processor != null) {
-            // lets route straight to our processor
-            processor.onExchange(exchange);
-        }
-        else {
-            // we need an external HTTP client such as commons-httpclient
-            // TODO
-        }
+    public Producer<HttpExchange> createProducer() throws Exception {
+        return startService(new DefaultProducer<HttpExchange>(this) {
+            public void onExchange(HttpExchange exchange) {
+                /** TODO */
+            }
+        });
+    }
+
+    public Consumer<HttpExchange> createConsumer(Processor<HttpExchange> processor) throws Exception {
+        // TODO
+        return startService(new DefaultConsumer<HttpExchange>(this, processor) {});
     }
 
     public HttpExchange createExchange() {

Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java Tue Mar 27 02:30:52 2007
@@ -16,6 +16,9 @@
 import org.apache.camel.Component;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointResolver;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
 import org.apache.servicemix.common.DefaultComponent;
 import org.apache.servicemix.common.ServiceUnit;
 import org.apache.servicemix.jbi.util.IntrospectionSupport;
@@ -91,16 +94,19 @@
         return endpoint;
     }
 
-    /**
-     * A factory method for creating endpoints from a service endpoint
-     * which is public so that it can be easily unit tested
-     */
     public CamelJbiEndpoint createEndpoint(ServiceEndpoint ep) throws URISyntaxException {
         URI uri = new URI(ep.getEndpointName());
         Map map = URISupport.parseQuery(uri.getQuery());
         String camelUri = uri.getSchemeSpecificPart();
         Endpoint camelEndpoint = getCamelContext().resolveEndpoint(camelUri);
-        CamelJbiEndpoint endpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding());
+        Processor<Exchange> processor = null;
+        try {
+            processor = camelEndpoint.createProducer();
+        }
+        catch (Exception e) {
+            throw new FailedToCreateProducerException(camelEndpoint, e);
+        }
+        CamelJbiEndpoint endpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
 
         IntrospectionSupport.setProperties(endpoint, map);
 
@@ -136,8 +142,8 @@
     /**
      * Returns a JBI endpoint created for the given Camel endpoint
      */
-    public CamelJbiEndpoint activateJbiEndpoint(JbiEndpoint camelEndpoint) throws Exception {
-        CamelJbiEndpoint jbiEndpoint = null;
+    public CamelJbiEndpoint activateJbiEndpoint(JbiEndpoint camelEndpoint, Processor<Exchange> processor) throws Exception {
+        CamelJbiEndpoint jbiEndpoint;
         String endpointUri = camelEndpoint.getEndpointUri();
         if (endpointUri.startsWith("endpoint:")) {
             // lets decode "service:serviceNamespace:serviceName:endpointName
@@ -151,10 +157,10 @@
             }
             QName service = new QName(parts[0], parts[1]);
             String endpoint = parts[2];
-            jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding());
+            jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding(), processor);
         }
         else {
-            jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding());
+            jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
         }
 
         // the following method will activate the new dynamic JBI endpoint

Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java Tue Mar 27 02:30:52 2007
@@ -13,6 +13,9 @@
 package org.apache.camel.component.jbi;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ProducerCache;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.common.ServiceUnit;
@@ -32,24 +35,25 @@
     private static final QName SERVICE_NAME = new QName("http://camel.apache.org/service", "CamelEndpointComponent");
     private Endpoint camelEndpoint;
     private JbiBinding binding;
+    private Processor<Exchange> processor;
 
-    public CamelJbiEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, Endpoint camelEndpoint, JbiBinding binding) {
+    public CamelJbiEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, Endpoint camelEndpoint, JbiBinding binding, Processor<Exchange> processor) {
         super(serviceUnit, service, endpoint);
+        this.processor = processor;
         this.camelEndpoint = camelEndpoint;
         this.binding = binding;
     }
 
-    public CamelJbiEndpoint(ServiceUnit serviceUnit, Endpoint camelEndpoint, JbiBinding binding) {
-        this(serviceUnit, SERVICE_NAME, camelEndpoint.getEndpointUri(), camelEndpoint, binding);
+    public CamelJbiEndpoint(ServiceUnit serviceUnit, Endpoint camelEndpoint, JbiBinding binding, Processor<Exchange> processor) {
+        this(serviceUnit, SERVICE_NAME, camelEndpoint.getEndpointUri(), camelEndpoint, binding, processor);
     }
 
     protected void processInOnly(MessageExchange exchange, NormalizedMessage in) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("Received exchange: " + exchange);
         }
-        // lets use the inbound processor to handle the exchange
         JbiExchange camelExchange = new JbiExchange(camelEndpoint.getContext(), binding, exchange);
-        camelEndpoint.onExchange(camelExchange);
+        processor.onExchange(camelExchange);
     }
 
     protected void processInOut(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception {

Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java Tue Mar 27 02:30:52 2007
@@ -20,7 +20,11 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultConsumer;
 
 /**
  * Represents an {@link Endpoint} for interacting with JBI
@@ -37,22 +41,50 @@
         toJbiProcessor = new ToJbiProcessor(jbiComponent.getBinding(), jbiComponent.getComponentContext(), uri);
     }
 
-    /**
-     * Sends a message into JBI
-     */
+    public Producer<Exchange> createProducer() throws Exception {
+        return new DefaultProducer<Exchange>(this) {
+            public void onExchange(Exchange exchange) {
+                toJbiProcessor.onExchange(exchange);
+            }
+        };
+    }
+
+    public Consumer<Exchange> createConsumer(final Processor<Exchange> processor) throws Exception {
+        return new DefaultConsumer<Exchange>(this, processor) {
+            CamelJbiEndpoint jbiEndpoint;
+
+            @Override
+            protected void doStart() throws Exception {
+                super.doStart();
+                jbiEndpoint = jbiComponent.activateJbiEndpoint(JbiEndpoint.this, processor);
+            }
+
+            @Override
+            protected void doStop() throws Exception {
+/*
+                if (jbiEndpoint != null) {
+                    jbiEndpoint.deactivate();
+                }
+*/
+                super.doStop();
+            }
+        };
+    }
+
+    /*
     public void onExchange(Exchange exchange) {
         if (getInboundProcessor() != null) {
             getInboundProcessor().onExchange(exchange);
         } else {
             toJbiProcessor.onExchange(exchange);        }
     }
+    */
 
     @Override
     protected void doActivate() throws Exception {
         super.doActivate();
 
         // lets create and activate the endpoint in JBI
-        jbiComponent.activateJbiEndpoint(this);
     }
 
     public JbiExchange createExchange() {

Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java Tue Mar 27 02:30:52 2007
@@ -48,7 +48,11 @@
             DeliveryChannel deliveryChannel = componentContext.getDeliveryChannel();
             MessageExchangeFactory exchangeFactory = deliveryChannel.createExchangeFactory();
             MessageExchange messageExchange = binding.makeJbiMessageExchange(exchange, exchangeFactory);
+            System.out.println("#### Configuring exchange with: " + destinationUri);
             URIResolver.configureExchange(messageExchange, componentContext, destinationUri);
+
+            System.out.println("#### service: " + messageExchange.getService() + " endpoint: " + messageExchange.getEndpoint());
+            
             deliveryChannel.sendSync(messageExchange);
         }
         catch (MessagingException e) {

Modified: activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java (original)
+++ activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,8 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.TestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ProducerCache;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.servicemix.jbi.container.ActivationSpec;
@@ -43,17 +45,19 @@
     protected CountDownLatch latch = new CountDownLatch(1);
     protected Endpoint<Exchange> endpoint;
     protected String startEndpointUri = "jbi:endpoint:serviceNamespace:serviceA:endpointA";
+    protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
 
     /**
      * Sends an exchange to the endpoint
      */
-    protected void sendExchange(Object expectedBody) {
-        // now lets fire in a message
-        Exchange exchange = endpoint.createExchange();
-        Message in = exchange.getIn();
-        in.setBody(expectedBody);
-        in.setHeader("cheese", 123);
-        endpoint.onExchange(exchange);
+    protected void sendExchange(final Object expectedBody) {
+        client.send(endpoint, new Processor<Exchange>() {
+            public void onExchange(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(expectedBody);
+                in.setHeader("cheese", 123);
+            }
+        });
     }
 
     protected Object assertReceivedValidExchange(Class type) throws Exception {
@@ -96,11 +100,18 @@
         camelContext.addComponent("jbi", component);
 
         // lets add some routes
-        camelContext.setRoutes(createRoutes());
+        camelContext.addRoutes(createRoutes());
         endpoint = camelContext.resolveEndpoint(startEndpointUri);
         assertNotNull("No endpoint found!", endpoint);
 
         camelContext.activateEndpoints();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        client.stop();
+        camelContext.deactivateEndpoints();
+        super.tearDown();
     }
 
     protected abstract void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList);

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,97 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * A JMS {@link MessageListener} which can be used to delegate processing to a Camel endpoint.
+ * Each received JMS message is wrapped in a {@link JmsExchange} and passed to the processor.
+ *
+ * @version $Revision$
+ */
+public class EndpointMessageListener<E extends Exchange> implements MessageListener {
+    private static final Log log = LogFactory.getLog(EndpointMessageListener.class);
+    private final Endpoint<E> endpoint;
+    private final Processor<E> processor;
+    private JmsBinding binding;
+
+    /**
+     * @param endpoint the endpoint whose context is used when creating exchanges
+     * @param processor the processor which is given an exchange for each JMS message
+     */
+    public EndpointMessageListener(Endpoint<E> endpoint, Processor<E> processor) {
+        this.endpoint = endpoint;
+        this.processor = processor;
+    }
+
+    /**
+     * Wraps the JMS message in a {@link JmsExchange} and passes it to the processor.
+     *
+     * @param message the JMS message which has been received
+     */
+    @SuppressWarnings("unchecked")
+    public void onMessage(Message message) {
+        if (log.isDebugEnabled()) {
+            log.debug(endpoint + " receiving JMS message: " + message);
+        }
+        JmsExchange exchange = createExchange(message);
+        // unchecked cast: only JmsExchange instances are created here, so E must be
+        // a type which JmsExchange can be assigned to
+        processor.onExchange((E) exchange);
+    }
+
+    /**
+     * Creates the exchange for a JMS message using the endpoint's context and the current binding.
+     *
+     * @param message the JMS message to wrap
+     * @return a new exchange for the message
+     */
+    public JmsExchange createExchange(Message message) {
+        return new JmsExchange(endpoint.getContext(), getBinding(), message);
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+
+    /**
+     * Returns the binding, lazily creating a default {@link JmsBinding} if none has been set.
+     */
+    public JmsBinding getBinding() {
+        if (binding == null) {
+            binding = new JmsBinding();
+        }
+        return binding;
+    }
+
+    /**
+     * Sets the binding used to convert between Camel messages and JMS messages
+     *
+     * @param binding the binding to use
+     */
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Tue Mar 27 02:30:52 2007
@@ -111,17 +111,7 @@
         });
         */
 
-        AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
-        listenerContainer.setDestinationName(subject);
-        listenerContainer.setPubSubDomain(template.isPubSubDomain());
-        listenerContainer.setConnectionFactory(template.getConnectionFactory());
-
-        // TODO support optional parameters
-        // selector
-        // messageConverter
-        // durableSubscriberName 
-
-        return new JmsEndpoint(uri, getContext(), subject, template, listenerContainer);
+        return new JmsEndpoint(uri, getContext(), subject, template);
     }
 
     public JmsTemplate getTemplate() {
@@ -130,13 +120,6 @@
 
     public void setTemplate(JmsTemplate template) {
         this.template = template;
-    }
-
-    protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
-        // TODO use an enum to auto-switch container types?
-
-        //return new SimpleMessageListenerContainer();
-        return new DefaultMessageListenerContainer();
     }
 
     /**

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.Processor;
+import org.apache.camel.Consumer;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * A {@link Consumer} which uses Spring's {@link AbstractMessageListenerContainer} implementations to consume JMS messages
+ *
+ * @version $Revision$
+ */
+public class JmsConsumer extends DefaultConsumer<JmsExchange> {
+    private final AbstractMessageListenerContainer listenerContainer;
+
+    /**
+     * Registers a message listener on the container which delegates to the processor.
+     *
+     * @param endpoint the endpoint this consumer belongs to
+     * @param processor the processor which is invoked for each JMS message
+     * @param listenerContainer the Spring container which receives the JMS messages
+     */
+    public JmsConsumer(JmsEndpoint endpoint, Processor<JmsExchange> processor, AbstractMessageListenerContainer listenerContainer) {
+        super(endpoint, processor);
+        this.listenerContainer = listenerContainer;
+
+        MessageListener messageListener = createMessageListener(endpoint, processor);
+        this.listenerContainer.setMessageListener(messageListener);
+    }
+
+    /**
+     * Creates the JMS listener which turns each message into an exchange for the processor,
+     * using the binding of the endpoint.
+     */
+    protected MessageListener createMessageListener(JmsEndpoint endpoint, Processor<JmsExchange> processor) {
+        EndpointMessageListener<JmsExchange> messageListener = new EndpointMessageListener<JmsExchange>(endpoint, processor);
+        messageListener.setBinding(endpoint.getBinding());
+        return messageListener;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // per Spring's documentation afterPropertiesSet() validates the configuration and
+        // calls initialize() itself, so initialize() must not be called a second time here
+        listenerContainer.afterPropertiesSet();
+        listenerContainer.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // shut the container down before the rest of the consumer is stopped
+        listenerContainer.stop();
+        listenerContainer.destroy();
+        super.doStop();
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Tue Mar 27 02:30:52 2007
@@ -18,12 +18,18 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -33,45 +39,35 @@
 /**
  * @version $Revision:520964 $
  */
-public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
+public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
     private static final Log log = LogFactory.getLog(JmsEndpoint.class);
     private JmsBinding binding;
-    private JmsOperations template;
-    private AbstractMessageListenerContainer listenerContainer;
+    private JmsTemplate template;
     private String destination;
 
-    public JmsEndpoint(String endpointUri, CamelContext container, String destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
+    public JmsEndpoint(String endpointUri, CamelContext container, String destination, JmsTemplate template) {
         super(endpointUri, container);
         this.destination = destination;
         this.template = template;
-        this.listenerContainer = listenerContainer;
-        this.listenerContainer.setMessageListener(this);
     }
 
-    public void onMessage(Message message) {
-        if (log.isDebugEnabled()) {
-            log.debug(JmsEndpoint.this + " receiving JMS message: " + message);
-        }
-        JmsExchange exchange = createExchange(message);
-        getInboundProcessor().onExchange(exchange);
+    public Producer<JmsExchange> createProducer() throws Exception {
+        return startService(new JmsProducer(this, template));
     }
 
-    public void onExchange(Exchange exchange) {
-        // lets convert to the type of an exchange
-        JmsExchange jmsExchange = convertTo(JmsExchange.class, exchange);
-        onExchange(jmsExchange);
-    }
-
-    public void onExchange(final JmsExchange exchange) {
-        template.send(destination, new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                Message message = getBinding().makeJmsMessage(exchange.getIn(), session);
-                if (log.isDebugEnabled()) {
-                    log.debug(JmsEndpoint.this + " sending JMS message: " + message);
-                }
-                return message;
-            }
-        });
+    public Consumer<JmsExchange> createConsumer(Processor<JmsExchange> processor) throws Exception {
+        AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
+        listenerContainer.setDestinationName(destination);
+        listenerContainer.setPubSubDomain(template.isPubSubDomain());
+        listenerContainer.setConnectionFactory(template.getConnectionFactory());
+
+        // TODO support optional parameters
+        // selector
+        // messageConverter
+        // durableSubscriberName
+
+
+        return startService(new JmsConsumer(this, processor, listenerContainer));
     }
 
     public JmsExchange createExchange() {
@@ -104,18 +100,18 @@
         return template;
     }
 
+    public String getDestination() {
+        return destination;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
-    protected void doActivate() throws Exception {
-        super.doActivate();
-        listenerContainer.afterPropertiesSet();
-        listenerContainer.initialize();
-        listenerContainer.start();
-    }
 
-    protected void doDeactivate() {
-        listenerContainer.stop();
-        listenerContainer.destroy();
-        super.doDeactivate();
+    protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
+        // TODO use an enum to auto-switch container types?
+
+        //return new SimpleMessageListenerContainer();
+        return new DefaultMessageListenerContainer();
     }
+
 }

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.JmsOperations;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.JMSException;
+
+/**
+ * A producer which sends the in message of each exchange to the JMS destination of its endpoint.
+ *
+ * @version $Revision$
+ */
+public class JmsProducer extends DefaultProducer<JmsExchange> {
+    private static final transient Log log = LogFactory.getLog(JmsProducer.class);
+
+    private final JmsEndpoint endpoint;
+    private final JmsOperations template;
+
+    /**
+     * @param endpoint the endpoint which supplies the destination and the binding
+     * @param template the JMS operations used to send the messages
+     */
+    public JmsProducer(JmsEndpoint endpoint, JmsOperations template) {
+        super(endpoint);
+        this.template = template;
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Converts the exchange to a {@link JmsExchange} using the endpoint and then sends it.
+     */
+    public void onExchange(Exchange exchange) {
+        JmsExchange converted = endpoint.convertTo(JmsExchange.class, exchange);
+        onExchange(converted);
+    }
+
+    /**
+     * Sends the in message of the exchange to the destination of the endpoint.
+     */
+    public void onExchange(final JmsExchange exchange) {
+        MessageCreator creator = new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                Message jmsMessage = endpoint.getBinding().makeJmsMessage(exchange.getIn(), session);
+                if (log.isDebugEnabled()) {
+                    log.debug(endpoint + " sending JMS message: " + jmsMessage);
+                }
+                return jmsMessage;
+            }
+        };
+        template.send(endpoint.getDestination(), creator);
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,8 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ProducerCache;
 import org.apache.camel.builder.RouteBuilder;
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -44,6 +46,7 @@
     protected CamelContext container = new DefaultCamelContext();
     protected CountDownLatch latch = new CountDownLatch(1);
     protected Endpoint<JmsExchange> endpoint;
+    protected ProducerCache<JmsExchange> client = new ProducerCache<JmsExchange>();
 
     public void testJmsRouteWithTextMessage() throws Exception {
         String expectedBody = "Hello there!";
@@ -62,13 +65,15 @@
         assertEquals("body", expectedBody, body);
     }
 
-    protected void sendExchange(Object expectedBody) {
-        // now lets fire in a message
-        JmsExchange exchange = endpoint.createExchange();
-        JmsMessage in = exchange.getIn();
-        in.setBody(expectedBody);
-        in.setHeader("cheese", 123);
-        endpoint.onExchange(exchange);
+    protected void sendExchange(final Object expectedBody) {
+        client.send(endpoint, new Processor<JmsExchange>() {
+            public void onExchange(JmsExchange exchange) {
+                // set up the in message of the exchange which is to be sent
+                JmsMessage in = exchange.getIn();
+                in.setBody(expectedBody);
+                in.setHeader("cheese", 123);
+            }
+        });
     }
 
     protected Object assertReceivedValidExchange(Class type) throws Exception {
@@ -96,7 +101,7 @@
         container.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
 
         // lets add some routes
-        container.setRoutes(new RouteBuilder() {
+        container.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("jms:activemq:test.a").to("jms:activemq:test.b");
                 from("jms:activemq:test.b").process(new Processor<JmsExchange>() {
@@ -116,6 +121,7 @@
 
     @Override
     protected void tearDown() throws Exception {
+        client.stop();
         container.deactivateEndpoints();
     }
 }

Added: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (added)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+import java.net.SocketAddress;
+
+/**
+ * A {@link org.apache.camel.Consumer} for MINA which binds the acceptor of the endpoint
+ * to its address and passes every received message to the processor.
+ *
+ * @version $Revision$
+ */
+public class MinaConsumer extends DefaultConsumer<MinaExchange> {
+    private static final transient Log log = LogFactory.getLog(MinaConsumer.class);
+
+    private final MinaEndpoint endpoint;
+    private final SocketAddress address;
+    private final IoAcceptor acceptor;
+
+    /**
+     * @param endpoint the endpoint which supplies the address and the acceptor
+     * @param processor the processor which is invoked for each received message
+     */
+    public MinaConsumer(final MinaEndpoint endpoint, Processor<MinaExchange> processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.address = endpoint.getAddress();
+        this.acceptor = endpoint.getAcceptor();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // start the base consumer first, mirroring the super.doStop() call in doStop()
+        super.doStart();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Binding to server address: " + address + " using acceptor: " + acceptor);
+        }
+
+        // turns every message received on a session into an exchange for the processor
+        IoHandler handler = new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object object) throws Exception {
+                getProcessor().onExchange(endpoint.createExchange(session, object));
+            }
+        };
+
+        acceptor.bind(address, handler);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        acceptor.unbind(address);
+        super.doStop();
+    }
+}

Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Tue Mar 27 02:30:52 2007
@@ -18,20 +18,18 @@
 package org.apache.camel.component.mina;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.transport.vmpipe.VmPipeConnector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.net.SocketAddress;
-import java.io.IOException;
 
 /**
  * @version $Revision$
@@ -39,9 +37,6 @@
 public class MinaEndpoint extends DefaultEndpoint<MinaExchange> {
     private static final transient Log log = LogFactory.getLog(MinaEndpoint.class);
 
-    private IoSession session;
-    private IoHandler serverHandler;
-    private IoHandler clientHandler;
     private final IoAcceptor acceptor;
     private final SocketAddress address;
     private final IoConnector connector;
@@ -54,12 +49,12 @@
         this.connector = connector;
     }
 
-    public void onExchange(MinaExchange exchange) {
-        Object body = exchange.getIn().getBody();
-        if (body == null) {
-            System.out.println("#### No payload for exchange: " + exchange);
-        }
-        getSession().write(body);
+    public Producer<MinaExchange> createProducer() throws Exception {
+        return startService(new MinaProducer(this));
+    }
+
+    public Consumer<MinaExchange> createConsumer(Processor<MinaExchange> processor) throws Exception {
+        return startService(new MinaConsumer(this, processor));
     }
 
     public MinaExchange createExchange() {
@@ -73,27 +68,18 @@
         return exchange;
     }
 
-    public IoHandler getServerHandler() {
-        if (serverHandler == null) {
-            serverHandler = createServerHandler();
-        }
-        return serverHandler;
-    }
-
-    public IoHandler getClientHandler() {
-        if (clientHandler == null) {
-            clientHandler = createClientHandler();
-        }
-        return clientHandler;
+    // Properties
+    //-------------------------------------------------------------------------
+    public IoAcceptor getAcceptor() {
+        return acceptor;
     }
 
-    public IoSession getSession() {
-        // TODO lazy create if no inbound processor attached?
-        return session;
+    public SocketAddress getAddress() {
+        return address;
     }
 
-    public void setSession(IoSession session) {
-        this.session = session;
+    public IoConnector getConnector() {
+        return connector;
     }
 
     // Implementation methods
@@ -102,56 +88,10 @@
     @Override
     protected void doActivate() throws Exception {
         super.doActivate();
-
-        if (getInboundProcessor() != null) {
-            // lets initiate the server
-
-            if (log.isDebugEnabled()) {
-                log.debug("Binding to server address: " + address + " using acceptor: " + acceptor);            
-            }
-
-            acceptor.bind(address, getServerHandler());
-        }
-        setSession(createSession());
-    }
-
-    /**
-     * Initiates the client connection for outbound communication
-     */
-    protected IoSession createSession() {
-        if (log.isDebugEnabled()) {
-            log.debug("Creating connector to address: " + address + " using connector: " + connector);
-        }
-        ConnectFuture future = connector.connect(address, getClientHandler());
-        future.join();
-        return future.getSession();
     }
 
-
     @Override
     protected void doDeactivate() {
         acceptor.unbindAll();
-    }
-
-    protected IoHandler createClientHandler() {
-        return new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession ioSession, Object object) throws Exception {
-                super.messageReceived(ioSession, object);    /** TODO */
-            }
-        };
-    }
-
-    protected IoHandler createServerHandler() {
-        return new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession session, Object object) throws Exception {
-                processInboundMessage(session, object);
-            }
-        };
-    }
-
-    private void processInboundMessage(IoSession session, Object object) {
-        getInboundProcessor().onExchange(createExchange(session, object));
     }
 }

Added: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (added)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+import java.net.SocketAddress;
+
+/**
+ * A {@link Producer} implementation for MINA
+ *
+ * @version $Revision$
+ */
+public class MinaProducer extends DefaultProducer<MinaExchange> {
+    private static final transient Log log = LogFactory.getLog(MinaProducer.class);
+    private IoSession session;
+    private MinaEndpoint endpoint;
+
+    public MinaProducer(MinaEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void onExchange(MinaExchange exchange) {
+        if (session == null) {
+            throw new IllegalStateException("Not started yet!");
+        }
+        Object body = exchange.getIn().getBody();
+        if (body == null) {
+            log.warn("No payload for exchange: " + exchange);
+        }
+        else {
+            session.write(body);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        SocketAddress address = endpoint.getAddress();
+        IoConnector connector = endpoint.getConnector();
+        if (log.isDebugEnabled()) {
+            log.debug("Creating connector to address: " + address + " using connector: " + connector);
+        }
+        IoHandler ioHandler = new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession ioSession, Object object) throws Exception {
+                super.messageReceived(ioSession, object);    /** TODO */
+            }
+        };
+        ConnectFuture future = connector.connect(address, ioHandler);
+        future.join();
+        session = future.getSession();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (session != null) {
+            session.close().join(2000);
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java (original)
+++ activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 
@@ -36,6 +37,7 @@
     protected CountDownLatch latch = new CountDownLatch(1);
     protected MinaExchange receivedExchange;
     protected String uri = "mina:vm://localhost:8080";
+    protected Producer<MinaExchange> producer;
 
     public void testMinaRoute() throws Exception {
 
@@ -46,7 +48,8 @@
         message.setBody("Hello there!");
         message.setHeader("cheese", 123);
 
-        endpoint.onExchange(exchange);
+        producer = endpoint.createProducer();
+        producer.onExchange(exchange);
 
         // now lets sleep for a while
         boolean received = latch.await(5, TimeUnit.SECONDS);
@@ -55,13 +58,16 @@
 
     @Override
     protected void setUp() throws Exception {
-        container.setRoutes(createRouteBuilder());
+        container.addRoutes(createRouteBuilder());
         container.activateEndpoints();
     }
 
 
     @Override
     protected void tearDown() throws Exception {
+        if (producer != null) {
+            producer.stop();
+        }
         container.deactivateEndpoints();
     }
 

Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Tue Mar 27 02:30:52 2007
@@ -115,7 +115,7 @@
     /**
      * Strategy to install all available routes into the context
      */
-    protected void installRoutes() {
+    protected void installRoutes() throws Exception {
         for (RouteBuilder routeBuilder : additionalBuilders) {
             getContext().addRoutes(routeBuilder);
         }



Mime
View raw message