camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-8491: Camel POJO producer/consumer should defer starting until CamelContext is starting
Date Mon, 16 Mar 2015 11:58:31 GMT
Repository: camel
Updated Branches:
  refs/heads/master 9f893d83e -> 9600bc4fe


CAMEL-8491: Camel POJO producer/consumer should defer starting until CamelContext is starting


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9600bc4f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9600bc4f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9600bc4f

Branch: refs/heads/master
Commit: 9600bc4fec55a4ed02f2404dfd33b3584491ec80
Parents: 9f893d8
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Mar 16 09:10:53 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Mar 16 13:00:08 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     |  16 ++-
 .../org/apache/camel/DeferStartService.java     |  28 ++++
 .../camel/component/bean/ProxyHelper.java       |   7 +-
 .../camel/impl/CamelPostProcessorHelper.java    |  35 +++--
 .../apache/camel/impl/DefaultCamelContext.java  |  57 ++++++--
 .../camel/impl/DefaultProducerTemplate.java     |   6 +
 .../org/apache/camel/impl/DeferProducer.java    | 140 +++++++++++++++++++
 .../camel/impl/DeferServiceStartupListener.java |  45 ++++++
 .../camel/processor/DeferServiceFactory.java    |  46 ++++++
 .../impl/PojoProduceInterceptEndpointTest.java  | 107 ++++++++++++++
 .../PojoProduceProxyInterceptEndpointTest.java  | 106 ++++++++++++++
 .../handler/CamelNamespaceHandler.java          |  12 +-
 12 files changed, 566 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index bfe116c..84505bc 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -219,10 +219,10 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration
{
      * If the option <tt>closeOnShutdown</tt> is <tt>false</tt> then
this context will not stop the service when the context stops.
      *
      * @param object the service
-     * @param closeOnShutdown whether to close the service when this CamelContext shutdown.
+     * @param stopOnShutdown whether to stop the service when this CamelContext shutdown.
      * @throws Exception can be thrown when starting the service
      */
-    void addService(Object object, boolean closeOnShutdown) throws Exception;
+    void addService(Object object, boolean stopOnShutdown) throws Exception;
 
     /**
      * Removes a service from this context.
@@ -253,6 +253,18 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration
{
     <T> T hasService(Class<T> type);
 
     /**
+     * Defers starting the service until {@link CamelContext} is started and has initialized
all its prior services and routes.
+     * <p/>
+     * If {@link CamelContext} is already started then the service is started immediately.
+     *
+     * @param object the service
+     * @param stopOnShutdown whether to stop the service when this CamelContext shutdown.
Setting this to <tt>true</tt> will keep a reference to the service in
+     *                       this {@link CamelContext} until the context is stopped. So do
not use it for short lived services.
+     * @throws Exception can be thrown when starting the service, which is only attempted
if {@link CamelContext} has already been started when calling this method.
+     */
+    void deferStartService(Object object, boolean stopOnShutdown) throws Exception;
+
+    /**
      * Adds the given listener to be invoked when {@link CamelContext} have just been started.
      * <p/>
      * This allows listeners to do any custom work after the routes and other services have
been started and are running.

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/DeferStartService.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/DeferStartService.java b/camel-core/src/main/java/org/apache/camel/DeferStartService.java
new file mode 100644
index 0000000..90e45b3
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/DeferStartService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+public interface DeferStartService<T, F> {
+
+    /**
+     * Creates the service by defers starting the service.
+     *
+     * @param factory the factory which creates the service
+     * @return the service
+     */
+    T create(F factory);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java b/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
index d287caf..d51cf18 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
@@ -20,7 +20,7 @@ import java.lang.reflect.Proxy;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Producer;
-import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.processor.DeferServiceFactory;
 
 /**
  * A helper class for creating proxies which delegate to Camel
@@ -54,9 +54,8 @@ public final class ProxyHelper {
      * Creates a Proxy which sends the exchange to the endpoint.
      */
     public static <T> T createProxy(Endpoint endpoint, ClassLoader cl, Class<T>[]
interfaceClasses, MethodInfoCache methodCache) throws Exception {
-        Producer producer = endpoint.createProducer();
-        // ensure the producer is started
-        ServiceHelper.startService(producer);
+        Producer producer = DeferServiceFactory.createProducer(endpoint);
+        endpoint.getCamelContext().deferStartService(producer, true);
         return createProxyObject(endpoint, producer, cl, interfaceClasses, methodCache);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index 817a2f9..af91ae7 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -22,7 +22,6 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Component;
 import org.apache.camel.Consume;
 import org.apache.camel.Consumer;
 import org.apache.camel.ConsumerTemplate;
@@ -39,6 +38,7 @@ import org.apache.camel.component.bean.BeanInfo;
 import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.component.bean.ProxyHelper;
 import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.DeferServiceFactory;
 import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.IntrospectionSupport;
@@ -105,7 +105,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
                 Processor processor = createConsumerProcessor(bean, method, endpoint);
                 Consumer consumer = endpoint.createConsumer(processor);
                 LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
-                startService(consumer, bean, beanName);
+                startService(consumer, endpoint.getCamelContext(), bean, beanName);
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }
@@ -115,13 +115,19 @@ public class CamelPostProcessorHelper implements CamelContextAware {
     /**
      * Stats the given service
      */
-    protected void startService(Service service, Object bean, String beanName) throws Exception
{
-        if (isSingleton(bean, beanName)) {
-            getCamelContext().addService(service);
+    protected void startService(Service service, CamelContext camelContext, Object bean,
String beanName) throws Exception {
+        // defer starting the service until CamelContext has started all its initial services
+        if (camelContext != null) {
+            camelContext.deferStartService(service, true);
         } else {
-            LOG.debug("Service is not singleton so you must remember to stop it manually
{}", service);
+            // mo CamelContext then start service manually
             ServiceHelper.startService(service);
         }
+
+        boolean singleton = isSingleton(bean, beanName);
+        if (!singleton) {
+            LOG.debug("Service is not singleton so you must remember to stop it manually
{}", service);
+        }
     }
 
     /**
@@ -281,10 +287,12 @@ public class CamelPostProcessorHelper implements CamelContextAware {
                                                                String injectionPointName,
Object bean) {
         // endpoint is optional for this injection point
         Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty,
injectionPointName, false);
-        ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
+        CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext();
+        ProducerTemplate answer = new DefaultProducerTemplate(context, endpoint);
         // start the template so its ready to use
         try {
-            answer.start();
+            // no need to defer the template as it can adjust to the endpoint at runtime
+            startService(answer, context, bean, null);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
@@ -299,7 +307,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
         ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
         // start the template so its ready to use
         try {
-            answer.start();
+            startService(answer, null, null, null);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
@@ -311,9 +319,9 @@ public class CamelPostProcessorHelper implements CamelContextAware {
      */
     protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean,
String beanName) {
         try {
-            PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
-            startService(pollingConsumer, bean, beanName);
-            return pollingConsumer;
+            PollingConsumer consumer = endpoint.createPollingConsumer();
+            startService(consumer, endpoint.getCamelContext(), bean, beanName);
+            return consumer;
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
@@ -324,8 +332,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
      */
     protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName)
{
         try {
-            Producer producer = endpoint.createProducer();
-            startService(producer, bean, beanName);
+            Producer producer = DeferServiceFactory.createProducer(endpoint);
             return new UnitOfWorkProducer(producer);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 9ac71b6..f9cfceb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.naming.Context;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
@@ -176,8 +175,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
     private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>();
     private final Map<String, Component> components = new HashMap<String, Component>();
     private final Set<Route> routes = new LinkedHashSet<Route>();
-    private final List<Service> servicesToClose = new CopyOnWriteArrayList<Service>();
+    private final List<Service> servicesToStop = new CopyOnWriteArrayList<Service>();
     private final Set<StartupListener> startupListeners = new LinkedHashSet<StartupListener>();
+    private final DeferServiceStartupListener deferStartupListener = new DeferServiceStartupListener();
     private TypeConverter typeConverter;
     private TypeConverterRegistry typeConverterRegistry;
     private Injector injector;
@@ -266,6 +266,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         // create endpoint registry at first since end users may access endpoints before
CamelContext is started
         this.endpoints = new DefaultEndpointRegistry(this);
 
+        // add the derfer service startup listener
+        this.startupListeners.add(deferStartupListener);
+
         // use WebSphere specific resolver if running on WebSphere
         if (WebSpherePackageScanClassResolver.isWebSphereClassLoader(this.getClass().getClassLoader()))
{
             log.info("Using WebSphere specific PackageScanClassResolver");
@@ -1054,11 +1057,11 @@ public class DefaultCamelContext extends ServiceSupport implements
ModelCamelCon
         addService(object, true);
     }
 
-    public void addService(Object object, boolean closeOnShutdown) throws Exception {
-        doAddService(object, closeOnShutdown);
+    public void addService(Object object, boolean stopOnShutdown) throws Exception {
+        doAddService(object, stopOnShutdown);
     }
 
-    private void doAddService(Object object, boolean closeOnShutdown) throws Exception {
+    private void doAddService(Object object, boolean stopOnShutdown) throws Exception {
         // inject CamelContext
         if (object instanceof CamelContextAware) {
             CamelContextAware aware = (CamelContextAware) object;
@@ -1085,9 +1088,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
             }
             // do not add endpoints as they have their own list
             if (singleton && !(service instanceof Endpoint)) {
-                // only add to list of services to close if its not already there
-                if (closeOnShutdown && !hasService(service)) {
-                    servicesToClose.add(service);
+                // only add to list of services to stop if its not already there
+                if (stopOnShutdown && !hasService(service)) {
+                    servicesToStop.add(service);
                 }
             }
         }
@@ -1110,7 +1113,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
             for (LifecycleStrategy strategy : lifecycleStrategies) {
                 strategy.onServiceRemove(this, service, null);
             }
-            return servicesToClose.remove(service);
+            return servicesToStop.remove(service);
         }
         return false;
     }
@@ -1118,14 +1121,14 @@ public class DefaultCamelContext extends ServiceSupport implements
ModelCamelCon
     public boolean hasService(Object object) {
         if (object instanceof Service) {
             Service service = (Service) object;
-            return servicesToClose.contains(service);
+            return servicesToStop.contains(service);
         }
         return false;
     }
 
     @Override
     public <T> T hasService(Class<T> type) {
-        for (Service service : servicesToClose) {
+        for (Service service : servicesToStop) {
             if (type.isInstance(service)) {
                 return type.cast(service);
             }
@@ -1133,6 +1136,32 @@ public class DefaultCamelContext extends ServiceSupport implements
ModelCamelCon
         return null;
     }
 
+    public void deferStartService(Object object, boolean stopOnShutdown) throws Exception
{
+        if (object instanceof Service) {
+            Service service = (Service) object;
+
+            // only add to services to close if its a singleton
+            // otherwise we could for example end up with a lot of prototype scope endpoints
+            boolean singleton = true; // assume singleton by default
+            if (object instanceof IsSingleton) {
+                singleton = ((IsSingleton) service).isSingleton();
+            }
+            // do not add endpoints as they have their own list
+            if (singleton && !(service instanceof Endpoint)) {
+                // only add to list of services to stop if its not already there
+                if (stopOnShutdown && !hasService(service)) {
+                    servicesToStop.add(service);
+                }
+            }
+            // are we already started?
+            if (isStarted()) {
+                ServiceHelper.startService(service);
+            } else {
+                deferStartupListener.addService(service);
+            }
+        }
+    }
+
     public void addStartupListener(StartupListener listener) throws Exception {
         // either add to listener so we can invoke then later when CamelContext has been
started
         // or invoke the callback right now
@@ -2680,7 +2709,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
 
         // stop consumers from the services to close first, such as POJO consumer (eg @Consumer)
         // which we need to stop after the routes, as a POJO consumer is essentially a route
also
-        for (Service service : servicesToClose) {
+        for (Service service : servicesToStop) {
             if (service instanceof Consumer) {
                 shutdownServices(service);
             }
@@ -2716,8 +2745,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         }
 
         // shutdown services as late as possible
-        shutdownServices(servicesToClose);
-        servicesToClose.clear();
+        shutdownServices(servicesToStop);
+        servicesToStop.clear();
 
         // must notify that we are stopped before stopping the management strategy
         EventHelper.notifyCamelContextStopped(this);

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index 6a7cfd4..aa3fa10 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -736,6 +736,12 @@ public class DefaultProducerTemplate extends ServiceSupport implements
ProducerT
             }
             producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
         }
+
+        // need to lookup default endpoint as it may have been intercepted
+        if (defaultEndpoint != null) {
+            defaultEndpoint = camelContext.getEndpoint(defaultEndpoint.getEndpointUri());
+        }
+
         ServiceHelper.startService(producerCache);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
new file mode 100644
index 0000000..0ddf1bf
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link Producer} that defers being started, until {@link org.apache.camel.CamelContext}
has been started, this
+ * ensures that the producer is able to adapt to changes that may otherwise occur during
starting
+ * CamelContext. If we do not defer starting the producer it may not adapt to those changes,
and
+ * send messages to wrong endpoints.
+ */
+public class DeferProducer extends org.apache.camel.support.ServiceSupport implements Producer,
AsyncProcessor {
+
+    private Producer delegate;
+    private final Endpoint endpoint;
+
+    public DeferProducer(Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public Exchange createExchange() {
+        if (delegate == null) {
+            throw new IllegalStateException("Not started");
+        }
+        return delegate.createExchange();
+    }
+
+    @Override
+    public Exchange createExchange(ExchangePattern pattern) {
+        if (delegate == null) {
+            throw new IllegalStateException("Not started");
+        }
+        return delegate.createExchange(pattern);
+    }
+
+    @Override
+    @Deprecated
+    public Exchange createExchange(Exchange exchange) {
+        if (delegate == null) {
+            throw new IllegalStateException("Not started");
+        }
+        return delegate.createExchange(exchange);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (delegate == null) {
+            throw new IllegalStateException("Not started");
+        }
+        delegate.process(exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (delegate == null) {
+            exchange.setException(new IllegalStateException("Not started"));
+            callback.done(true);
+            return true;
+        }
+
+        if (delegate instanceof AsyncProcessor) {
+            return ((AsyncProcessor) delegate).process(exchange, callback);
+        }
+
+        // fallback to sync mode
+        try {
+            process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // need to lookup endpoint again as it may be intercepted
+        Endpoint lookup = endpoint.getCamelContext().getEndpoint(endpoint.getEndpointUri());
+
+        delegate = lookup.createProducer();
+        ServiceHelper.startService(delegate);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(delegate);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        if (delegate != null) {
+            return delegate.isSingleton();
+        } else {
+            // assume singleton by default
+            return true;
+        }
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        if (delegate != null) {
+            return delegate.getEndpoint();
+        } else {
+            return endpoint;
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (delegate != null) {
+            return delegate.toString();
+        } else {
+            return "DelegateProducer[" + endpoint + "]";
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
b/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
new file mode 100644
index 0000000..a78bdd8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.apache.camel.StartupListener;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.StartupListener} that defers starting {@link Service}s.
+ */
+public class DeferServiceStartupListener implements StartupListener {
+
+    private final Set<Service> services = new CopyOnWriteArraySet<Service>();
+
+    public void addService(Service service) {
+        services.add(service);
+    }
+
+    @Override
+    public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws
Exception {
+        for (Service service : services) {
+            ServiceHelper.startService(service);
+        }
+        services.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
b/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
new file mode 100644
index 0000000..9dac0dd
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DeferProducer;
+
+/**
+ * Factory to create {@link org.apache.camel.DeferStartService} services such as {@link Producer}s
+ * and {@link org.apache.camel.PollingConsumer}s
+ */
+public class DeferServiceFactory {
+
+    /**
+     * Creates the {@link Producer} which is deferred started until {@link org.apache.camel.CamelContext}
is being started.
+     * <p/>
+     * When the producer is started, it re-lookup the endpoint to capture any changes such
as the endpoint has been intercepted.
+     * This allows the producer to react and send messages to the updated endpoint.
+     *
+     * @param endpoint  the endpoint
+     * @return the producer which will be deferred started until {@link org.apache.camel.CamelContext}
has been started
+     * @throws Exception can be thrown if there is an error starting the producer
+     * @see org.apache.camel.impl.DeferProducer
+     */
+    public static Producer createProducer(Endpoint endpoint) throws Exception {
+        Producer producer = new DeferProducer(endpoint);
+        endpoint.getCamelContext().deferStartService(producer, true);
+        return producer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
new file mode 100644
index 0000000..553d28e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ *
+ */
+public class PojoProduceInterceptEndpointTest extends TestCase {
+
+    public void testPojoProduceInterceptAlreadyStarted() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start")
+                        .to("language:simple:${body}${body}");
+
+                from("direct:start")
+                    .to("mock:result");
+            }
+        });
+
+        // start Camel before POJO being injected
+        context.start();
+
+        // use the injector (will use the default)
+        // which should post process the bean to inject the @Produce
+        MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+        MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+        mock.expectedBodiesReceived("WorldWorld");
+
+        Object reply = bean.doSomething("World");
+        assertEquals("WorldWorld", reply);
+
+        mock.assertIsSatisfied();
+
+        context.stop();
+    }
+
+    public void testPojoProduceInterceptNotStarted() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start")
+                        .to("language:simple:${body}${body}");
+
+                from("direct:start")
+                    .to("mock:result");
+            }
+        });
+
+        // use the injector (will use the default)
+        // which should post process the bean to inject the @Produce
+        MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+        // do NOT start Camel before POJO being injected
+        context.start();
+
+        MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+        mock.expectedBodiesReceived("WorldWorld");
+
+        Object reply = bean.doSomething("World");
+        assertEquals("WorldWorld", reply);
+
+        mock.assertIsSatisfied();
+
+        context.stop();
+    }
+
+    public static class MyBean {
+
+        @Produce(uri = "direct:start")
+        Producer producer;
+
+        public Object doSomething(String body) throws Exception {
+            Exchange exchange = producer.createExchange();
+            exchange.getIn().setBody(body);
+            producer.process(exchange);
+            return exchange.hasOut() ? exchange.getOut().getBody() : exchange.getIn().getBody();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
new file mode 100644
index 0000000..f03ec9d
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ *
+ */
+public class PojoProduceProxyInterceptEndpointTest extends TestCase {
+
+    public void testPojoProduceInterceptAlreadyStarted() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start")
+                        .to("language:simple:${body}${body}");
+
+                from("direct:start")
+                    .to("mock:result");
+            }
+        });
+
+        // start Camel before POJO being injected
+        context.start();
+
+        // use the injector (will use the default)
+        // which should post process the bean to inject the @Produce
+        MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+        MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+        mock.expectedBodiesReceived("WorldWorld");
+
+        Object reply = bean.doSomething("World");
+        assertEquals("WorldWorld", reply);
+
+        mock.assertIsSatisfied();
+
+        context.stop();
+    }
+
+    public void testPojoProduceInterceptNotStarted() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start")
+                        .to("language:simple:${body}${body}");
+
+                from("direct:start")
+                    .to("mock:result");
+            }
+        });
+
+        // use the injector (will use the default)
+        // which should post process the bean to inject the @Produce
+        MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+        // do NOT start Camel before POJO being injected
+        context.start();
+
+        MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+        mock.expectedBodiesReceived("WorldWorld");
+
+        Object reply = bean.doSomething("World");
+        assertEquals("WorldWorld", reply);
+
+        mock.assertIsSatisfied();
+
+        context.stop();
+    }
+
+    public static interface EchoService {
+        public String echo(String word);
+    }
+
+    public static class MyBean {
+
+        @Produce(uri = "direct:start")
+        private EchoService echo;
+
+        public Object doSomething(String body) throws Exception {
+            return echo.echo(body);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
index cd44d85..475c057 100644
--- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
+++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
@@ -928,11 +928,13 @@ public class CamelNamespaceHandler implements NamespaceHandler {
 
         @Override
         protected boolean isSingleton(Object bean, String beanName) {
-            ComponentMetadata meta = blueprintContainer.getComponentMetadata(beanName);
-            if (meta != null && meta instanceof BeanMetadata) {
-                String scope = ((BeanMetadata) meta).getScope();
-                if (scope != null) {
-                    return BeanMetadata.SCOPE_SINGLETON.equals(scope);
+            if (beanName != null) {
+                ComponentMetadata meta = blueprintContainer.getComponentMetadata(beanName);
+                if (meta != null && meta instanceof BeanMetadata) {
+                    String scope = ((BeanMetadata) meta).getScope();
+                    if (scope != null) {
+                        return BeanMetadata.SCOPE_SINGLETON.equals(scope);
+                    }
                 }
             }
             // fallback to super, which will assume singleton


Mime
View raw message